diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index da30a2b..889c3bc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.Map; /** @@ -29,7 +30,9 @@ * Update the scheduler configuration with the provided key value pairs. * @param user User issuing the request * @param confUpdate Key-value pairs for configurations to be updated. + * @throws IOException if scheduler could not be reinitialized */ - void mutateConfiguration(String user, Map confUpdate); + void mutateConfiguration(String user, Map confUpdate) + throws IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7362688..8ab0a36 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -2486,4 +2487,15 @@ public boolean moveReservedContainer(RMContainer toBeMovedContainer, writeLock.unlock(); } } + + public void updateConfiguration(UserGroupInformation user, + Map confUpdate) throws IOException { + if (csConfProvider instanceof MutableConfigurationProvider) { + ((MutableConfigurationProvider) csConfProvider).mutateConfiguration( + user.getShortUserName(), confUpdate); + } else { + throw new UnsupportedOperationException("Configured CS configuration " + + "provider does not support updating configuration."); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java index a208fb9..b97be1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java @@ -58,7 +58,11 @@ public synchronized boolean confirmMutation(long id, boolean isValid) { if (isValid) { Map mutations = mutation.getUpdates(); for (Map.Entry kv : mutations.entrySet()) { - schedConf.set(kv.getKey(), kv.getValue()); + if (kv.getValue() == null) { + schedConf.unset(kv.getKey()); + } else { + schedConf.set(kv.getKey(), kv.getValue()); + } } } return true; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index 267ab6a..cc61a64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -68,26 +68,30 @@ public void init(Configuration config) throws IOException { @Override public CapacitySchedulerConfiguration loadConfiguration(Configuration configuration) throws IOException { - Configuration loadedConf = new Configuration(configuration); - loadedConf.addResource(schedConf); + Configuration loadedConf = new Configuration(schedConf); + loadedConf.addResource(configuration); return new CapacitySchedulerConfiguration(loadedConf, false); } @Override public void mutateConfiguration(String user, - Map confUpdate) { + Map confUpdate) throws IOException { Configuration oldConf = new Configuration(schedConf); LogMutation log = new LogMutation(confUpdate, user); long id = confStore.logMutation(log); for (Map.Entry kv : confUpdate.entrySet()) { - schedConf.set(kv.getKey(), kv.getValue()); + if (kv.getValue() == null) { + schedConf.unset(kv.getKey()); + } else { + schedConf.set(kv.getKey(), kv.getValue()); + } } try { rmContext.getScheduler().reinitialize(conf, rmContext); } catch (IOException e) { schedConf = oldConf; confStore.confirmMutation(id, false); - return; + throw e; } confStore.confirmMutation(id, true); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index bd0602b..8c70fffd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -57,6 +57,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import com.google.common.base.Joiner; import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -141,6 +142,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; @@ -2614,4 +2616,212 @@ public Void run() throws IOException, YarnException { app.getApplicationTimeouts().get(appTimeout.getTimeoutType())); return Response.status(Status.OK).entity(timeout).build(); } + + @PUT + @Path("/scheduler/conf") + @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + public Response updateSchedulerConfiguration(SchedulerConfigurationInfo + mutationInfo, @Context HttpServletRequest hsr) + throws AuthorizationException, InterruptedException { + init(); + + UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); + if (callerUGI == null) { + throw new AuthorizationException( + "Unable to obtain user name, user not authenticated"); + } + + if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) { + return Response.status(Status.FORBIDDEN) + .entity("The default static user cannot carry out this operation.") + .build(); + } + + ResourceScheduler scheduler = rm.getResourceScheduler(); + if (scheduler instanceof CapacityScheduler) { + try { + callerUGI.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException, YarnException { + Map confUpdate = + constructKeyValueConfUpdate(mutationInfo); + ((CapacityScheduler) scheduler).updateConfiguration(callerUGI, + confUpdate); + return null; + } + }); + } catch (IOException e) { + return Response.status(Status.BAD_REQUEST).entity(e.getMessage()) + .build(); + } + return Response.status(Status.OK).entity("Configuration change " + + "successfully applied.").build(); + } else { + return Response.status(Status.BAD_REQUEST) + .entity("Configuration change only supported by CapacityScheduler.") + .build(); + } + } + + private Map constructKeyValueConfUpdate( + SchedulerConfigurationInfo mutationInfo) throws IOException { + CapacitySchedulerConfiguration currentConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + CapacitySchedulerConfiguration proposedConf = + new CapacitySchedulerConfiguration(currentConf, false); + Map confUpdate = new HashMap<>(); + for (RemoveQueueInfo removeQueueInfo : mutationInfo.getRemoveQueueInfo()) { + removeQueues(removeQueueInfo, "", proposedConf, confUpdate); + } + for (AddQueueInfo addQueueInfo : mutationInfo.getAddQueueInfo()) { + addQueues(addQueueInfo, "", proposedConf, confUpdate); + } + for (UpdateQueueInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) { + updateQueues(updateQueueInfo, "", proposedConf, confUpdate); + } + return confUpdate; + } + + private void removeQueues( + RemoveQueueInfo removeInfo, String queuePrefix, + CapacitySchedulerConfiguration proposedConf, + Map confUpdate) throws IOException { + if (removeInfo == null) { + return; + } else { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + List childRemoveQueueInfos = + removeInfo.getRemoveQueueInfo(); + if (childRemoveQueueInfos == null || + childRemoveQueueInfos.size() == 0) { + // Queue name stored in removeInfo could have multiple components. + String queue = removeInfo.getQueue(); + String queueName = queue.substring(queue.lastIndexOf('.') + 1); + String fullQueueName = queuePrefix.isEmpty() ? queue : + queuePrefix + '.' + queue; + CSQueue queueToRemove = cs.getQueue(queueName); + if (queueToRemove == null || + !queueToRemove.getQueuePath().equals(fullQueueName)) { + throw new IOException("Queue " + fullQueueName + " not found"); + } + String parentQueuePath = fullQueueName.substring(0, fullQueueName + .lastIndexOf('.')); + String[] siblingQueues = proposedConf.getQueues(parentQueuePath); + List newSiblingQueues = new ArrayList<>(); + for (String siblingQueue : siblingQueues) { + if (!siblingQueue.equals(queueName)) { + newSiblingQueues.add(siblingQueue); + } + } + proposedConf.setQueues(parentQueuePath, newSiblingQueues + .toArray(new String[0])); + String queuesConfig = CapacitySchedulerConfiguration.PREFIX + + parentQueuePath + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + if (newSiblingQueues.size() == 0) { + confUpdate.put(queuesConfig, null); + } else { + confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues)); + } + for (Map.Entry confRemove : proposedConf.getValByRegex( + ".*" + fullQueueName.replaceAll("\\.", "\\.") + "\\..*") + .entrySet()) { + proposedConf.unset(confRemove.getKey()); + confUpdate.put(confRemove.getKey(), null); + } + } else { + for (RemoveQueueInfo childRemoveQueueInfo : childRemoveQueueInfos) { + removeQueues(childRemoveQueueInfo, (queuePrefix.isEmpty() ? "" : + queuePrefix + ".") + removeInfo.getQueue(), proposedConf, + confUpdate); + } + } + } + } + + private void addQueues( + AddQueueInfo addInfo, String queuePrefix, + CapacitySchedulerConfiguration proposedConf, + Map confUpdate) { + if (addInfo == null) { + return; + } else { + String[] parts = addInfo.getQueue().split("\\."); + StringBuilder parentQueuePath = new StringBuilder(queuePrefix); + for (String queuePart : parts) { + if (queuePart.equals(CapacitySchedulerConfiguration.ROOT)) { + parentQueuePath = new StringBuilder( + CapacitySchedulerConfiguration.ROOT); + continue; + } + boolean foundQueue = false; + String[] subqueues = proposedConf.getQueues( + parentQueuePath.toString()); + if (subqueues != null) { + for (String queue : subqueues) { + if (queue.equals(queuePart)) { + foundQueue = true; + break; + } + } + } + if (!foundQueue) { + List siblingQueues = subqueues == null ? new ArrayList<>() : + new ArrayList<>(Arrays.asList(subqueues)); + siblingQueues.add(queuePart); + proposedConf.setQueues(parentQueuePath.toString(), + siblingQueues.toArray(new String[0])); + confUpdate.put(CapacitySchedulerConfiguration.PREFIX + + parentQueuePath.toString() + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES, + Joiner.on(',').join(siblingQueues)); + } + parentQueuePath.append(".").append(queuePart); + } + String keyPrefix = CapacitySchedulerConfiguration.PREFIX + + (queuePrefix.isEmpty() ? "" : queuePrefix + ".") + + addInfo.getQueue() + CapacitySchedulerConfiguration.DOT; + for (Map.Entry kv : addInfo.getParams().entrySet()) { + if (kv.getValue() == null) { + proposedConf.unset(keyPrefix + kv.getKey()); + } else { + proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); + } + confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); + } + for (AddQueueInfo childAddInfo : addInfo.getAddQueueInfo()) { + addQueues(childAddInfo, (queuePrefix.isEmpty() ? "" : queuePrefix + + ".") + addInfo.getQueue(), proposedConf, confUpdate); + } + } + } + + private void updateQueues(UpdateQueueInfo updateInfo, + String queuePrefix, CapacitySchedulerConfiguration proposedConf, + Map confUpdate) { + if (updateInfo == null) { + return; + } else { + String queuePath = (queuePrefix.isEmpty() ? "" : queuePrefix + ".") + + updateInfo.getQueue(); + String keyPrefix = CapacitySchedulerConfiguration.PREFIX + + queuePath + CapacitySchedulerConfiguration.DOT; + for (Map.Entry kv : updateInfo.getParams().entrySet()) { + if (kv.getValue() == null) { + proposedConf.unset(keyPrefix + kv.getKey()); + } else { + proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); + } + confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); + } + for (UpdateQueueInfo childUpdateInfo : updateInfo + .getUpdateNestedQueues()) { + updateQueues(childUpdateInfo, (queuePrefix.isEmpty() ? "" : + queuePrefix + ".") + updateInfo.getQueue(), proposedConf, + confUpdate); + } + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AddQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AddQueueInfo.java new file mode 100644 index 0000000..f6ee330 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AddQueueInfo.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Information for adding a queue to scheduler configuration (including params + * for this queue). + */ +@XmlRootElement(name = "add") +@XmlAccessorType(XmlAccessType.FIELD) +public class AddQueueInfo { + + @XmlElement(name = "queueName") + private String queue; + + private HashMap params = new HashMap<>(); + + @XmlElement(name = "add") + private ArrayList addQueueInfo = new ArrayList<>(); + + public AddQueueInfo() { } + + public AddQueueInfo(String queue, Map params, + List childQueueInfo) { + this.queue = queue; + this.params = new HashMap<>(params); + this.addQueueInfo = new ArrayList<>(childQueueInfo); + } + + public String getQueue() { + return this.queue; + } + + public HashMap getParams() { + return this.params; + } + + public ArrayList getAddQueueInfo() { + return this.addQueueInfo; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RemoveQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RemoveQueueInfo.java new file mode 100644 index 0000000..1b09cc5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/RemoveQueueInfo.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; +import java.util.List; + +/** + * Information for removing a queue from scheduler configuration. + */ +@XmlRootElement(name = "remove") +@XmlAccessorType(XmlAccessType.FIELD) +public class RemoveQueueInfo { + + @XmlElement(name = "queueName") + private String queue; + + @XmlElement(name = "remove") + private ArrayList removeQueueInfo = new ArrayList<>(); + + public RemoveQueueInfo() { + } + + public RemoveQueueInfo(String queue, List removeQueueInfo) { + this.queue = queue; + this.removeQueueInfo = new ArrayList<>(removeQueueInfo); + } + + public String getQueue() { + return this.queue; + } + + public ArrayList getRemoveQueueInfo() { + return this.removeQueueInfo; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerConfigurationInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerConfigurationInfo.java new file mode 100644 index 0000000..9d892ed --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerConfigurationInfo.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import java.util.ArrayList; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Information for making scheduler configuration changes (supports adding, + * removing, or updating a queue). + */ +@XmlRootElement(name = "schedConf") +@XmlAccessorType(XmlAccessType.FIELD) +public class SchedulerConfigurationInfo { + + @XmlElement(name = "add") + private ArrayList addQueueInfo = new ArrayList<>(); + + @XmlElement(name = "remove") + private ArrayList removeQueueInfo = new ArrayList<>(); + + @XmlElement(name = "update") + private ArrayList updateQueueInfo = new ArrayList<>(); + + public SchedulerConfigurationInfo() { + // JAXB needs this + } + + public ArrayList getAddQueueInfo() { + return addQueueInfo; + } + + public ArrayList getRemoveQueueInfo() { + return removeQueueInfo; + } + + public ArrayList getUpdateQueueInfo() { + return updateQueueInfo; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/UpdateQueueInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/UpdateQueueInfo.java new file mode 100644 index 0000000..9f90a11 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/UpdateQueueInfo.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Information for updating the params of a queue in scheduler configuration. + */ +@XmlRootElement(name = "update") +@XmlAccessorType(XmlAccessType.FIELD) +public class UpdateQueueInfo { + + @XmlElement(name = "queueName") + private String queue; + + private HashMap params = new HashMap<>(); + + @XmlElement(name = "update") + private ArrayList updateQueueInfo = new ArrayList<>(); + + public UpdateQueueInfo() { + } + + public UpdateQueueInfo(String queue, Map params, + List updateQueueInfo) { + this.queue = queue; + this.params = new HashMap<>(params); + this.updateQueueInfo = new ArrayList<>(updateQueueInfo); + } + + public String getQueue() { + return this.queue; + } + + public HashMap getParams() { + return this.params; + } + + public ArrayList getUpdateNestedQueues() { + return this.updateQueueInfo; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index 3f103b1..254da31 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -77,7 +77,11 @@ public void testInMemoryBackedProvider() throws IOException { assertNull(confProvider.loadConfiguration(conf).get("badKey")); doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class), any(RMContext.class)); - confProvider.mutateConfiguration(TEST_USER, badUpdate); + try { + confProvider.mutateConfiguration(TEST_USER, badUpdate); + } catch (IOException e) { + // Expected exception. + } assertNull(confProvider.loadConfiguration(conf).get("badKey")); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java new file mode 100644 index 0000000..1ff1198 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesConfigurationMutation.java @@ -0,0 +1,469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import com.google.inject.Guice; +import com.google.inject.servlet.ServletModule; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONMarshaller; +import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; +import com.sun.jersey.test.framework.WebAppDescriptor; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AddQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RemoveQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerConfigurationInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.UpdateQueueInfo; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.GuiceServletConfig; +import org.apache.hadoop.yarn.webapp.JerseyTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Test scheduler configuration mutation via REST API. + */ +public class TestRMWebServicesConfigurationMutation extends JerseyTestBase { + + private static MockRM rm; + private static String userName; + private static CapacitySchedulerConfiguration csConf; + private static YarnConfiguration conf; + + private static final List EMPTY_ADD_LIST = + Collections.emptyList(); + + private static class WebServletModule extends ServletModule { + @Override + protected void configureServlets() { + bind(JAXBContextResolver.class); + bind(RMWebServices.class); + bind(GenericExceptionHandler.class); + try { + userName = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException ioe) { + throw new RuntimeException("Unable to get current user name " + + ioe.getMessage(), ioe); + } + csConf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER, + CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER); + conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName); + // Add csConf as a resource to allow child queues of root to change + conf.addResource(csConf); + rm = new MockRM(conf); + bind(ResourceManager.class).toInstance(rm); + serve("/*").with(GuiceContainer.class); + filter("/*").through(TestRMWebServicesAppsModification + .TestRMCustomAuthFilter.class); + } + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule())); + } + + private static void setupQueueConfiguration( + CapacitySchedulerConfiguration config) { + config.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b", "c"}); + + final String a = CapacitySchedulerConfiguration.ROOT + ".a"; + config.setCapacity(a, 25f); + config.setMaximumCapacity(a, 50f); + + final String a1 = a + ".a1"; + final String a2 = a + ".a2"; + config.setQueues(a, new String[]{"a1", "a2"}); + config.setCapacity(a1, 100f); + config.setCapacity(a2, 0f); + + final String b = CapacitySchedulerConfiguration.ROOT + ".b"; + config.setCapacity(b, 75f); + + final String c = CapacitySchedulerConfiguration.ROOT + ".c"; + config.setCapacity(c, 0f); + + final String c1 = c + ".c1"; + config.setQueues(c, new String[] {"c1"}); + config.setCapacity(c1, 0f); + } + + public TestRMWebServicesConfigurationMutation() { + super(new WebAppDescriptor.Builder( + "org.apache.hadoop.yarn.server.resourcemanager.webapp") + .contextListenerClass(GuiceServletConfig.class) + .filterClass(com.google.inject.servlet.GuiceFilter.class) + .contextPath("jersey-guice-filter").servletPath("/").build()); + } + + @Test + public void testAddNestedQueue() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Add parent queue root.d with two children d1 and d2. + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + Map fullCapacity = new HashMap<>(); + fullCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100"); + fullCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "100"); + Map nearEmptyCapacity = new HashMap<>(); + nearEmptyCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "1E-4"); + nearEmptyCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, + "1E-4"); + Map emptyCapacity = new HashMap<>(); + emptyCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "0"); + emptyCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "0"); + AddQueueInfo d1 = new AddQueueInfo("d1", fullCapacity, EMPTY_ADD_LIST); + AddQueueInfo d2 = new AddQueueInfo("d2", emptyCapacity, EMPTY_ADD_LIST); + AddQueueInfo d = new AddQueueInfo("d", nearEmptyCapacity, + Arrays.asList(d1, d2)); + AddQueueInfo root = new AddQueueInfo("root", Collections.emptyMap(), + Arrays.asList(d)); + scInfo.getAddQueueInfo().add(root); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(4, newCSConf.getQueues("root").length); + assertEquals(2, newCSConf.getQueues("root.d").length); + } + + @Test + public void testAddWithUpdate() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Add root.d with capacity 25, reducing root.b capacity from 75 to 50. + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + Map dCapacity = new HashMap<>(); + dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25"); + Map bCapacity = new HashMap<>(); + bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "50"); + AddQueueInfo d = new AddQueueInfo("root.d", dCapacity, + Collections.emptyList()); + UpdateQueueInfo b = new UpdateQueueInfo("root.b", bCapacity, + Collections.emptyList()); + scInfo.getAddQueueInfo().add(d); + scInfo.getUpdateQueueInfo().add(b); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(4, newCSConf.getQueues("root").length); + assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d"), 0.01f); + assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f); + } + + @Test + public void testRemoveQueue() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + stopQueue("root.a.a2"); + // Remove root.a.a2 + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + RemoveQueueInfo a2Info = new RemoveQueueInfo("a2", + Collections.emptyList()); + RemoveQueueInfo aInfo = new RemoveQueueInfo("a", Arrays.asList(a2Info)); + RemoveQueueInfo rootInfo = + new RemoveQueueInfo("root", Arrays.asList(aInfo)); + scInfo.getRemoveQueueInfo().add(rootInfo); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(1, newCSConf.getQueues("root.a").length); + assertEquals("a1", newCSConf.getQueues("root.a")[0]); + } + + @Test + public void testRemoveQueueFullPath() throws Exception{ + WebResource r = resource(); + + ClientResponse response; + + stopQueue("root.a.a2"); + // Remove root.a.a2 + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + RemoveQueueInfo a2 = new RemoveQueueInfo("root.a.a2", + Collections.emptyList()); + scInfo.getRemoveQueueInfo().add(a2); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(1, newCSConf.getQueues("root.a").length); + assertEquals("a1", newCSConf.getQueues("root.a")[0]); + } + + @Test + public void testRemoveParentQueue() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + stopQueue("root.c", "root.c.c1"); + // Remove root.c (parent queue) + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + RemoveQueueInfo c = new RemoveQueueInfo("root.c", + Collections.emptyList()); + scInfo.getRemoveQueueInfo().add(c); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(2, newCSConf.getQueues("root").length); + } + + @Test + public void testRemoveParentQueueWithCapacity() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + stopQueue("root.a", "root.a.a1", "root.a.a2"); + // Remove root.a (parent queue) with capacity 25 + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + RemoveQueueInfo a = new RemoveQueueInfo("root.a", + Collections.emptyList()); + scInfo.getRemoveQueueInfo().add(a); + Map bCapacity = new HashMap<>(); + bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100"); + UpdateQueueInfo b = new UpdateQueueInfo("root.b", bCapacity, + Collections.emptyList()); + scInfo.getUpdateQueueInfo().add(b); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(2, newCSConf.getQueues("root").length); + assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), + 0.01f); + } + + private void stopQueue(String... queuePaths) throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Set state of queues to STOPPED. + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + Map stoppedParam = new HashMap<>(); + stoppedParam.put(CapacitySchedulerConfiguration.STATE, + QueueState.STOPPED.toString()); + for (String queue : queuePaths) { + UpdateQueueInfo stoppedInfo = new UpdateQueueInfo(queue, stoppedParam, + Collections.emptyList()); + scInfo.getUpdateQueueInfo().add(stoppedInfo); + } + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + for (String queue : queuePaths) { + assertEquals(QueueState.STOPPED, newCSConf.getState(queue)); + } + } + + @Test + public void testUpdateQueue() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Update config value. + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + Map updateParam = new HashMap<>(); + updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX, + "0.2"); + UpdateQueueInfo updateInfo = new UpdateQueueInfo("root.a", updateParam, + Collections.emptyList()); + scInfo.getUpdateQueueInfo().add(updateInfo); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + assertEquals(CapacitySchedulerConfiguration + .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, + cs.getConfiguration() + .getMaximumApplicationMasterResourcePerQueuePercent("root.a"), + 0.001f); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = cs.getConfiguration(); + assertEquals(0.2f, newCSConf + .getMaximumApplicationMasterResourcePerQueuePercent("root.a"), 0.001f); + + // Remove config. Config value should be reverted to default. + updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX, + null); + updateInfo = new UpdateQueueInfo("root.a", updateParam, + Collections.emptyList()); + scInfo.getUpdateQueueInfo().clear(); + scInfo.getUpdateQueueInfo().add(updateInfo); + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + newCSConf = cs.getConfiguration(); + assertEquals(CapacitySchedulerConfiguration + .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, newCSConf + .getMaximumApplicationMasterResourcePerQueuePercent("root.a"), + 0.001f); + } + + @Test + public void testUpdateQueueCapacity() throws Exception { + WebResource r = resource(); + + ClientResponse response; + + // Update root.a and root.b capacity to 50. + SchedulerConfigurationInfo scInfo = new SchedulerConfigurationInfo(); + Map updateParam = new HashMap<>(); + updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50"); + UpdateQueueInfo aUpdateInfo = new UpdateQueueInfo("root.a", updateParam, + Collections.emptyList()); + UpdateQueueInfo bUpdateInfo = new UpdateQueueInfo("root.b", updateParam, + Collections.emptyList()); + scInfo.getUpdateQueueInfo().add(aUpdateInfo); + scInfo.getUpdateQueueInfo().add(bUpdateInfo); + + response = + r.path("ws").path("v1").path("cluster") + .path("scheduler").path("conf").queryParam("user.name", userName) + .accept(MediaType.APPLICATION_JSON) + .entity(toJson(scInfo, SchedulerConfigurationInfo.class), + MediaType.APPLICATION_JSON) + .put(ClientResponse.class); + assertEquals(Status.OK.getStatusCode(), response.getStatus()); + CapacitySchedulerConfiguration newCSConf = + ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); + assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.a"), 0.01f); + assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f); + } + + @Override + @After + public void tearDown() throws Exception { + if (rm != null) { + rm.stop(); + } + super.tearDown(); + } + + @SuppressWarnings("rawtypes") + private String toJson(Object nsli, Class klass) throws Exception { + StringWriter sw = new StringWriter(); + JSONJAXBContext ctx = new JSONJAXBContext(klass); + JSONMarshaller jm = ctx.createJSONMarshaller(); + jm.marshallToJSON(nsli, sw); + return sw.toString(); + } +}