diff --git hadoop-tools/hadoop-sls/pom.xml hadoop-tools/hadoop-sls/pom.xml
index a42f8decd8d..4fc056d07d1 100644
--- hadoop-tools/hadoop-sls/pom.xml
+++ hadoop-tools/hadoop-sls/pom.xml
@@ -73,6 +73,10 @@
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-router</artifactId>
+    </dependency>
@@ -123,6 +127,8 @@
             <exclude>src/main/data/2jobs2min-rumen-jh.json</exclude>
+            <exclude>src/main/data/nodes.csv</exclude>
+            <exclude>src/main/data/subclusters.json</exclude>
             <exclude>src/main/html/js/thirdparty/jquery.js</exclude>
             <exclude>src/main/html/js/thirdparty/d3-LICENSE</exclude>
             <exclude>src/main/html/js/thirdparty/d3.v3.js</exclude>
@@ -131,6 +137,7 @@
             <exclude>src/main/html/track.html.template</exclude>
             <exclude>src/test/resources/simulate.html.template</exclude>
             <exclude>src/test/resources/simulate.info.html.template</exclude>
+            <exclude>src/test/resources/subclusters.json</exclude>
             <exclude>src/test/resources/track.html.template</exclude>
             <exclude>src/test/resources/syn.json</exclude>
             <exclude>src/test/resources/syn_generic.json</exclude>
diff --git hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
index 5a9de89ae69..fa442ef1c73 100644
--- hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
+++ hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
@@ -21,6 +21,7 @@ function hadoop_usage()
   echo "                 (deprecated --input-rumen=<FILE1,FILE2,...> | --input-sls=<FILE1,FILE2,...>)"
   echo "                 --output-dir=<SLS_SIMULATION_OUTPUT_DIRECTORY>"
   echo "                 [--nodes=<SLS_NODES_FILE>]"
+  echo "                 [--subclusters=<SLS_SUBCLUSTERS_FILE>]"
   echo "                 [--track-jobs=<JOBID1,JOBID2,...>]"
   echo "                 [--print-simulation]"
 }
@@ -53,6 +54,9 @@ function parse_args()
       --print-simulation)
         printsimulation="true"
       ;;
+      --subclusters=*)
+        subclusters=${i#*=}
+      ;;
       *)
         hadoop_error "ERROR: Invalid option ${i}"
         hadoop_exit_with_usage 1
@@ -110,6 +114,10 @@ function run_simulation() {
     hadoop_add_param args -printsimulation "-printsimulation"
   fi
 
+  if [[ -n "${subclusters}" ]] ; then
+    hadoop_add_param args -subclusters "-subclusters ${subclusters}"
+  fi
+
  hadoop_add_client_opts

  hadoop_finalize
diff --git hadoop-tools/hadoop-sls/src/main/data/nodes.csv hadoop-tools/hadoop-sls/src/main/data/nodes.csv
new file mode 100644
index 00000000000..621163dfa87
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/main/data/nodes.csv
@@ -0,0 +1,23 @@
+node7, cluster123, default-rack1
+node0, cluster123, default-rack1
+node480, cluster124, default-rack13
+node481, cluster124, default-rack13
+node482, cluster124, default-rack13
+node483, cluster124, default-rack13
+node484, cluster124, default-rack13
+node485, cluster124, default-rack13
+node486, cluster124, default-rack13
+node487, cluster124, default-rack13
+node488, cluster124, default-rack13
+node489, cluster124, default-rack13
+node490, cluster124, default-rack13
+node491, cluster124, default-rack13
+node492, cluster124, default-rack13
+node493, cluster124, default-rack13
+node494, cluster124, default-rack13
+node495, cluster124, default-rack13
+node496, cluster124, default-rack13
+node497, cluster124, default-rack13
+node498, cluster124, default-rack13
+node499, cluster124, default-rack13
+node500, cluster124, default-rack13
diff --git hadoop-tools/hadoop-sls/src/main/data/subclusters.json hadoop-tools/hadoop-sls/src/main/data/subclusters.json
new file mode 100644
index 00000000000..5840e0d043c
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/main/data/subclusters.json
@@ -0,0 +1,75 @@
+{
+  "subclusters": {
+    "entry": {
+      "key": "cluster123",
+      "value": {
+        "nodes": [
+          "node7",
+          "node0"
+        ],
+        "configs": {
+          "entry": {
+            "key": "yarn.resourcemanager.epoch",
+            "value": "1329"
+          }
+        }
+      }
+    },
+    "entry": {
+      "key": "cluster124",
+      "value": {
+        "nodes": [
+          "node480",
+          "node481",
+          "node482",
+          "node483",
+          "node484",
+          "node485",
+          "node486",
+          "node487",
+          "node488",
+          "node489",
+          "node490",
+          "node491",
+          "node492",
+          "node493",
+          "node494",
+          "node495",
+          "node496",
+          "node497",
+          "node498",
+          "node499",
+          "node500"
+        ],
+        "configs": {
+          "entry": {
+            "key": "yarn.resourcemanager.scheduler.address",
+            "value": "localhost:9030"
+          },
+          "entry": {
+            "key": "yarn.resourcemanager.resource-tracker.address",
+            "value": "localhost:9031"
+          },
+          "entry": {
+            "key": "yarn.resourcemanager.admin.address",
+            "value": "localhost:9033"
+          },
+          "entry": {
+            "key": "yarn.resourcemanager.address",
+            "value": "localhost:9032"
+          },
+          "entry": {
+            "key": "yarn.resourcemanager.webapp.address",
+            "value": "localhost:9088"
+          },
+          "entry": {
+            "key": "yarn.resourcemanager.epoch",
+            "value": "23"
+          }
+        }
+      }
+    }
+  }
+}
+
+
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ClusterInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ClusterInfo.java
new file mode 100644
index 00000000000..6ba9955e2d4
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ClusterInfo.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subcluster topology of a federated SLS run, parsed from the JSON file
+ * passed through the -subclusters option.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@XmlRootElement(name = "federation-sls")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ClusterInfo {
+  private Map<String, SubClusterInfo> subclusters = new HashMap<>();
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ClusterInfo.class);
+  private static JSONJAXBContext jsonjaxbContext = initContext();
+
+  public ClusterInfo() {
+  }
+
+  public Map<String, SubClusterInfo> getSubclusters() {
+    return subclusters;
+  }
+
+  public void setSubclusters(Map<String, SubClusterInfo> subclusters) {
+    this.subclusters = subclusters;
+  }
+
+  private static JSONJAXBContext initContext() {
+    try {
+      return new JSONJAXBContext(JSONConfiguration.DEFAULT,
+          ClusterInfo.class);
+    } catch (JAXBException e) {
+      LOG.error("Couldn't initialize the JSON JAXB context.", e);
+    }
+    return null;
+  }
+
+  public static ClusterInfo fromFile(File f) throws Exception {
+    if (jsonjaxbContext == null) {
+      throw new Exception("Didn't initialize JsonJAXBContext");
+    }
+    JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
+    FileInputStream in = new FileInputStream(f);
+    ClusterInfo clusterInfo = null;
+    try {
+      clusterInfo = unmarshaller.unmarshalFromJSON(in, ClusterInfo.class);
+    } finally {
+      in.close();
+    }
+    return clusterInfo;
+  }
+
+  public static ClusterInfo fromString(String json) throws Exception {
+    if (jsonjaxbContext == null) {
+      throw new Exception("Didn't initialize JsonJAXBContext");
+    }
+    JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
+    return unmarshaller.unmarshalFromJSON(new StringReader(json),
+        ClusterInfo.class);
+  }
+}
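A note on the JSON shape used by the subclusters.json files and parsed by ClusterInfo above: the repeated "entry" keys are not plain JSON maps but appear to be the Jettison mapped notation that Jersey's JSONJAXBContext (JSONConfiguration.DEFAULT) produces for a JAXB Map field — each "entry" object becomes one map entry. A minimal sketch of how the file is meant to be consumed, mirroring what SLSRunner.loadRMs does later in this patch (the path is illustrative, and java.io.File / java.util.Map imports are assumed):

    // Parse the -subclusters file; each "entry" becomes one subcluster.
    ClusterInfo cluster =
        ClusterInfo.fromFile(new File("src/main/data/subclusters.json"));
    for (Map.Entry<String, SubClusterInfo> e
        : cluster.getSubclusters().entrySet()) {
      // The key is the subcluster id; the value carries its nodes and
      // per-RM configuration overrides.
      System.out.println(e.getKey() + " -> " + e.getValue().getNodes());
    }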
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index e85973258a0..3aa044a61d2 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -22,6 +22,8 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.lang.reflect.InvocationTargetException;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -29,6 +31,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.Collections;
@@ -58,14 +61,30 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
@@ -73,6 +92,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.router.Router;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
@@ -123,7 +143,7 @@
   private long maxRuntime;

   private final static Map<String, Object> simulateInfoMap =
-      new HashMap<String, Object>();
+      new HashMap<>();

   // logger
   public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
@@ -133,6 +153,20 @@

   private static boolean exitAtTheFinish = false;

+  private static final String DEFAULT_RM_KEY = "DEFAULT_RM";
+  private ApplicationMasterProtocol rmProxy;
+
+  private NMContext nmSimContext;
+  private AsyncDispatcher dispatcher;
+  private Map<String, String> nodeToSubCluster;
+  private Map<String, ResourceManager> subIdToRM;
+
+  private boolean isFederated = false;
+  private ApplicationClientProtocol clientRM;
+  private Router router;
+
+  private String subClusterFile;
+
   /**
    * The type of trace in input.
    */
@@ -167,6 +201,8 @@ private void init(Configuration tempConf) throws ClassNotFoundException {
     queueAppNumMap = new HashMap<>();
     amMap = new ConcurrentHashMap<>();
     amClassMap = new HashMap<>();
+    nodeToSubCluster = new HashMap<>();
+    subIdToRM = new HashMap<>();

     // runner configuration
     setConf(tempConf);
@@ -217,31 +253,50 @@ private Resource getNodeManagerResource() {
   }

   public void setSimulationParams(TraceType inType, String[] inTraces,
-      String nodes, String outDir, Set<String> trackApps,
-      boolean printsimulation) throws IOException, ClassNotFoundException {
+      String nodes, String clusters, String outDir, Set<String> trackApps,
+      boolean printsimulation) {

     this.inputType = inType;
     this.inputTraces = inTraces.clone();
     this.nodeFile = nodes;
     this.trackedApps = trackApps;
+    this.subClusterFile = clusters;
     this.printSimulation = printsimulation;
     metricsOutputDir = outDir;
-
+    this.isFederated = HAUtil.isFederationEnabled(getConf());
+    if (isFederated && subClusterFile == null) {
+      String errorMsg =
+          "Federation is enabled but no subclusters file is given.";
+      LOG.error(errorMsg);
+      printUsage();
+      throw new RuntimeException(errorMsg);
+    }
   }

-  public void start() throws IOException, ClassNotFoundException, YarnException,
-      InterruptedException {
+  public void start() throws Exception {
     // start resource manager
     startRM();
+    // start the AMRMProxy and the Router when federation is enabled
+    this.nmSimContext = initializeNMSimulatorContext();
+    if (!isFederated) {
+      this.rmProxy = getDefaultRM().getApplicationMasterService();
+      this.clientRM = getDefaultRM().getClientRMService();
+    } else {
+      startAMRMProxy();
+      startRouter();
+      this.clientRM = router.getClientRMService();
+    }
     // start node managers
     startNM();
     // start application masters
     startAM();
     // set queue & tracked apps information
-    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-        .setQueueSet(this.queueAppNumMap.keySet());
-    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
-        .setTrackedAppSet(this.trackedApps);
+    for (Entry<String, ResourceManager> entry : subIdToRM.entrySet()) {
+      ((SchedulerWrapper) entry.getValue().getResourceScheduler()).getTracker()
+          .setQueueSet(this.queueAppNumMap.keySet());
+      ((SchedulerWrapper) entry.getValue().getResourceScheduler()).getTracker()
+          .setTrackedAppSet(this.trackedApps);
+    }
     // print out simulation info
     printSimulationInfo();
     // blocked until all nodes RUNNING
@@ -250,8 +305,131 @@ public void start() throws IOException, ClassNotFoundException, YarnException,
     runner.start();
   }

-  private void startRM() throws ClassNotFoundException, YarnException {
-    Configuration rmConf = new YarnConfiguration(getConf());
+  private NMContext initializeNMSimulatorContext() {
+    NMContext context = null;
+    try {
+      context = createNMSimContext();
+      this.nmSimContext = context;
+      context.setNodeId(NodeId.newInstance("localhost", 0));
+    } catch (Exception e) {
+      LOG.error("Error creating NMSimulator context", e);
+    }
+    return context;
+  }
+
+  private NMContext createNMSimContext() {
+    NMStateStoreService nmStore = new NMNullStateStoreService();
+    nmStore.init(getConf());
+    NMTokenSecretManagerInNM nmTokenSecretManager = new NMTokenSecretManagerInNM();
+    NMContainerTokenSecretManager nmContainerTokenSecretManager =
+        new NMContainerTokenSecretManager(getConf(), nmStore);
+    boolean isDistSchedulingEnabled =
+        getConf().getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+            YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
+    return new NMContext(nmContainerTokenSecretManager, nmTokenSecretManager,
+        null, null, nmStore, isDistSchedulingEnabled, getConf());
+  }
+
+  public ApplicationClientProtocol getClientRM() {
+    return clientRM;
+  }
+
+  public ApplicationMasterProtocol getRMProxy() {
+    return rmProxy;
+  }
+
+  public ResourceManager getRMFromAppId(ApplicationId appId)
+      throws YarnException {
+    if (!isFederated) {
+      return getDefaultRM();
+    }
+    GetApplicationHomeSubClusterRequest request =
+        GetApplicationHomeSubClusterRequest.newInstance(appId);
+    GetApplicationHomeSubClusterResponse resp =
+        FederationStateStoreFacade.getInstance().getStateStoreClient()
+            .getApplicationHomeSubCluster(request);
+    return getRMFromClusterId(
+        resp.getApplicationHomeSubCluster().getHomeSubCluster().toString());
+  }
+
+  public ResourceManager getRMFromClusterId(String clusterId) {
+    return subIdToRM.get(clusterId);
+  }
+
+  private void startAMRMProxy() {
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    this.dispatcher = dispatcher;
+    AMRMProxyService amrmproxy =
+        new AMRMProxyService(this.nmSimContext, dispatcher);
+    amrmproxy.init(getConf());
+    amrmproxy.start();
+    this.rmProxy = amrmproxy;
+  }
+
+  private void startRouter() {
+    router = new Router(false);
+    router.init(getConf());
+    router.start();
+  }
+
+  private void startRM() throws Exception {
+    Configuration rmConf = getConf();
+    if (!isFederated) {
+      subIdToRM.put(DEFAULT_RM_KEY, startAnRM(rmConf));
+      return;
+    }
+
+    FederationStateStoreFacade facade =
+        FederationStateStoreFacade.getInstance();
+    facade.getStateStoreClient().init(getConf());
+    facade.reinitialize(facade.getStateStoreClient(), getConf());
+    loadRMs(rmConf);
+  }
+
+  private void loadRMs(Configuration conf) throws Exception {
+    String file = subClusterFile;
+    ClusterInfo cluster = ClusterInfo.fromFile(new File(file));
+    Map<String, SubClusterInfo> map = cluster.getSubclusters();
+    for (Entry<String, SubClusterInfo> entry : map.entrySet()) {
+      String subId = entry.getKey();
+      SubClusterInfo sub = entry.getValue();
+      List<String> nodes = sub.getNodes();
+      for (String n : nodes) {
+        nodeToSubCluster.put(n, subId);
+      }
+      Configuration configuration = new Configuration(conf);
+      for (Map.Entry<String, String> confStr : sub.getConfigs().entrySet()) {
+        configuration.set(confStr.getKey(), confStr.getValue());
+      }
+      configuration.set(YarnConfiguration.RM_CLUSTER_ID, subId);
+      ResourceManager rm = startAnRM(configuration);
+      subIdToRM.put(subId, rm);
+      // Wait needed in order to make sure cluster timestamps are different
+      Thread.sleep(10);
+    }
+    waitForRMsActive();
+  }
+
+  private void waitForRMsActive() throws YarnException, InterruptedException {
+    FederationStateStoreFacade stateStore =
+        FederationStateStoreFacade.getInstance();
+    for (String subId : subIdToRM.keySet()) {
+      if (subId.equals(DEFAULT_RM_KEY)) {
+        continue;
+      }
+      LOG.info(MessageFormat.format("Waiting for {0}", subId));
+      while (!stateStore.getSubCluster(SubClusterId.newInstance(subId))
+          .getState().isActive()) {
+        Thread.sleep(getConf().getInt(
+            YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
+            YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS));
+      }
+      LOG.info(MessageFormat.format("{0} up and active", subId));
+    }
+  }
+
+  private ResourceManager startAnRM(Configuration rmConf)
+      throws ClassNotFoundException, YarnException {
     String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);

     if (Class.forName(schedulerClass) == CapacityScheduler.class) {
@@ -285,7 +464,17 @@ protected ApplicationMasterLauncher createAMLauncher() {

     // Init and start the actual ResourceManager
     rm.init(rmConf);
+    if (isFederated) {
+      try {
+        rm.getFederationStateStoreService().reinitialize(
+            FederationStateStoreFacade.getInstance().getStateStoreClient(),
+            rmConf);
+      } catch (Exception e) {
+        LOG.error("Couldn't reinitialize RM StateStoreService", e);
+      }
+    }
     rm.start();
+    return rm;
   }

   private void startNM() throws YarnException, IOException,
@@ -393,6 +582,23 @@ private void waitForNodesRunning() throws InterruptedException {
         System.currentTimeMillis() - startTimeMS);
   }

+  public boolean isFederated() {
+    return isFederated;
+  }
+
+  public AsyncDispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  public NMContext getNmSimContext() {
+    return nmSimContext;
+  }
+
+  // Only to be used in non-federated mode
+  public ResourceManager getDefaultRM() {
+    return subIdToRM.get(DEFAULT_RM_KEY);
+  }
+
   @SuppressWarnings("unchecked")
   private void startAM() throws YarnException, IOException {
     switch (inputType) {
@@ -588,6 +794,6 @@ private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
           createAMForJob(job, baselineTimeMS);
         } catch (Exception e) {
-          LOG.error("Failed to create an AM: {}", e.getMessage());
+          LOG.error("Failed to create an AM.", e);
         }

         job = reader.getNext();
@@ -876,12 +1083,27 @@ public static void decreaseRemainingApps() {
   }

   public void stop() throws InterruptedException {
-    rm.stop();
+    if (isFederated) {
+      for (Entry<String, ResourceManager> e : subIdToRM.entrySet()) {
+        e.getValue().getFederationStateStoreService()
+            .reinitialize(null, getConf());
+        e.getValue().stop();
+      }
+      try {
+        FederationStateStoreFacade.getInstance().getStateStoreClient().close();
+      } catch (Exception e) {
+        LOG.error("Couldn't close the federation state store client.", e);
+      }
+      ((AMRMProxyService) rmProxy).stop();
+      router.stop();
+    } else {
+      getDefaultRM().stop();
+    }
     runner.stop();
   }

-  public int run(final String[] argv) throws IOException, InterruptedException,
-      ParseException, ClassNotFoundException, YarnException {
+  public int run(final String[] argv) throws IOException, ParseException,
+      ClassNotFoundException, YarnException {

     Options options = new Options();
@@ -897,6 +1118,7 @@ public int run(final String[] argv) throws IOException, InterruptedException,
     options.addOption("output", true, "output directory");
     options.addOption("trackjobs", true,
         "jobs to be tracked during simulating");
+    
options.addOption("subclusters", true, "subcluster topology"); options.addOption("printsimulation", false, "print out simulation information"); @@ -939,6 +1161,8 @@ public int run(final String[] argv) throws IOException, InterruptedException, String tempNodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : ""; + String subclusters = + cmd.hasOption("subclusters") ? cmd.getOptionValue("subclusters") : null; TraceType tempTraceType = TraceType.SLS; switch (traceType) { @@ -959,10 +1183,14 @@ public int run(final String[] argv) throws IOException, InterruptedException, String[] inputFiles = traceLocation.split(","); - setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output, - trackedJobSet, cmd.hasOption("printsimulation")); + setSimulationParams(tempTraceType, inputFiles, tempNodeFile, subclusters, + output, trackedJobSet, cmd.hasOption("printsimulation")); - start(); + try { + start(); + } catch (Exception e) { + throw new YarnException("Failed to start SLS runner", e); + } return 0; } @@ -980,7 +1208,8 @@ static void printUsage() { "Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... " + "(deprecated alternative options --inputsls FILE, FILE,... " + " | --inputrumen FILE,FILE,...)" - + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " + + "-output FILE [-nodes FILE] [-subclusters FILE] " + + "[-trackjobs JobId,JobId...] " + "[-printsimulation]"); System.err.println(); } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SubClusterInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SubClusterInfo.java new file mode 100644 index 00000000000..3e1a216d7cf --- /dev/null +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SubClusterInfo.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.sls;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@XmlRootElement(name = "federation-sls")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterInfo {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SubClusterInfo.class);
+  private static JSONJAXBContext jsonjaxbContext = initContext();
+  private List<String> nodes = new LinkedList<>();
+  private Map<String, String> configs = new HashMap<>();
+
+  public List<String> getNodes() {
+    return nodes;
+  }
+
+  public void setNodes(List<String> nodes) {
+    this.nodes = nodes;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  public SubClusterInfo() {
+
+  }
+
+  private static JSONJAXBContext initContext() {
+    try {
+      return new JSONJAXBContext(JSONConfiguration.DEFAULT,
+          SubClusterInfo.class);
+    } catch (JAXBException e) {
+      LOG.error("Couldn't initialize the JSON JAXB context.", e);
+    }
+    return null;
+  }
+}
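SubClusterInfo pairs each subcluster's node list with free-form configuration overrides. A minimal sketch of how SLSRunner.loadRMs (earlier in this patch) layers those overrides onto a base Configuration before starting each per-subcluster ResourceManager; the subcluster id and override value are taken from the sample files:

    Configuration base = getConf();
    SubClusterInfo sub = cluster.getSubclusters().get("cluster123");
    Configuration scConf = new Configuration(base);
    for (Map.Entry<String, String> override : sub.getConfigs().entrySet()) {
      // e.g. yarn.resourcemanager.address=localhost:9032 for the second RM,
      // so the two RMs do not collide on ports
      scConf.set(override.getKey(), override.getValue());
    }
    // Each simulated RM gets its own cluster id.
    scConf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster123");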
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 8e1c256c636..1bf787b8c7a 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -20,7 +20,9 @@

 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,17 +32,24 @@

 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -50,15 +59,23 @@
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
@@ -114,6 +131,11 @@

   private ReservationSubmissionRequest reservationRequest;

+  private UserGroupInformation ugiUser;
+
+  private ApplicationMasterProtocol rmProxy;
+  private ApplicationClientProtocol clientRm;
+
   public AMSimulator() {
     this.responseQueue = new LinkedBlockingQueue<>();
   }
@@ -136,6 +158,51 @@ public void init(int heartbeatInterval,
     this.traceStartTimeMS = startTime;
     this.traceFinishTimeMS = finishTime;
     this.amContainerResource = amResource;
+    this.rmProxy =
+        se == null ? rm.getApplicationMasterService() : se.getRMProxy();
+    this.clientRm = se == null ? rm.getClientRMService() : se.getClientRM();
+  }
+
+  private StartContainerRequest initializeStartRequest(
+      ApplicationAttemptId appAttemptId, int id, Token<AMRMTokenIdentifier> token) {
+    ContainerTokenIdentifier cid =
+        createContainerTokenIdentifier(appAttemptId, id);
+    se.getNmSimContext().getContainerTokenSecretManager()
+        .startContainerSuccessful(cid);
+    ContainerLaunchContext amContext = BuilderUtils
+        .newContainerLaunchContext(new HashMap<String, LocalResource>(),
+            new HashMap<String, String>(), null,
+            new HashMap<String, ByteBuffer>(), null,
+            new HashMap<ApplicationAccessType, String>());
+    StartContainerRequest req = null;
+    try {
+      req = StartContainerRequest.newInstance(amContext, BuilderUtils
+          .newContainerToken(se.getNmSimContext().getNodeId(), new byte[1],
+              cid));
+      Credentials creds = YarnServerSecurityUtils
+          .parseCredentials(req.getContainerLaunchContext());
+      creds.addToken(token.getService(), token);
+      DataOutputBuffer dob = new DataOutputBuffer();
+      creds.writeTokenStorageToStream(dob);
+      req.getContainerLaunchContext()
+          .setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+    } catch (SecretManager.InvalidToken invalidToken) {
+      LOG.error("Invalid container token.", invalidToken);
+    } catch (IOException e) {
+      LOG.error("Couldn't set up the AM container credentials.", e);
+    }
+    return req;
+  }
+
+  private ContainerTokenIdentifier createContainerTokenIdentifier(
+      ApplicationAttemptId appAttemptId, int id) {
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(
+            ContainerId.newContainerId(appAttemptId, id), "473", user,
+            amContainerResource, System.currentTimeMillis() + 100000L, 0, 1234,
+            Priority.newInstance(0), 0, null, null,
+            ContainerType.APPLICATION_MASTER);
+    return containerTokenIdentifier;
   }

   /**
@@ -167,6 +234,26 @@ public synchronized void notifyAMContainerLaunched(Container masterContainer)
       throws Exception {
     this.amContainer = masterContainer;
     this.appAttemptId = masterContainer.getId().getApplicationAttemptId();
+    this.ugiUser =
+        UserGroupInformation.createRemoteUser(this.appAttemptId.toString());
+    if (se != null && se.isFederated()) {
+      rm = se.getRMFromAppId(appId);
+      LOG.info(MessageFormat.format("App {0} is running on {1}", appId,
+          YarnConfiguration.getClusterId(rm.getConfig())));
+      // preparing the AMRMProxy to start the application
+      AMRMProxyService amrmproxy = (AMRMProxyService) rmProxy;
+      Token<AMRMTokenIdentifier> token =
+          rm.getRMContext().getAMRMTokenSecretManager()
+              .createAndGetAMRMToken(appAttemptId);
+      StartContainerRequest initialStartReq =
+          initializeStartRequest(appAttemptId, appId.getId(),
+              token);
+      synchronized (amrmproxy.getConfig()) {
+        amrmproxy.getConfig().set(YarnConfiguration.RM_CLUSTER_ID,
+            YarnConfiguration.getClusterId(rm.getConfig()));
+        amrmproxy.processApplicationStartRequest(initialStartReq);
+      }
+    }
     registerAM();
     isAMContainerRunning = true;
   }
@@ -182,7 +269,7 @@ private ReservationId submitReservationWhenSpecified()
       ugi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public Object run() throws YarnException, IOException {
-          rm.getClientRMService().submitReservation(reservationRequest);
+          clientRm.submitReservation(reservationRequest);
           LOG.info("RESERVATION SUCCESSFULLY SUBMITTED "
               + reservationRequest.getReservationId());
           return null;
@@ -245,19 +332,22 @@ public void lastStep() throws Exception {
       ugi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public Object run() throws Exception {
-          rm.getApplicationMasterService()
-              .finishApplicationMaster(finishAMRequest);
+          rmProxy.finishApplicationMaster(finishAMRequest);
           return null;
         }
       });

+      if (se != null && se.isFederated()) {
+        se.getDispatcher().getEventHandler()
+            .handle(new ApplicationFinishEvent(appId, "Killed"));
+      }
       simulateFinishTimeMS = System.currentTimeMillis() - baselineTimeMS;
       // record job running information
       SchedulerMetrics schedulerMetrics =
           ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
       if (schedulerMetrics != null) {
         schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS,
             simulateStartTimeMS, simulateFinishTimeMS);
       }
     }
@@ -306,7 +396,7 @@ private void submitApp(ReservationId reservationId)
     GetNewApplicationRequest newAppRequest =
         Records.newRecord(GetNewApplicationRequest.class);
     GetNewApplicationResponse newAppResponse =
-        rm.getClientRMService().getNewApplication(newAppRequest);
+        clientRm.getNewApplication(newAppRequest);
     appId = newAppResponse.getApplicationId();

     // submit the application
@@ -337,7 +427,7 @@ private void submitApp(ReservationId reservationId)
       ugi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public Object run() throws YarnException, IOException {
-          rm.getClientRMService().submitApplication(subAppRequest);
+          clientRm.submitApplication(subAppRequest);
           return null;
         }
       });
@@ -345,28 +435,26 @@ public Object run() throws YarnException, IOException {
   }

   private void registerAM()
-      throws YarnException, IOException, InterruptedException {
+      throws IOException, InterruptedException {
     // register application master
     final RegisterApplicationMasterRequest amRegisterRequest =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     amRegisterRequest.setHost("localhost");
     amRegisterRequest.setRpcPort(1000);
     amRegisterRequest.setTrackingUrl("localhost:1000");

-    UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser(appAttemptId.toString());
     Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps().get(appId)
         .getRMAppAttempt(appAttemptId).getAMRMToken();
-    ugi.addTokenIdentifier(token.decodeIdentifier());
+    ugiUser.addTokenIdentifier(token.decodeIdentifier());

-    ugi.doAs(
-        new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
-          @Override
-          public RegisterApplicationMasterResponse run() throws Exception {
-            return rm.getApplicationMasterService()
-                .registerApplicationMaster(amRegisterRequest);
-          }
-        });
+    ugiUser.doAs(
+        new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+          @Override
+          public RegisterApplicationMasterResponse run()
+              throws Exception {
+            return rmProxy.registerApplicationMaster(amRegisterRequest);
+          }
+        });

     LOG.info("Register the application master for application {}", appId);
   }
@@ -390,6 +478,14 @@ public void untrackApp() {
     }
   }

+  public ApplicationMasterProtocol getRmProxy() {
+    return rmProxy;
+  }
+
+  public UserGroupInformation getUgiUser() {
+    return ugiUser;
+  }
+
   protected List<ResourceRequest> packageRequests(
       List<ContainerSimulator> csList, int priority) {
     // create requests
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 6f0f85ff904..8db6212042d 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -343,13 +343,13 @@ protected void sendContainerRequest()
           .get(appAttemptId.getApplicationId())
           .getRMAppAttempt(appAttemptId).getAMRMToken();
       ugi.addTokenIdentifier(token.decodeIdentifier());
-      AllocateResponse response = ugi.doAs(
-          new PrivilegedExceptionAction<AllocateResponse>() {
-            @Override
-            public AllocateResponse run() throws Exception {
-              return rm.getApplicationMasterService().allocate(request);
-            }
-          });
+      AllocateResponse response = getUgiUser().doAs(
+          new PrivilegedExceptionAction<AllocateResponse>() {
+            @Override
+            public AllocateResponse run() throws Exception {
+              return getRmProxy().allocate(request);
+            }
+          });
       if (response != null) {
         responseQueue.put(response);
       }
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index b41f5f20296..15d99d4ef69 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -233,11 +233,11 @@ protected void sendContainerRequest()
           .get(appAttemptId.getApplicationId())
           .getRMAppAttempt(appAttemptId).getAMRMToken();
       ugi.addTokenIdentifier(token.decodeIdentifier());
-      AllocateResponse response = ugi.doAs(
+      AllocateResponse response = getUgiUser().doAs(
           new PrivilegedExceptionAction<AllocateResponse>() {
             @Override
             public AllocateResponse run() throws Exception {
-              return rm.getApplicationMasterService().allocate(request);
+              return getRmProxy().allocate(request);
             }
           });
       if (response != null) {
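Before the sample federated configuration that follows, a sketch of how a federated run would be launched end to end with the new flag; the file names are illustrative, and yarn.federation.enabled must be set to true in the yarn-site.xml on the classpath (as in the sample file below):

    $ bin/slsrun.sh --tracetype=SLS --tracelocation=inputsls.json \
        --output-dir=/tmp/sls-federated --nodes=nodes.json \
        --subclusters=subclusters.json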
diff --git hadoop-tools/hadoop-sls/src/main/sample-conf/yarn-site-federated.xml hadoop-tools/hadoop-sls/src/main/sample-conf/yarn-site-federated.xml
new file mode 100644
index 00000000000..57dc9e97774
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/main/sample-conf/yarn-site-federated.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<configuration>
+
+  <property>
+    <name>yarn.resourcemanager.scheduler.class</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
+  </property>
+
+  <property>
+    <name>yarn.sls.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>The address of the RM web application.</description>
+    <name>yarn.resourcemanager.webapp.address</name>
+    <value>localhost:18088</value>
+  </property>
+
+  <property>
+    <name>yarn.resourcemanager.resource-tracker.address</name>
+    <value>localhost:18031</value>
+  </property>
+
+  <property>
+    <description>The address of the scheduler interface.</description>
+    <name>yarn.resourcemanager.scheduler.address</name>
+    <value>localhost:18030</value>
+  </property>
+
+  <property>
+    <description>The address of the applications manager interface in the
+      RM.</description>
+    <name>yarn.resourcemanager.address</name>
+    <value>localhost:18032</value>
+  </property>
+
+  <property>
+    <description>The address of the RM admin interface.</description>
+    <name>yarn.resourcemanager.admin.address</name>
+    <value>localhost:18033</value>
+  </property>
+
+  <property>
+    <description>Set to false, to avoid ip check</description>
+    <name>hadoop.security.token.service.use_ip</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.maximum-applications</name>
+    <value>1000</value>
+    <description>Maximum number of applications in the system which
+      can be concurrently active both running and pending</description>
+  </property>
+
+  <property>
+    <description>Whether to use preemption. Note that preemption is
+      experimental in the current version. Defaults to false.</description>
+    <name>yarn.scheduler.fair.preemption</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>Whether to allow multiple container assignments in one
+      heartbeat. Defaults to false.</description>
+    <name>yarn.scheduler.fair.assignmultiple</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>yarn.nodemanager.amrmproxy.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor</value>
+  </property>
+
+  <property>
+    <name>yarn.resourcemanager.cluster-id</name>
+    <value>cluster124</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.state-store.heartbeat-interval-secs</name>
+    <value>5</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.cache-ttl.secs</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.state-store.class</name>
+    <value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationGPGStateStore</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.machine-list</name>
+    <value>nodes.csv</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.policy-manager</name>
+    <value>org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.policy-manager-params</name>
+    <value>{"routerPolicyWeights":{"entry":{"key":{"id":"cluster124"},"value":"1.0"},"entry":{"key":{"id":"cluster123"},"value":"1.0"}},"amrmPolicyWeights":{"entry":{"key":{"id":"cluster124"},"value":"1.0"},"entry":{"key":{"id":"cluster123"},"value":"1.0"}},"headroomAlpha":"0.5"}</value>
+  </property>
+
+</configuration>
diff --git hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
index 9df49985507..4219d995328 100644
--- hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
+++ hadoop-tools/hadoop-sls/src/site/markdown/SchedulerLoadSimulator.md
@@ -187,6 +187,7 @@ The simulator supports two types of input files: the rumen traces and its own in
       (deprecated --input-rumen=<FILE1,FILE2,...> | --input-sls=<FILE1,FILE2,...>)
       --output-dir=<SLS_SIMULATION_OUTPUT_DIRECTORY>
       [--nodes=<SLS_NODES_FILE>]
+      [--subclusters=<SLS_SUBCLUSTERS_FILE>]
       [--track-jobs=<JOBID1,JOBID2,...>]
       [--print-simulation]

@@ -213,6 +214,11 @@ The simulator supports two types of input files: the rumen traces and its own in
    topology fetched from the input json files. Users can specifies a new topology
    by setting this parameter. Refer to the appendix for the topology file format.

+*   `--subclusters`: The federation subcluster topology. This argument is
+    required if federation is enabled in the `yarn-site.xml` file. It specifies
+    the topology of the subclusters that the simulation will use. Refer to the
+    appendix for the subcluster topology file format.
+
 *   `--track-jobs`: The particular jobs that will be tracked during simulator
     running, spearated by comma.

@@ -502,9 +508,9 @@ Here we provide an example format of the synthetic generator json file. We use
     }

-### Simulator input topology file format
+### Simulator node input topology file format

-Here is an example input topology file which has 3 nodes organized in 1 rack.
+Here is an example node input topology file which has 3 nodes organized in 1 rack.

     {
       "rack" : "default-rack",
@@ -517,6 +523,63 @@
       }]
     }

+### Simulator subcluster input topology file format
+
+Here is an example subcluster input topology file with 2 subclusters: two nodes
+in the first subcluster and one in the other. This file must be consistent with
+the node input topology file: every node listed here must also appear there,
+and vice versa. The per-subcluster configuration overrides also matter; the
+port overrides shown below for cluster2 are necessary to avoid port collisions
+between the multiple resource managers.
+
+    {
+      "subclusters": {
+        "entry": {
+          "key": "cluster1",
+          "value": {
+            "nodes": [
+              "node1",
+              "node2"
+            ],
+            "configs": {
+              "entry": {
+                "key": "yarn.resourcemanager.epoch",
+                "value": "1329"
+              }
+            }
+          }
+        },
+        "entry": {
+          "key": "cluster2",
+          "value": {
+            "nodes": [
+              "node3"
+            ],
+            "configs": {
+              "entry": {
+                "key": "yarn.resourcemanager.scheduler.address",
+                "value": "localhost:9030"
+              },
+              "entry": {
+                "key": "yarn.resourcemanager.resource-tracker.address",
+                "value": "localhost:9031"
+              },
+              "entry": {
+                "key": "yarn.resourcemanager.admin.address",
+                "value": "localhost:9033"
+              },
+              "entry": {
+                "key": "yarn.resourcemanager.address",
+                "value": "localhost:9032"
+              },
+              "entry": {
+                "key": "yarn.resourcemanager.webapp.address",
+                "value": "localhost:9088"
+              },
+              "entry": {
+                "key": "yarn.resourcemanager.epoch",
+                "value": "23"
+              }
+            }
+          }
+        }
+      }
+    }
+
 ### Notes on LogNormal distribution:

 LogNormal distributions represent well many of the parameters we see in practice
 (e.g., most jobs have a small number of mappers, but few might be very large, and
 few very small, but greater than zero. It is
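One format the documentation above does not spell out: the sample federated configuration points yarn.federation.machine-list at a CSV file rather than at the JSON topologies. Judging from src/main/data/nodes.csv added by this patch, the assumed format is one node per line, as `node, subcluster, rack`:

    node7, cluster123, default-rack1
    node0, cluster123, default-rack1
    node480, cluster124, default-rack13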
"src/test/resources/syn.json", null} + "src/test/resources/syn.json", null, null} }); } diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java index 79ebe219bfc..1a33ec950e3 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java @@ -50,13 +50,13 @@ return Arrays.asList(new Object[][] { // covering the no nodeFile case - {capScheduler, "SYNTH", synthTraceFile, null }, + {capScheduler, "SYNTH", synthTraceFile, null, null }, // covering new commandline and CapacityScheduler - {capScheduler, "SYNTH", synthTraceFile, nodeFile }, + {capScheduler, "SYNTH", synthTraceFile, nodeFile, null }, // covering FairScheduler - {fairScheduler, "SYNTH", synthTraceFile, nodeFile }, + {fairScheduler, "SYNTH", synthTraceFile, nodeFile, null }, }); } diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java index abb3b5e904a..6f769743f29 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunner.java @@ -51,24 +51,24 @@ // Test with both schedulers, and all three load producers. return Arrays.asList(new Object[][] { - // covering old commandline in tests - {capScheduler, "OLD_RUMEN", rumenTraceFile, nodeFile }, - {capScheduler, "OLD_SLS", slsTraceFile, nodeFile }, + // comafiosavering old commandline in tests + {capScheduler, "OLD_RUMEN", rumenTraceFile, nodeFile, null}, + {capScheduler, "OLD_SLS", slsTraceFile, nodeFile, null }, // covering the no nodeFile case - {capScheduler, "SYNTH", synthTraceFile, null }, - {capScheduler, "RUMEN", rumenTraceFile, null }, - {capScheduler, "SLS", slsTraceFile, null }, + {capScheduler, "SYNTH", synthTraceFile, null, null }, + {capScheduler, "RUMEN", rumenTraceFile, null, null }, + {capScheduler, "SLS", slsTraceFile, null, null }, // covering new commandline and CapacityScheduler - {capScheduler, "SYNTH", synthTraceFile, nodeFile }, - {capScheduler, "RUMEN", rumenTraceFile, nodeFile }, - {capScheduler, "SLS", slsTraceFile, nodeFile }, + {capScheduler, "SYNTH", synthTraceFile, nodeFile, null }, + {capScheduler, "RUMEN", rumenTraceFile, nodeFile, null }, + {capScheduler, "SLS", slsTraceFile, nodeFile, null }, // covering FairScheduler - {fairScheduler, "SYNTH", synthTraceFile, nodeFile }, - {fairScheduler, "RUMEN", rumenTraceFile, nodeFile }, - {fairScheduler, "SLS", slsTraceFile, nodeFile } + {fairScheduler, "SYNTH", synthTraceFile, nodeFile, null }, + {fairScheduler, "RUMEN", rumenTraceFile, nodeFile, null }, + {fairScheduler, "SLS", slsTraceFile, nodeFile, null } }); } diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunnerFederated.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunnerFederated.java new file mode 100644 index 00000000000..0524057ffd7 --- /dev/null +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSRunnerFederated.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.sls; + +import net.jcip.annotations.NotThreadSafe; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.*; + +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.text.MessageFormat; +import java.util.*; +import java.util.logging.Logger; + +/** + * This test performs simple runs of the SLS with different trace types and + * schedulers. + */ +@RunWith(value = Parameterized.class) +@NotThreadSafe +public class TestSLSRunnerFederated extends BaseSLSRunnerTest { + + @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})") + public static Collection data() { + + String capScheduler = CapacityScheduler.class.getCanonicalName(); + String fairScheduler = FairScheduler.class.getCanonicalName(); + String slsTraceFile = "src/test/resources/inputsls.json"; + String rumenTraceFile = "src/main/data/2jobs2min-rumen-jh.json"; + String synthTraceFile = "src/test/resources/syn.json"; + String nodeFile = "src/test/resources/nodes.json"; + String subclustersFile = "src/test/resources/subclusters.json"; + + // Test with both schedulers, and all three load producers. 
+ return Arrays.asList(new Object[][] { + // covering old commandline in tests + {capScheduler, "OLD_RUMEN", rumenTraceFile, nodeFile, subclustersFile }, + {capScheduler, "OLD_SLS", slsTraceFile, nodeFile, subclustersFile }, + + // covering the no nodeFile case + {capScheduler, "RUMEN", rumenTraceFile, null, subclustersFile }, + {capScheduler, "SLS", slsTraceFile, null, subclustersFile }, + + // covering new commandline and CapacityScheduler + {capScheduler, "RUMEN", rumenTraceFile, nodeFile, subclustersFile }, + {capScheduler, "SLS", slsTraceFile, nodeFile, subclustersFile }, + + // covering FairScheduler + {fairScheduler, "RUMEN", rumenTraceFile, nodeFile, subclustersFile }, + {fairScheduler, "SLS", slsTraceFile, nodeFile, subclustersFile } + }); + } + + @Before + public void setup() { + ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt"; + exitInvariantFile = "src/test/resources/exit-invariants.txt"; + } + + @Test(timeout = 180000) + @SuppressWarnings("all") + public void testSimulatorRunning() throws Exception { + Configuration conf = new Configuration(false); + conf.addResource("yarn-site-federated.xml"); + long timeTillShutdownInsec = 20L; + runSLS(conf, timeTillShutdownInsec); + } +} diff --git hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java index a5d30e02d85..ab1eac23373 100644 --- hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java +++ hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java @@ -50,13 +50,13 @@ return Arrays.asList(new Object[][] { // covering the no nodeFile case - {capScheduler, "SYNTH", synthTraceFile, null }, + {capScheduler, "SYNTH", synthTraceFile, null, null }, // covering new commandline and CapacityScheduler - {capScheduler, "SYNTH", synthTraceFile, nodeFile }, + {capScheduler, "SYNTH", synthTraceFile, nodeFile, null }, // covering FairScheduler - {fairScheduler, "SYNTH", synthTraceFile, nodeFile }, + {fairScheduler, "SYNTH", synthTraceFile, nodeFile, null }, }); } diff --git hadoop-tools/hadoop-sls/src/test/resources/subclusters.json hadoop-tools/hadoop-sls/src/test/resources/subclusters.json new file mode 100644 index 00000000000..2d4c0e8d49b --- /dev/null +++ hadoop-tools/hadoop-sls/src/test/resources/subclusters.json @@ -0,0 +1,79 @@ +{ + "subclusters": { + "entry": { + "key": "cluster123", + "value": { + "nodes": [ + "node7", + "node0" + ], + "configs": { + "entry": { + "key": "yarn.resourcemanager.epoch", + "value": "1329" + } + } + } + }, + "entry": { + "key": "cluster124", + "value": { + "nodes": [ + "node480", + "node481", + "node482", + "node483", + "node484", + "node485", + "node486", + "node487", + "node488", + "node489", + "node490", + "node491", + "node492", + "node493", + "node494", + "node495", + "node496", + "node497", + "node498", + "node499", + "node500" + ], + "configs": { + "entry": { + "key": "yarn.resourcemanager.scheduler.address", + "value": "localhost:9030" + }, + "entry": { + "key": "yarn.resourcemanager.resource-tracker.address", + "value": "localhost:9031" + }, + "entry": { + "key": "yarn.resourcemanager.admin.address", + "value": "localhost:9033" + }, + "entry": { + "key": "yarn.resourcemanager.address", + "value": "localhost:9032" + }, + "entry": { + "key": "yarn.resourcemanager.webapp.address", + "value": "localhost:9088" + }, + "entry": { + "key": "yarn.sls.metrics.web.address.port", + "value": "12001" + 
}, + "entry": { + "key": "yarn.resourcemanager.epoch", + "value": "23" + } + } + } + } + } +} + + diff --git hadoop-tools/hadoop-sls/src/test/resources/yarn-site-federated.xml hadoop-tools/hadoop-sls/src/test/resources/yarn-site-federated.xml new file mode 100644 index 00000000000..39bb4a51e8f --- /dev/null +++ hadoop-tools/hadoop-sls/src/test/resources/yarn-site-federated.xml @@ -0,0 +1,137 @@ + + + + + + + yarn.resourcemanager.scheduler.class + + org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler + + + + + + + yarn.sls.enabled + true + + + + The address of the RM web application. + yarn.resourcemanager.webapp.address + localhost:8088 + + + + yarn.resourcemanager.resource-tracker.address + localhost:8031 + + + + The address of the scheduler interface. + yarn.resourcemanager.scheduler.address + localhost:8030 + + + + The address of the applications manager interface in the + RM. + + yarn.resourcemanager.address + localhost:8032 + + + + The address of the RM admin interface. + yarn.resourcemanager.admin.address + localhost:8033 + + + + Set to false, to avoid ip check + hadoop.security.token.service.use_ip + false + + + + yarn.scheduler.capacity.maximum-applications + 1000 + Maximum number of applications in the system which + can be concurrently active both running and pending + + + + + Whether to use preemption. Note that preemption is + experimental + in the current version. Defaults to false. + + yarn.scheduler.fair.preemption + true + + + + yarn.router.bind-host + 0.0.0.0 + + + + yarn.router.clientrm.interceptor-class.pipeline + + org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor + + + + + yarn.nodemanager.amrmproxy.interceptor-class.pipeline + + org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor + + + + + Whether to allow multiple container assignments in one + heartbeat. Defaults to false. 
diff --git hadoop-tools/hadoop-sls/src/test/resources/yarn-site-federated.xml hadoop-tools/hadoop-sls/src/test/resources/yarn-site-federated.xml
new file mode 100644
index 00000000000..39bb4a51e8f
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/test/resources/yarn-site-federated.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+  <property>
+    <name>yarn.resourcemanager.scheduler.class</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
+  </property>
+
+  <property>
+    <name>yarn.sls.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>The address of the RM web application.</description>
+    <name>yarn.resourcemanager.webapp.address</name>
+    <value>localhost:8088</value>
+  </property>
+
+  <property>
+    <name>yarn.resourcemanager.resource-tracker.address</name>
+    <value>localhost:8031</value>
+  </property>
+
+  <property>
+    <description>The address of the scheduler interface.</description>
+    <name>yarn.resourcemanager.scheduler.address</name>
+    <value>localhost:8030</value>
+  </property>
+
+  <property>
+    <description>The address of the applications manager interface in the
+      RM.</description>
+    <name>yarn.resourcemanager.address</name>
+    <value>localhost:8032</value>
+  </property>
+
+  <property>
+    <description>The address of the RM admin interface.</description>
+    <name>yarn.resourcemanager.admin.address</name>
+    <value>localhost:8033</value>
+  </property>
+
+  <property>
+    <description>Set to false, to avoid ip check</description>
+    <name>hadoop.security.token.service.use_ip</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>yarn.scheduler.capacity.maximum-applications</name>
+    <value>1000</value>
+    <description>Maximum number of applications in the system which
+      can be concurrently active both running and pending</description>
+  </property>
+
+  <property>
+    <description>Whether to use preemption. Note that preemption is
+      experimental in the current version. Defaults to false.</description>
+    <name>yarn.scheduler.fair.preemption</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>yarn.router.bind-host</name>
+    <value>0.0.0.0</value>
+  </property>
+
+  <property>
+    <name>yarn.router.clientrm.interceptor-class.pipeline</name>
+    <value>org.apache.hadoop.yarn.server.router.clientrm.FederationClientInterceptor</value>
+  </property>
+
+  <property>
+    <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor</value>
+  </property>
+
+  <property>
+    <description>Whether to allow multiple container assignments in one
+      heartbeat. Defaults to false.</description>
+    <name>yarn.scheduler.fair.assignmultiple</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.state-store.heartbeat-interval-secs</name>
+    <value>5</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.cache-ttl.secs</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <name>yarn.federation.state-store.class</name>
+    <value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore</value>
+  </property>
+
+  <property>
+    <name>yarn.resourcemanager.reservation-system.enable</name>
+    <value>false</value>
+  </property>
+
+</configuration>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fe7cb8f7640..1017f664c9c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -178,7 +178,16 @@ private static void addDeprecatedKeys() {
       IPC_PREFIX + "rpc.class";
   public static final String DEFAULT_IPC_RPC_IMPL =
       "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC";
-
+
+  ////////////////////////////////
+  // Scheduled Load Simulator Configs
+  ////////////////////////////////
+  public static final String SLS_PREFIX = YARN_PREFIX + "sls";
+
+  // Required to be enabled in Federated SLS
+  public static final String SLS_ENABLED = SLS_PREFIX + ".enabled";
+  public static final boolean DEFAULT_SLS_ENABLED = false;
+
   ////////////////////////////////
   // Resource Manager Configs
   ////////////////////////////////
@@ -210,6 +219,10 @@ private static void addDeprecatedKeys() {
       RM_PREFIX + "auto-update.containers";
   public static final boolean DEFAULT_RM_AUTO_UPDATE_CONTAINERS = false;
 
+  public static final String RM_APPID_PROVIDER = RM_PREFIX + "appIdProvider";
+  public static final String DEFAULT_RM_APPID_PROVIDER =
+      "org.apache.hadoop.yarn.server.resourcemanager.DefaultApplicationIdProvider";
+
   /** The actual bind address for the RM.*/
   public static final String RM_BIND_HOST =
       RM_PREFIX + "bind-host";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 25a9e52a8d3..249798397ee 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -565,6 +565,10 @@ protected String buildCacheKey(String typeName, String methodName,
     return buffer.toString();
   }
 
+  public FederationStateStore getStateStoreClient() {
+    return stateStore;
+  }
+
   /**
    * Internal class that implements the CacheLoader interface that can be
    * plugged into the CacheManager to load objects into the cache for specified
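A short sketch of how code resolves the two keys added above, using only Configuration methods that already exist (the constants themselves compile only with this patch applied):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: reading the new SLS flag ("yarn.sls.enabled") and the
// application-id provider class name added by this patch.
public class NewKeysSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    boolean slsEnabled = conf.getBoolean(YarnConfiguration.SLS_ENABLED,
        YarnConfiguration.DEFAULT_SLS_ENABLED);
    String provider = conf.get(YarnConfiguration.RM_APPID_PROVIDER,
        YarnConfiguration.DEFAULT_RM_APPID_PROVIDER);
    System.out.println(slsEnabled + " " + provider);
  }
}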
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationIdProvider.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+
+/**
+ * Pluggable source of new ApplicationIds, selected through the
+ * yarn.resourcemanager.appIdProvider configuration key.
+ */
+public abstract class ApplicationIdProvider {
+
+  private static final Log LOG =
+      LogFactory.getLog(ApplicationIdProvider.class);
+  // Volatile so the double-checked locking below is safe.
+  private static volatile ApplicationIdProvider instance = null;
+  private static final Object LOCK = new Object();
+
+  public abstract ApplicationId getNewApplicationId(Configuration conf,
+      RecordFactory recordFactory);
+
+  public static ApplicationIdProvider getInstance(Configuration conf) {
+    if (instance != null) {
+      return instance;
+    }
+    synchronized (LOCK) {
+      if (instance == null) {
+        String providerName = conf.get(YarnConfiguration.RM_APPID_PROVIDER,
+            YarnConfiguration.DEFAULT_RM_APPID_PROVIDER);
+        try {
+          Class<?> clazz = conf.getClassByName(providerName);
+          instance =
+              (ApplicationIdProvider) ReflectionUtils.newInstance(clazz, conf);
+        } catch (ClassNotFoundException e) {
+          LOG.fatal("Bad application id provider given: " + providerName, e);
+          throw new YarnRuntimeException(
+              "Bad application id provider given: " + providerName, e);
+        }
+      }
+    }
+    return instance;
+  }
+}
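To illustrate the extension point, a hypothetical provider (not part of this patch) that offsets the id space by the per-subcluster epoch set in subclusters.json, so concurrently simulated RMs hand out non-colliding ApplicationIds; the class name and the stride are assumptions:

package org.apache.hadoop.yarn.server.resourcemanager;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;

// Hypothetical sketch: partition the id space by yarn.resourcemanager.epoch.
public class EpochOffsetApplicationIdProvider extends ApplicationIdProvider {

  private final AtomicInteger counter = new AtomicInteger(0);

  @Override
  public ApplicationId getNewApplicationId(Configuration conf,
      RecordFactory recordFactory) {
    long epoch = conf.getLong(YarnConfiguration.RM_EPOCH, 0);
    // One million ids per epoch is an arbitrary illustrative stride.
    int id = (int) (epoch * 1_000_000L) + counter.incrementAndGet();
    return BuilderUtils.newApplicationId(recordFactory,
        ResourceManager.getClusterTimeStamp(), id);
  }
}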
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index be997534dd2..52bf13b7070 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -197,7 +197,6 @@
   private static final Log LOG = LogFactory.getLog(ClientRMService.class);
 
-  final private AtomicInteger applicationCounter = new AtomicInteger(0);
   final private YarnScheduler scheduler;
   final private RMContext rmContext;
   private final RMAppManager rmAppManager;
@@ -335,9 +334,8 @@ private boolean checkAccess(UserGroupInformation callerUGI, String owner,
   }
 
   ApplicationId getNewApplicationId() {
-    ApplicationId applicationId = org.apache.hadoop.yarn.server.utils.BuilderUtils
-        .newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(),
-            applicationCounter.incrementAndGet());
+    ApplicationId applicationId = ApplicationIdProvider.getInstance(getConfig())
+        .getNewApplicationId(getConfig(), recordFactory);
     LOG.info("Allocated new applicationId: " + applicationId.getId());
     return applicationId;
   }
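A sketch of selecting a non-default provider through configuration; the class named here is the hypothetical example from above, not something shipped by this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: once this key is set, every ClientRMService#getNewApplicationId()
// call delegates to the configured class via ApplicationIdProvider.getInstance.
public class ProviderWiringSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_APPID_PROVIDER,
        "org.example.EpochOffsetApplicationIdProvider"); // hypothetical class
  }
}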
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultApplicationIdProvider.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultApplicationIdProvider.java
new file mode 100644
index 00000000000..569d8349339
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultApplicationIdProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DefaultApplicationIdProvider extends ApplicationIdProvider {
+
+  private final AtomicInteger applicationCounter = new AtomicInteger(0);
+
+  @Override
+  public ApplicationId getNewApplicationId(Configuration conf,
+      RecordFactory recordFactory) {
+    return BuilderUtils.newApplicationId(recordFactory,
+        ResourceManager.getClusterTimeStamp(),
+        applicationCounter.incrementAndGet());
+  }
+}
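The default provider keeps the pre-patch behaviour: ids of the familiar application_&lt;clusterTimestamp&gt;_&lt;n&gt; form, with n increasing per call. A usage sketch, assuming nothing beyond the classes in this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

// Sketch: two successive calls yield consecutive sequence numbers.
public class DefaultProviderSketch {
  public static void main(String[] args) {
    RecordFactory rf = RecordFactoryProvider.getRecordFactory(null);
    Configuration conf = new Configuration();
    ApplicationIdProvider provider = new DefaultApplicationIdProvider();
    ApplicationId first = provider.getNewApplicationId(conf, rf);
    ApplicationId second = provider.getNewApplicationId(conf, rf);
    // second.getId() == first.getId() + 1
    System.out.println(first + " -> " + second);
  }
}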
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNMInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNMInfo.java
index 1b7ddd3b278..7da5966bdf3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNMInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMNMInfo.java
@@ -29,7 +29,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@@ -45,7 +48,7 @@
   /**
    * Constructor for RMNMInfo registers the bean with JMX.
-   * 
+   *
    * @param rmc resource manager's context object
    * @param sched resource manager's scheduler object
    */
@@ -55,10 +58,17 @@ public RMNMInfo(RMContext rmc, ResourceScheduler sched) {
     StandardMBean bean;
     try {
-      bean = new StandardMBean(this,RMNMInfoBeans.class);
-      MBeans.register("ResourceManager", "RMNMInfo", bean);
+      bean = new StandardMBean(this, RMNMInfoBeans.class);
+      Configuration conf = rmc.getYarnConfiguration();
+      String suffix = "";
+      boolean slsEnabled = conf.getBoolean(YarnConfiguration.SLS_ENABLED,
+          YarnConfiguration.DEFAULT_SLS_ENABLED);
+      if (HAUtil.isFederationEnabled(conf) && slsEnabled) {
+        suffix = "-" + YarnConfiguration.getClusterId(conf);
+      }
+      MBeans.register("ResourceManager", "RMNMInfo" + suffix, bean);
     } catch (NotCompliantMBeanException e) {
-        LOG.warn("Error registering RMNMInfo MBean", e);
+      LOG.warn("Error registering RMNMInfo MBean", e);
     }
     LOG.info("Registered RMNMInfo MBean");
   }
@@ -70,7 +80,7 @@ public RMNMInfo(RMContext rmc, ResourceScheduler sched) {
 
   /**
    * Implements getLiveNodeManagers()
-   * 
+   *
    * @return JSON formatted string containing statuses of all node managers
    */
   @Override // RMNMInfoBeans
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
index 530184fe1b9..16e801f4e53 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java
@@ -301,4 +301,11 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
       DeleteApplicationHomeSubClusterRequest request) throws YarnException {
     return stateStoreClient.deleteApplicationHomeSubCluster(request);
   }
+
+  public void reinitialize(FederationStateStore stateStore,
+      Configuration conf) {
+    this.stateStoreClient = stateStore;
+    setConfig(conf);
+    this.config = conf;
+  }
 }
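With yarn.federation.enabled and yarn.sls.enabled both true, each simulated RM registers its bean under a per-subcluster name. A lookup sketch, assuming Hadoop's usual "Hadoop:service=...,name=..." JMX naming and an illustrative subcluster id:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Sketch: fetch the live-NM report of one simulated sub-cluster's RM.
public class RMNMInfoLookupSketch {
  public static void main(String[] args) throws Exception {
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName name = new ObjectName(
        "Hadoop:service=ResourceManager,name=RMNMInfo-cluster124");
    String liveNodeManagers =
        (String) mbs.getAttribute(name, "LiveNodeManagers");
    System.out.println(liveNodeManagers);
  }
}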
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 76050d067f7..1967e97fa66 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
@@ -67,6 +68,7 @@
   private RouterClientRMService clientRMProxyService;
   private RouterRMAdminService rmAdminProxyService;
   private WebApp webApp;
+  private boolean hasWebApp;
 
   @VisibleForTesting
   protected String webAppAddress;
@@ -78,7 +80,12 @@
   private static final String METRICS_NAME = "Router";
 
   public Router() {
+    this(true);
+  }
+
+  public Router(boolean hasWebApp) {
     super(Router.class.getName());
+    this.hasWebApp = hasWebApp;
   }
 
   protected void doSecureLogin() throws IOException {
@@ -110,7 +117,9 @@ protected void serviceStart() throws Exception {
     } catch (IOException e) {
       throw new YarnRuntimeException("Failed Router login", e);
     }
-    startWepApp();
+    if (hasWebApp) {
+      startWepApp();
+    }
     super.serviceStart();
   }
@@ -182,4 +191,8 @@ public static void main(String[] argv) {
       System.exit(-1);
     }
   }
+
+  public ApplicationClientProtocol getClientRMService() {
+    return clientRMProxyService;
+  }
 }
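A sketch of how an embedding process such as the simulator could host a Router without its web app, using only the two additions above plus the standard Service lifecycle:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.router.Router;

// Sketch: start a headless Router and talk to the federation through it.
public class EmbeddedRouterSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
    Router router = new Router(false); // false: do not start the web app
    router.init(conf);
    router.start();
    ApplicationClientProtocol client = router.getClientRMService();
    // drive client.getNewApplication(...) etc. against the federation here
    router.stop();
  }
}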