diff --git hadoop-tools/hadoop-sls/pom.xml hadoop-tools/hadoop-sls/pom.xml
index a42f8decd8d..4fc056d07d1 100644
--- hadoop-tools/hadoop-sls/pom.xml
+++ hadoop-tools/hadoop-sls/pom.xml
@@ -73,6 +73,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-router</artifactId>
+    </dependency>
@@ -123,6 +127,8 @@
             <exclude>src/main/data/2jobs2min-rumen-jh.json</exclude>
+            <exclude>src/main/data/nodes.csv</exclude>
+            <exclude>src/main/data/subclusters.json</exclude>
             <exclude>src/main/html/js/thirdparty/jquery.js</exclude>
             <exclude>src/main/html/js/thirdparty/d3-LICENSE</exclude>
             <exclude>src/main/html/js/thirdparty/d3.v3.js</exclude>
@@ -131,6 +137,7 @@
             <exclude>src/main/html/track.html.template</exclude>
             <exclude>src/test/resources/simulate.html.template</exclude>
             <exclude>src/test/resources/simulate.info.html.template</exclude>
+            <exclude>src/test/resources/subclusters.json</exclude>
             <exclude>src/test/resources/track.html.template</exclude>
             <exclude>src/test/resources/syn.json</exclude>
             <exclude>src/test/resources/syn_generic.json</exclude>
diff --git hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
index 5a9de89ae69..fa442ef1c73 100644
--- hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
+++ hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh
@@ -21,6 +21,7 @@ function hadoop_usage()
echo " (deprecated --input-rumen= | --input-sls=)"
echo " --output-dir="
echo " [--nodes=]"
+ echo " [--subclusters=]"
echo " [--track-jobs=]"
echo " [--print-simulation]"
}
@@ -53,6 +54,9 @@ function parse_args()
--print-simulation)
printsimulation="true"
;;
+ --subclusters=*)
+ subclusters=${i#*=}
+ ;;
*)
hadoop_error "ERROR: Invalid option ${i}"
hadoop_exit_with_usage 1
@@ -110,6 +114,10 @@ function run_simulation() {
hadoop_add_param args -printsimulation "-printsimulation"
fi
+ if [[ -n "${subclusters}" ]] ; then
+ hadoop_add_param args -subclusters "-subclusters ${subclusters}"
+ fi
+
hadoop_add_client_opts
hadoop_finalize
diff --git hadoop-tools/hadoop-sls/src/main/data/nodes.csv hadoop-tools/hadoop-sls/src/main/data/nodes.csv
new file mode 100644
index 00000000000..621163dfa87
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/main/data/nodes.csv
@@ -0,0 +1,22 @@
+node7, cluster123, default-rack1
+node0, cluster123, default-rack1
+node480, cluster124, default-rack13
+node482, cluster124, default-rack13
+node483, cluster124, default-rack13
+node484, cluster124, default-rack13
+node485, cluster124, default-rack13
+node486, cluster124, default-rack13
+node487, cluster124, default-rack13
+node488, cluster124, default-rack13
+node489, cluster124, default-rack13
+node490, cluster124, default-rack13
+node491, cluster124, default-rack13
+node492, cluster124, default-rack13
+node493, cluster124, default-rack13
+node494, cluster124, default-rack13
+node495, cluster124, default-rack13
+node496, cluster124, default-rack13
+node497, cluster124, default-rack13
+node498, cluster124, default-rack13
+node499, cluster124, default-rack13
+node500, cluster124, default-rack13
diff --git hadoop-tools/hadoop-sls/src/main/data/subclusters.json hadoop-tools/hadoop-sls/src/main/data/subclusters.json
new file mode 100644
index 00000000000..5840e0d043c
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/main/data/subclusters.json
@@ -0,0 +1,75 @@
+{
+ "subclusters": {
+ "entry": {
+ "key": "cluster123",
+ "value": {
+ "nodes": [
+ "node7",
+ "node0"
+ ],
+ "configs": {
+ "entry": {
+ "key": "yarn.resourcemanager.epoch",
+ "value": "1329"
+ }
+ }
+ }
+ },
+ "entry": {
+ "key": "cluster124",
+ "value": {
+ "nodes": [
+ "node480",
+ "node481",
+ "node482",
+ "node483",
+ "node484",
+ "node485",
+ "node486",
+ "node487",
+ "node488",
+ "node489",
+ "node490",
+ "node491",
+ "node492",
+ "node493",
+ "node494",
+ "node495",
+ "node496",
+ "node497",
+ "node498",
+ "node499",
+ "node500"
+ ],
+ "configs": {
+ "entry": {
+ "key": "yarn.resourcemanager.scheduler.address",
+ "value": "localhost:9030"
+ },
+ "entry": {
+ "key": "yarn.resourcemanager.resource-tracker.address",
+ "value": "localhost:9031"
+ },
+ "entry": {
+ "key": "yarn.resourcemanager.admin.address",
+ "value": "localhost:9033"
+ },
+ "entry": {
+ "key": "yarn.resourcemanager.address",
+ "value": "localhost:9032"
+ },
+ "entry": {
+ "key": "yarn.resourcemanager.webapp.address",
+ "value": "localhost:9088"
+ },
+ "entry": {
+ "key": "yarn.resourcemanager.epoch",
+ "value": "23"
+ }
+ }
+ }
+ }
+ }
+}
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ClusterInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ClusterInfo.java
new file mode 100644
index 00000000000..6ba9955e2d4
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ClusterInfo.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
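+/**
+ * JAXB bean for the -subclusters input file: a map from subcluster id to
+ * {@link SubClusterInfo}, expressed in Jettison's mapped JSON notation (see
+ * src/main/data/subclusters.json for an example).
+ */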
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@XmlRootElement(name = "federation-sls")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ClusterInfo {
+  private Map<String, SubClusterInfo> subclusters = new HashMap<>();
+
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterInfo.class);
+  private static JSONJAXBContext jsonjaxbContext = initContext();
+
+  public ClusterInfo() {
+  }
+
+  public Map<String, SubClusterInfo> getSubclusters() {
+    return subclusters;
+  }
+
+  public void setSubclusters(Map<String, SubClusterInfo> subclusters) {
+    this.subclusters = subclusters;
+  }
+
+  private static JSONJAXBContext initContext() {
+    try {
+      return new JSONJAXBContext(JSONConfiguration.DEFAULT, ClusterInfo.class);
+    } catch (JAXBException e) {
+      LOG.error("Couldn't initialize JSONJAXBContext for ClusterInfo.", e);
+    }
+    return null;
+  }
+
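+  /**
+   * Reads the subcluster topology from a JSON file, typically the file
+   * passed on the command line via -subclusters.
+   */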
+ public static ClusterInfo fromFile(File f) throws Exception {
+    if (jsonjaxbContext == null) {
+      throw new Exception("Didn't initialize JsonJAXBContext");
+    }
+
+    JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
+    try (FileInputStream in = new FileInputStream(f)) {
+      return unmarshaller.unmarshalFromJSON(in, ClusterInfo.class);
+    }
+ }
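+
+  /**
+   * Same as {@link #fromFile(File)}, but parses the topology from an
+   * in-memory JSON string.
+   */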
+  public static ClusterInfo fromString(String json) throws Exception {
+    if (jsonjaxbContext == null) {
+      throw new Exception("Didn't initialize JsonJAXBContext");
+    }
+    JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
+    return unmarshaller.unmarshalFromJSON(new StringReader(json),
+        ClusterInfo.class);
+  }
+
+}
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 1e83e405597..da2607a306e 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -29,6 +30,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.Collections;
@@ -58,6 +60,9 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
@@ -65,8 +70,21 @@
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
@@ -74,6 +92,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
@@ -134,6 +153,20 @@
private static boolean exitAtTheFinish = false;
+ private static final String DEFAULT_RM_KEY = "DEFAULT_RM";
+ private ApplicationMasterProtocol rmProxy;
+
+ private NMContext nmSimContext;
+ private AsyncDispatcher dispatcher;
+  private Map<String, String> nodeToSubCluster;
+  private Map<String, ResourceManager> subIdToRM;
+
+ private boolean isFederated = false;
+ private ApplicationClientProtocol clientRM;
+ private Router router;
+
+ private String subClusterFile;
+
/**
* The type of trace in input.
*/
@@ -168,6 +201,8 @@ private void init(Configuration tempConf) throws ClassNotFoundException {
queueAppNumMap = new HashMap<>();
amMap = new ConcurrentHashMap<>();
amClassMap = new HashMap<>();
+ nodeToSubCluster = new HashMap<>();
+ subIdToRM = new HashMap<>();
// runner configuration
setConf(tempConf);
@@ -218,31 +253,50 @@ private Resource getNodeManagerResource() {
}
public void setSimulationParams(TraceType inType, String[] inTraces,
-      String nodes, String outDir, Set<String> trackApps,
-      boolean printsimulation) throws IOException, ClassNotFoundException {
+      String nodes, String clusters, String outDir, Set<String> trackApps,
+      boolean printsimulation) {
this.inputType = inType;
this.inputTraces = inTraces.clone();
this.nodeFile = nodes;
this.trackedApps = trackApps;
+ this.subClusterFile = clusters;
this.printSimulation = printsimulation;
metricsOutputDir = outDir;
-
+ this.isFederated = HAUtil.isFederationEnabled(getConf());
+ if (isFederated && subClusterFile == null) {
+ String errorMsg =
+ "Federation is enabled but no subclusters file is given.";
+ LOG.error(errorMsg);
+ printUsage();
+ throw new RuntimeException(errorMsg);
+ }
}
- public void start() throws IOException, ClassNotFoundException, YarnException,
- InterruptedException {
+ public void start() throws Exception {
// start resource manager
startRM();
+ //start AMRMProxy
+ this.nmSimContext = initializeNMSimulatorContext();
+    if (!isFederated) {
+      this.rmProxy = getDefaultRM().getApplicationMasterService();
+      this.clientRM = getDefaultRM().getClientRMService();
+    } else {
+      startAMRMProxy();
+      startRouter();
+      this.clientRM = router.getClientRMService();
+    }
// start node managers
startNM();
// start application masters
startAM();
// set queue & tracked apps information
- ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
- .setQueueSet(this.queueAppNumMap.keySet());
- ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
- .setTrackedAppSet(this.trackedApps);
+    for (Entry<String, ResourceManager> entry : subIdToRM.entrySet()) {
+ ((SchedulerWrapper) entry.getValue().getResourceScheduler()).getTracker()
+ .setQueueSet(this.queueAppNumMap.keySet());
+ ((SchedulerWrapper) entry.getValue().getResourceScheduler()).getTracker()
+ .setTrackedAppSet(this.trackedApps);
+ }
// print out simulation info
printSimulationInfo();
// blocked until all nodes RUNNING
@@ -251,8 +305,132 @@ public void start() throws IOException, ClassNotFoundException, YarnException,
runner.start();
}
- private void startRM() throws ClassNotFoundException, YarnException {
- Configuration rmConf = new YarnConfiguration(getConf());
+  private NMContext initializeNMSimulatorContext() {
+    NMContext context = null;
+    try {
+      context = createNMSimContext();
+      context.setNodeId(NodeId.newInstance("localhost", 0));
+    } catch (Exception e) {
+      LOG.error("Error creating NMSimulator context", e);
+    }
+    return context;
+  }
+
+ private NMContext createNMSimContext() {
+ NMStateStoreService nmStore = new NMNullStateStoreService();
+ nmStore.init(getConf());
+ NMTokenSecretManagerInNM nmTokenSecretManager = new NMTokenSecretManagerInNM();
+ NMContainerTokenSecretManager nmContainerTokenSecretManager =
+ new NMContainerTokenSecretManager(getConf(), nmStore);
+ boolean isDistSchedulingEnabled =
+ getConf().getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+ YarnConfiguration.DEFAULT_DIST_SCHEDULING_ENABLED);
+ return new NMContext(nmContainerTokenSecretManager, nmTokenSecretManager,
+ null, null, nmStore, isDistSchedulingEnabled, getConf());
+ }
+
+ public ApplicationClientProtocol getClientRM() {
+ return clientRM;
+ }
+
+ public ApplicationMasterProtocol getRMProxy() {
+ return rmProxy;
+ }
+
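+  /**
+   * Resolves the ResourceManager owning an application: in federated mode
+   * the home subcluster is looked up in the federation state store,
+   * otherwise the single default RM is returned.
+   */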
+ public ResourceManager getRMFromAppId(ApplicationId appId)
+ throws YarnException {
+ if (!isFederated) {
+ return getDefaultRM();
+ }
+ GetApplicationHomeSubClusterRequest request =
+ GetApplicationHomeSubClusterRequest.newInstance(appId);
+ GetApplicationHomeSubClusterResponse resp =
+ FederationStateStoreFacade.getInstance().getStateStoreClient()
+ .getApplicationHomeSubCluster(request);
+ return getRMFromClusterId(
+ resp.getApplicationHomeSubCluster().getHomeSubCluster().toString());
+ }
+
+ public ResourceManager getRMFromClusterId(String clusterId) {
+ return subIdToRM.get(clusterId);
+ }
+
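+  /**
+   * Starts the AMRMProxyService backed by the simulated NM context; in
+   * federated mode AM simulators register through this proxy rather than
+   * against an RM's ApplicationMasterService directly.
+   */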
+  private void startAMRMProxy() {
+    this.dispatcher = new AsyncDispatcher();
+    AMRMProxyService amrmproxy =
+        new AMRMProxyService(this.nmSimContext, dispatcher);
+    amrmproxy.init(getConf());
+    amrmproxy.start();
+    this.rmProxy = amrmproxy;
+  }
+
+ private void startRouter() {
+ router = new Router(false);
+ router.init(getConf());
+ router.start();
+ }
+
+  private void startRM() throws Exception {
+    Configuration rmConf = getConf();
+    if (!isFederated) {
+      subIdToRM.put(DEFAULT_RM_KEY, startAnRM(rmConf));
+      return;
+    }
+
+ FederationStateStoreFacade facade =
+ FederationStateStoreFacade.getInstance();
+ facade.getStateStoreClient().init(getConf());
+ facade.reinitialize(facade.getStateStoreClient(), getConf());
+ loadRMs(rmConf);
+ }
+
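+  /**
+   * Reads the subcluster file and starts one ResourceManager per subcluster,
+   * applying the per-subcluster configuration overrides (RPC addresses,
+   * epoch, ...) on top of the shared configuration.
+   */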
+  private void loadRMs(Configuration conf) throws Exception {
+    ClusterInfo cluster = ClusterInfo.fromFile(new File(subClusterFile));
+    Map<String, SubClusterInfo> map = cluster.getSubclusters();
+    for (Entry<String, SubClusterInfo> entry : map.entrySet()) {
+      String subId = entry.getKey();
+      SubClusterInfo sub = entry.getValue();
+      for (String n : sub.getNodes()) {
+        nodeToSubCluster.put(n, subId);
+      }
+      Configuration configuration = new Configuration(conf);
+      for (Map.Entry<String, String> confStr : sub.getConfigs().entrySet()) {
+        configuration.set(confStr.getKey(), confStr.getValue());
+      }
+      configuration.set(YarnConfiguration.RM_CLUSTER_ID, subId);
+      ResourceManager rm = startAnRM(configuration);
+      subIdToRM.put(subId, rm);
+      // Wait needed to make sure cluster timestamps are different.
+      Thread.sleep(10);
+    }
+    waitForRMsActive();
+  }
+
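+  /**
+   * Blocks until every subcluster RM has registered as active in the
+   * federation state store, polling at the state store heartbeat interval.
+   */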
+ private void waitForRMsActive() throws YarnException, InterruptedException {
+ FederationStateStoreFacade stateStore =
+ FederationStateStoreFacade.getInstance();
+ for (String subId : subIdToRM.keySet()) {
+      if (subId.equals(DEFAULT_RM_KEY)) {
+ continue;
+ }
+ LOG.info(MessageFormat.format("Waiting for {0}", subId));
+ while (!stateStore.getSubCluster(SubClusterId.newInstance(subId))
+ .getState().isActive()) {
+ Thread.sleep(getConf().getInt(
+ YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS));
+ }
+ LOG.info(MessageFormat.format("{0} up and active", subId));
+ }
+ }
+
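+  /**
+   * Starts a single ResourceManager with the given configuration and, in
+   * federated mode, points its federation state store service at the shared
+   * state store client.
+   */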
+ private ResourceManager startAnRM(Configuration rmConf)
+ throws ClassNotFoundException, YarnException {
String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
if (Class.forName(schedulerClass) == CapacityScheduler.class) {
@@ -286,7 +464,17 @@ protected ApplicationMasterLauncher createAMLauncher() {
// Init and start the actual ResourceManager
rm.init(rmConf);
+    if (isFederated) {
+      try {
+        rm.getFederationStateStoreService().reinitialize(
+            FederationStateStoreFacade.getInstance().getStateStoreClient(),
+            rmConf);
+      } catch (Exception e) {
+        LOG.error("Couldn't reinitialize RM StateStoreService", e);
+      }
+    }
rm.start();
+ return rm;
}
private void startNM() throws YarnException, IOException,
@@ -385,6 +573,23 @@ private void waitForNodesRunning() throws InterruptedException {
System.currentTimeMillis() - startTimeMS);
}
+  public boolean isFederated() {
+    return isFederated;
+  }
+
+  public AsyncDispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  public NMContext getNmSimContext() {
+    return nmSimContext;
+  }
+
+  // Only to be used in non-federated mode.
+  public ResourceManager getDefaultRM() {
+    return subIdToRM.get(DEFAULT_RM_KEY);
+  }
+
@SuppressWarnings("unchecked")
private void startAM() throws YarnException, IOException {
switch (inputType) {
@@ -883,12 +1088,26 @@ public static void decreaseRemainingApps() {
}
public void stop() throws InterruptedException {
- rm.stop();
+    if (isFederated) {
+      for (Entry<String, ResourceManager> e : subIdToRM.entrySet()) {
+        e.getValue().getFederationStateStoreService()
+            .reinitialize(null, getConf());
+        e.getValue().stop();
+      }
+      try {
+        FederationStateStoreFacade.getInstance().getStateStoreClient().close();
+      } catch (Exception e) {
+        LOG.error("Couldn't close federation state store client", e);
+      }
+      ((AMRMProxyService) rmProxy).stop();
+      router.stop();
+    } else {
+      getDefaultRM().stop();
+    }
runner.stop();
}
- public int run(final String[] argv) throws IOException, InterruptedException,
- ParseException, ClassNotFoundException, YarnException {
+  public int run(final String[] argv)
+      throws IOException, ParseException, ClassNotFoundException,
+      YarnException {
Options options = new Options();
@@ -904,6 +1123,7 @@ public int run(final String[] argv) throws IOException, InterruptedException,
options.addOption("output", true, "output directory");
options.addOption("trackjobs", true,
"jobs to be tracked during simulating");
+ options.addOption("subclusters", true, "subcluster topology");
options.addOption("printsimulation", false,
"print out simulation information");
@@ -946,6 +1166,8 @@ public int run(final String[] argv) throws IOException, InterruptedException,
String tempNodeFile =
cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
+ String subclusters =
+ cmd.hasOption("subclusters") ? cmd.getOptionValue("subclusters") : null;
TraceType tempTraceType = TraceType.SLS;
switch (traceType) {
@@ -966,10 +1188,14 @@ public int run(final String[] argv) throws IOException, InterruptedException,
String[] inputFiles = traceLocation.split(",");
- setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output,
- trackedJobSet, cmd.hasOption("printsimulation"));
+ setSimulationParams(tempTraceType, inputFiles, tempNodeFile, subclusters,
+ output, trackedJobSet, cmd.hasOption("printsimulation"));
- start();
+ try {
+ start();
+ } catch (Exception e) {
+ throw new YarnException("Failed to start SLS runner", e);
+ }
return 0;
}
@@ -987,7 +1213,8 @@ static void printUsage() {
"Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... "
+ "(deprecated alternative options --inputsls FILE, FILE,... "
+ " | --inputrumen FILE,FILE,...)"
- + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] "
+ + "-output FILE [-nodes FILE] [-subclusters FILE] "
+ + "[-trackjobs JobId,JobId...] "
+ "[-printsimulation]");
System.err.println();
}
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SubClusterInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SubClusterInfo.java
new file mode 100644
index 00000000000..3e1a216d7cf
--- /dev/null
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SubClusterInfo.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
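+/**
+ * JAXB bean describing one simulated subcluster: the node names it hosts and
+ * the configuration overrides applied to that subcluster's ResourceManager.
+ */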
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@XmlRootElement(name = "federation-sls")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterInfo {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SubClusterInfo.class);
+ private static JSONJAXBContext jsonjaxbContext = initContext();
+  private List<String> nodes = new LinkedList<>();
+  private Map<String, String> configs = new HashMap<>();
+
+  public List<String> getNodes() {
+    return nodes;
+  }
+
+  public void setNodes(List<String> nodes) {
+    this.nodes = nodes;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+ public SubClusterInfo() {
+
+ }
+
+ private static JSONJAXBContext initContext() {
+ try {
+ return new JSONJAXBContext(JSONConfiguration.DEFAULT,
+ SubClusterInfo.class);
+ } catch (JAXBException e) {
+ LOG.error("Couldn't parse subcluster info.", e);
+ }
+ return null;
+ }
+}
diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 5f34cfccfb8..a03d45543da 100644
--- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -30,17 +32,24 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -50,15 +59,23 @@
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
@@ -116,6 +133,11 @@
private ReservationSubmissionRequest reservationRequest;
+ private UserGroupInformation ugiUser;
+
+ private ApplicationMasterProtocol rmProxy;
+ private ApplicationClientProtocol clientRm;
+
public AMSimulator() {
this.responseQueue = new LinkedBlockingQueue<>();
}
@@ -140,6 +162,51 @@ public void init(int heartbeatInterval,
this.traceFinishTimeMS = finishTime;
this.amContainerResource = amResource;
this.nodeLabelExpression = nodeLabelExpr;
+ this.rmProxy =
+ se == null ? rm.getApplicationMasterService() : se.getRMProxy();
+ this.clientRm = se == null ? rm.getClientRMService() : se.getClientRM();
+ }
+
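+  /**
+   * Builds the StartContainerRequest that registers the simulated AM
+   * container with the AMRMProxy, embedding the AMRMToken in the container
+   * launch context credentials.
+   */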
+  private StartContainerRequest initializeStartRequest(
+      ApplicationAttemptId appAttemptId, int id,
+      Token<AMRMTokenIdentifier> token) {
+    ContainerTokenIdentifier cid =
+        createContainerTokenIdentifier(appAttemptId, id);
+    se.getNmSimContext().getContainerTokenSecretManager()
+        .startContainerSuccessful(cid);
+    ContainerLaunchContext amContext = BuilderUtils.newContainerLaunchContext(
+        new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+        null, new HashMap<String, ByteBuffer>(), null,
+        new HashMap<ApplicationAccessType, String>());
+    StartContainerRequest req = null;
+    try {
+      req = StartContainerRequest.newInstance(amContext, BuilderUtils
+          .newContainerToken(se.getNmSimContext().getNodeId(), new byte[1],
+              cid));
+      Credentials creds = YarnServerSecurityUtils
+          .parseCredentials(req.getContainerLaunchContext());
+      creds.addToken(token.getService(), token);
+      DataOutputBuffer dob = new DataOutputBuffer();
+      creds.writeTokenStorageToStream(dob);
+      req.getContainerLaunchContext()
+          .setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+    } catch (SecretManager.InvalidToken invalidToken) {
+      LOG.error("Invalid token while building AM StartContainerRequest",
+          invalidToken);
+    } catch (IOException e) {
+      LOG.error("Couldn't build AM StartContainerRequest", e);
+    }
+ return req;
+ }
+
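+  /**
+   * Creates a container token identifier for the simulated AM container.
+   * The host name ("473"), master key id and RM identifier are placeholder
+   * values; the token is only consumed by the simulator itself.
+   */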
+ private ContainerTokenIdentifier createContainerTokenIdentifier(
+ ApplicationAttemptId appAttemptId, int id) {
+    return new ContainerTokenIdentifier(
+        ContainerId.newContainerId(appAttemptId, id), "473", user,
+        amContainerResource, System.currentTimeMillis() + 100000L, 0, 1234,
+        Priority.newInstance(0), 0, null, null,
+        ContainerType.APPLICATION_MASTER);
}
/**
@@ -171,6 +238,26 @@ public synchronized void notifyAMContainerLaunched(Container masterContainer)
throws Exception {
this.amContainer = masterContainer;
this.appAttemptId = masterContainer.getId().getApplicationAttemptId();
+ this.ugiUser =
+ UserGroupInformation.createRemoteUser(this.appAttemptId.toString());
+ if (se != null && se.isFederated()) {
+ rm = se.getRMFromAppId(appId);
+ LOG.info(MessageFormat.format("App {0} is running on {1}", appId,
+ YarnConfiguration.getClusterId(rm.getConfig())));
+ //preparing the AMRMProxy to start the application
+ AMRMProxyService amrmproxy = (AMRMProxyService) rmProxy;
+      Token<AMRMTokenIdentifier> token =
+          rm.getRMContext().getAMRMTokenSecretManager()
+              .createAndGetAMRMToken(appAttemptId);
+ StartContainerRequest initialStartReq =
+ initializeStartRequest(appAttemptId, appId.getId(),
+ token);
+ synchronized (amrmproxy.getConfig()) {
+ amrmproxy.getConfig().set(YarnConfiguration.RM_CLUSTER_ID,
+ YarnConfiguration.getClusterId(rm.getConfig()));
+ amrmproxy.processApplicationStartRequest(initialStartReq);
+ }
+ }
registerAM();
isAMContainerRunning = true;
}
@@ -186,7 +273,7 @@ private ReservationId submitReservationWhenSpecified()
ugi.doAs(new PrivilegedExceptionAction