diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 1539a6e..e375ff6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -325,6 +326,8 @@ public static String getDefaultReservationSystem(ResourceScheduler scheduler) { // currently only capacity scheduler is supported if (scheduler instanceof CapacityScheduler) { return CapacityReservationSystem.class.getName(); + } else if (scheduler instanceof FairScheduler) { + return FairReservationSystem.class.getName(); } return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairReservationSystem.java new file mode 100644 index 0000000..9376164 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairReservationSystem.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FairReservationSystem extends AbstractReservationSystem { + private static final Logger LOG = LoggerFactory + .getLogger(CapacityReservationSystem.class); + + private FairScheduler fairScheduler; + + public FairReservationSystem() { + super(FairReservationSystem.class.getName()); + } + + @Override + public void reinitialize(Configuration conf, RMContext rmContext) + throws YarnException { + // Validate if the scheduler is fair scheduler + ResourceScheduler scheduler = rmContext.getScheduler(); + if (!(scheduler instanceof FairScheduler)) { + throw new YarnRuntimeException("Class " + + scheduler.getClass().getCanonicalName() + " not instance of " + + FairScheduler.class.getCanonicalName()); + } + fairScheduler = (FairScheduler) scheduler; + this.conf = conf; + super.reinitialize(conf, rmContext); + } + + @Override + protected ReservationSchedulerConfiguration + getReservationSchedulerConfiguration() { + return fairScheduler.getAllocationConfiguration(); + } + + @Override + protected ResourceCalculator getResourceCalculator() { + return fairScheduler.getResourceCalculator(); + } + + @Override + protected QueueMetrics getRootQueueMetrics() { + return fairScheduler.getRootQueueMetrics(); + } + + @Override + protected Resource getMinAllocation() { + return fairScheduler.getMinimumResourceCapability(); + } + + @Override + protected Resource getMaxAllocation() { + return fairScheduler.getMaximumResourceCapability(); + } + + @Override + protected String getPlanQueuePath(String planQueueName) { + return planQueueName; } + + @Override + protected Resource getPlanQueueCapacity(String planQueueName) { + return fairScheduler.getQueueManager().getParentQueue(planQueueName, false) + .getSteadyFairShare(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 70a6496..4b70f9a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -17,22 +17,22 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; -public class AllocationConfiguration { +public class AllocationConfiguration extends ReservationSchedulerConfiguration { private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); @@ -76,6 +76,9 @@ // preempt other queues' tasks. private final Map fairSharePreemptionThresholds; + private final Map + reservationQueueConfigs; + private final Map schedulingPolicies; private final SchedulingPolicy defaultSchedulingPolicy; @@ -100,6 +103,7 @@ public AllocationConfiguration(Map minQueueResources, Map fairSharePreemptionTimeouts, Map fairSharePreemptionThresholds, Map> queueAcls, + Map reservationQueueConfigs, QueuePlacementPolicy placementPolicy, Map> configuredQueues) { this.minQueueResources = minQueueResources; @@ -117,6 +121,7 @@ public AllocationConfiguration(Map minQueueResources, this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; this.queueAcls = queueAcls; + this.reservationQueueConfigs = reservationQueueConfigs; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; } @@ -137,6 +142,8 @@ public AllocationConfiguration(Configuration conf) { fairSharePreemptionThresholds = new HashMap(); schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; + reservationQueueConfigs = new HashMap(); configuredQueues = new HashMap>(); for (FSQueueType queueType : FSQueueType.values()) { configuredQueues.put(queueType, new HashSet()); @@ -262,4 +269,74 @@ public SchedulingPolicy getDefaultSchedulingPolicy() { public QueuePlacementPolicy getPlacementPolicy() { return placementPolicy; } + + @Override + public boolean isReservable(String queue) { + return reservationQueueConfigs.containsKey(queue); + } + + @Override + public long getReservationWindow(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.getReservationWindowMsec(); + } + + @Override + public float getAverageCapacity(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.getAvgOverTimeMultiplier() * 100; + } + + @Override + public float getInstantaneousMaxCapacity(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.getMaxOverTimeMultiplier() * 100; + } + + @Override + public String getReservationAdmissionPolicy(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.getReservationAdmissionPolicy(); + } + + @Override + public String getReservationAgent(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.getReservationAgent(); + } + + @Override + public boolean getShowReservationAsQueues(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.shouldShowReservationAsQueues(); + } + + @Override + public String getReplanner(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.getPlanner(); + } + + @Override + public boolean getMoveOnExpiry(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.shouldMoveOnExpiry(); + } + + @Override + public long getEnforcementWindow(String queue) { + ReservationQueueConfiguration config = getReservationQueueConfig(queue); + return config.getEnforcementWindowMsec(); + } + + private ReservationQueueConfiguration getReservationQueueConfig( + String queue) { + ReservationQueueConfiguration configuration = reservationQueueConfigs.get + (queue); + if (configuration == null) { + throw new RuntimeException("No queue named " + queue + + "supports reservations"); + } + return configuration; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 2022510..3f854b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -17,20 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; - +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -51,7 +38,18 @@ import org.w3c.dom.Text; import org.xml.sax.SAXException; -import com.google.common.annotations.VisibleForTesting; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; @Public @Unstable @@ -222,6 +220,9 @@ public synchronized void reloadAllocations() throws IOException, new HashMap(); Map> queueAcls = new HashMap>(); + Map + reservationQueueConfigs = new HashMap(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; float queueMaxAMShareDefault = 0.5f; @@ -337,7 +338,8 @@ public synchronized void reloadAllocations() throws IOException, loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, - fairSharePreemptionThresholds, queueAcls, configuredQueues); + fairSharePreemptionThresholds, queueAcls, configuredQueues, + reservationQueueConfigs); } // Load placement policy and pass it configured queues @@ -372,7 +374,7 @@ public synchronized void reloadAllocations() throws IOException, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, - newPlacementPolicy, configuredQueues); + reservationQueueConfigs, newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -392,8 +394,9 @@ private void loadQueue(String parentName, Element element, Map minSharePreemptionTimeouts, Map fairSharePreemptionTimeouts, Map fairSharePreemptionThresholds, - Map> queueAcls, - Map> configuredQueues) + Map> queueAcls, + Map> configuredQueues, + Map reservationQueueConfigs) throws AllocationConfigurationException { String queueName = element.getAttribute("name"); if (parentName != null) { @@ -452,16 +455,21 @@ private void loadQueue(String parentName, Element element, String text = ((Text)field.getFirstChild()).getData(); acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); } else if ("aclAdministerApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData(); + String text = ((Text) field.getFirstChild()).getData(); acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); + } else if ("reservation".equals(field.getTagName())) { + ReservationQueueConfiguration reservationQueueConfiguration = + ReservationQueueConfiguration.createFromXml(field); + reservationQueueConfigs.put(queueName, reservationQueueConfiguration); + isLeaf = false; + configuredQueues.get(FSQueueType.PARENT).add(queueName); } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, - queueAcls, configuredQueues); - configuredQueues.get(FSQueueType.PARENT).add(queueName); + queueAcls, configuredQueues, reservationQueueConfigs); isLeaf = false; } } @@ -473,6 +481,8 @@ private void loadQueue(String parentName, Element element, } else { configuredQueues.get(FSQueueType.LEAF).add(queueName); } + } else { + configuredQueues.get(FSQueueType.PARENT).add(queueName); } queueAcls.put(queueName, acls); if (maxQueueResources.containsKey(queueName) && diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3fc3019..07aef85 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1539,4 +1539,16 @@ public synchronized void updateNodeResource(RMNode nm, return EnumSet .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); } + + @Override + public Set getPlanQueues() throws YarnException { + Set planQueues = new HashSet(); + for (FSQueue fsQueue : queueMgr.getQueues()) { + String queueName = fsQueue.getName(); + if (allocConf.isReservable(queueName)) { + planQueues.add(queueName); + } + } + return planQueues; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java new file mode 100644 index 0000000..b05308c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; + + +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class ReservationQueueConfiguration { + private long reservationWindow; + private long enforcementWindow; + private String reservationAdmissionPolicy; + private String reservationAgent; + private String planner; + private boolean showReservationAsQueues; + private boolean moveOnExpiry; + private float avgOverTimeMultiplier; + private float maxOverTimeMultiplier; + + public ReservationQueueConfiguration() { + this.reservationWindow = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_WINDOW; + this.enforcementWindow = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_ENFORCEMENT_WINDOW; + this.reservationAdmissionPolicy = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_ADMISSION_POLICY; + this.reservationAgent = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_AGENT_NAME; + this.planner = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_PLANNER_NAME; + this.showReservationAsQueues = ReservationSchedulerConfiguration + .DEFAULT_SHOW_RESERVATIONS_AS_QUEUES; + this.moveOnExpiry = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_MOVE_ON_EXPIRY; + this.avgOverTimeMultiplier = ReservationSchedulerConfiguration + .DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER; + this.maxOverTimeMultiplier = ReservationSchedulerConfiguration + .DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER; + } + + public long getReservationWindowMsec() { + return reservationWindow; + } + + public long getEnforcementWindowMsec() { + return enforcementWindow; + } + + public boolean shouldShowReservationAsQueues() { + return showReservationAsQueues; + } + + public boolean shouldMoveOnExpiry() { + return moveOnExpiry; + } + + public String getReservationAdmissionPolicy() { + return reservationAdmissionPolicy; + } + + public String getReservationAgent() { + return reservationAgent; + } + + public String getPlanner() { + return planner; + } + + public float getAvgOverTimeMultiplier() { + return avgOverTimeMultiplier; + } + + public float getMaxOverTimeMultiplier() { + return maxOverTimeMultiplier; + } + + public static ReservationQueueConfiguration createFromXml( + Element reservationQueueElement) { + ReservationQueueConfiguration configuration = new ReservationQueueConfiguration(); + NodeList fields = reservationQueueElement.getChildNodes(); + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) + continue; + Element field = (Element) fieldNode; + if ("average-over-time-multiplier".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + configuration.avgOverTimeMultiplier = val; + } else if ("max-over-time-multiplier".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + configuration.maxOverTimeMultiplier = val; + } else if ("reservation-window".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + long val = Long.parseLong(text); + configuration.reservationWindow = val; + } else if ("enforcement-window".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + long val = Long.parseLong(text); + configuration.enforcementWindow = val; + } else if ("reservation-planner".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + configuration.planner = text; + } else if ("reservation-agent".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + configuration.reservationAgent = text; + } else if ("reservation-policy".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + configuration.reservationAdmissionPolicy = text; + } else if ("reservation-move-on-expiry".equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData().trim(); + boolean val = Boolean.parseBoolean(text); + configuration.moveOnExpiry = val; + } else if ("show-reservations-as-queues".equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData().trim(); + boolean val = Boolean.parseBoolean(text); + configuration.showReservationAsQueues = val; + } + } + return configuration; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index b5e4df2..a751257 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -22,7 +22,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; import java.util.Collections; import java.util.Map; import java.util.Random; @@ -101,6 +103,62 @@ public static void validateNewReservationQueue( .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); } + static void setupAllocationFile(String allocationFile) + throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(allocationFile)); + out.println(""); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println("3"); + out.println(""); + out.println(""); + out.println("7"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("8"); + out.println(""); + out.println("drf"); + out.println(""); + out.close(); + } + + static void updateAllocationFile(String allocationFile) + throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(allocationFile)); + out.println(""); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.println("3"); + out.println(""); + out.println(""); + out.println("7"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("80"); + out.println(""); + out.println(""); + out.println(""); + out.println("10"); + out.println(""); + out.println("drf"); + out.println(""); + out.close(); + } + @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java new file mode 100644 index 0000000..ba69d22 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java @@ -0,0 +1,158 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.mockito.Mockito.when; + +public class TestFairReservationSystem extends FairSchedulerTestBase { + private final static String ALLOC_FILE = new File(TEST_DIR, + TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath(); + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + return conf; + } + + @Before + public void setup() throws IOException { + conf = createConfiguration(); + } + + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + conf = null; + } + + @Test + public void testFairReservationSystemInitialize() throws IOException { + ReservationSystemTestUtil.setupAllocationFile(ALLOC_FILE); + + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + + // Setup + RMContext mockRMContext = testUtil.createRMContext(conf); + setupFairScheduler(testUtil, mockRMContext); + + FairReservationSystem reservationSystem = new FairReservationSystem(); + reservationSystem.setRMContext(mockRMContext); + + try { + reservationSystem.reinitialize(scheduler.getConf(), mockRMContext); + } catch (YarnException e) { + Assert.fail(e.getMessage()); + } + + ReservationSystemTestUtil.validateReservationQueue(reservationSystem, + testUtil.getFullReservationQueueName()); + } + + @Test + public void testFairReservationSystemReinitialize() throws IOException { + ReservationSystemTestUtil.setupAllocationFile(ALLOC_FILE); + + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + + // Setup + RMContext mockContext = testUtil.createRMContext(conf); + setupFairScheduler(testUtil, mockContext); + + FairReservationSystem reservationSystem = new FairReservationSystem(); + reservationSystem.setRMContext(mockContext); + + try { + reservationSystem.reinitialize(scheduler.getConf(), mockContext); + } catch (YarnException e) { + Assert.fail(e.getMessage()); + } + + // Assert queue in original config + final String planQNam = testUtil.getFullReservationQueueName(); + ReservationSystemTestUtil.validateReservationQueue(reservationSystem, + planQNam); + + // Dynamically add a plan + ReservationSystemTestUtil.updateAllocationFile(ALLOC_FILE); + scheduler.reinitialize(conf, mockContext); + + try { + reservationSystem.reinitialize(conf, mockContext); + } catch (YarnException e) { + Assert.fail(e.getMessage()); + } + + String newQueue = "root.reservation"; + ReservationSystemTestUtil.validateNewReservationQueue + (reservationSystem, newQueue); + } + + private void setupFairScheduler(ReservationSystemTestUtil testUtil, + RMContext rmContext) throws + IOException { + //resourceManager = new MockRM(conf); + //resourceManager.start(); + //scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // scheduler = Mockito.spy(new FairScheduler()); + scheduler = new FairScheduler(); + scheduler.setRMContext(rmContext); + + int numContainers = 10; + // testUtil.initializeRMContext(numContainers, scheduler, rmContext); + when(rmContext.getScheduler()).thenReturn(scheduler); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, rmContext); + + + Resource resource = testUtil.calculateClusterResource(numContainers); + RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 656e20d..1e68c64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -21,18 +21,24 @@ import java.io.File; import java.io.FileWriter; +import java.io.IOException; import java.io.PrintWriter; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation + .ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; +import org.xml.sax.SAXException; + +import javax.xml.parsers.ParserConfigurationException; public class TestAllocationFileLoaderService { @@ -525,7 +531,55 @@ public void testQueueAlongsideRoot() throws Exception { allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); } - + + @Test + public void testReservableQueue() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("10000"); + out.println("false"); + out.println("DummyAgentName"); + out.println("2.4f"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + + AllocationConfiguration allocConf = confHolder.allocConf; + String reservableQueueName = "root.reservable"; + String nonreservableQueueName = "root.other"; + assertFalse(allocConf.isReservable(nonreservableQueueName)); + assertTrue(allocConf.isReservable(reservableQueueName)); + + // Configured properties + assertFalse(allocConf.getMoveOnExpiry(reservableQueueName)); + assertEquals(10000, allocConf.getReservationWindow(reservableQueueName)); + assertEquals(2.4, allocConf.getInstantaneousMaxCapacity + (reservableQueueName), 0.0001); + assertEquals("DummyAgentName", allocConf.getReservationAgent(reservableQueueName)); + + // Default properties + assertEquals(1, allocConf.getAverageCapacity(reservableQueueName), 0.001); + assertFalse(allocConf.getShowReservationAsQueues(reservableQueueName)); + assertEquals(ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_ADMISSION_POLICY, + allocConf.getReservationAdmissionPolicy(reservableQueueName)); + } + private class ReloadListener implements AllocationFileLoaderService.Listener { public AllocationConfiguration allocConf;