diff --git a/dev-support/bin/dist-layout-stitching b/dev-support/bin/dist-layout-stitching
index d4bfd8aaada..cc076bd9e34 100755
--- a/dev-support/bin/dist-layout-stitching
+++ b/dev-support/bin/dist-layout-stitching
@@ -70,6 +70,9 @@ function copyifnotexists()
else
for childpath in "${src}"/*; do
child="${childpath##*/}"
+ if [ "${child}" == "*" ]; then
+ continue;
+ fi
if [[ "${child}" == "doc" ||
"${child}" == "webapps" ]]; then
mkdir -p "${dest}/${child}"
diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-dtss.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-dtss.xml
new file mode 100644
index 00000000000..d250dd54744
--- /dev/null
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-dtss.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License. See accompanying LICENSE file.
+-->
+<assembly>
+  <id>hadoop-dtss</id>
+  <formats>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/src/main/bin</directory>
+      <outputDirectory>dtss/bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/python</directory>
+      <outputDirectory>dtss/python</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/sample</directory>
+      <outputDirectory>dtss/sample</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
index db744f511da..6a81f2fc354 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
@@ -189,6 +189,17 @@
     <fileSet>
       <directory>../hadoop-sls/target/hadoop-sls-${project.version}/sls</directory>
       <outputDirectory>/share/hadoop/${hadoop.component}/sls</outputDirectory>
     </fileSet>
+    <fileSet>
+      <directory>../hadoop-dtss/target</directory>
+      <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
+      <includes>
+        <include>*-sources.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>../hadoop-dtss/target/hadoop-dtss-${project.version}/dtss</directory>
+      <outputDirectory>/share/hadoop/${hadoop.component}/dtss</outputDirectory>
+    </fileSet>
     <fileSet>
       <directory>../hadoop-resourceestimator/target</directory>
       <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 90a7c4420f8..2420e729a06 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -685,6 +685,11 @@
       <artifactId>hadoop-kms</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-dtss</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-kms</artifactId>
@@ -1380,6 +1385,11 @@
       <artifactId>hadoop-sls</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-dtss</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-cloud-storage</artifactId>
@@ -1754,6 +1764,11 @@
       <artifactId>lz4-java</artifactId>
       <version>${lz4-java.version}</version>
     </dependency>
+    <dependency>
+      <groupId>net.objecthunter</groupId>
+      <artifactId>exp4j</artifactId>
+      <version>0.4.8</version>
+    </dependency>
diff --git a/hadoop-tools/hadoop-dtss/.gitignore b/hadoop-tools/hadoop-dtss/.gitignore
new file mode 100644
index 00000000000..99bbb83b6e8
--- /dev/null
+++ b/hadoop-tools/hadoop-dtss/.gitignore
@@ -0,0 +1,177 @@
+# Created by .ignore support plugin (hsz.mobi)
+### JetBrains template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# CMake
+cmake-build-debug/
+
+# Mongo Explorer plugin:
+.idea/**/mongoSettings.xml
+
+## File-based project format:
+*.iws
+
+## Plugin-specific files:
+
+# IntelliJ
+.idea/
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+### Java template
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+*.iml
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+
+# Avoid ignoring Maven wrapper jar file (.jar files are usually ignored)
+!/.mvn/wrapper/maven-wrapper.jar
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+.static_storage/
+.media/
+local_settings.py
+
+# Flask stuff:
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+
+# OS X
+.DS_Store
+
diff --git a/hadoop-tools/hadoop-dtss/pom.xml b/hadoop-tools/hadoop-dtss/pom.xml
new file mode 100644
index 00000000000..6d97f5c8597
--- /dev/null
+++ b/hadoop-tools/hadoop-dtss/pom.xml
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.4.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <artifactId>hadoop-dtss</artifactId>
+  <version>3.4.0-SNAPSHOT</version>
+  <name>Apache Hadoop Discrete Time Scheduler Simulator</name>
+  <description>Apache Hadoop Discrete Time Scheduler Simulator</description>
+  <packaging>jar</packaging>
+
+  <properties>
+    <guice.version>4.0</guice.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+      <version>${guice.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject.extensions</groupId>
+      <artifactId>guice-multibindings</artifactId>
+      <version>${guice.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>net.objecthunter</groupId>
+      <artifactId>exp4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/sample/config/yarn</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <configuration>
+          <attach>true</attach>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml</excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>dist</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <dependencies>
+              <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-assemblies</artifactId>
+                <version>${project.version}</version>
+              </dependency>
+            </dependencies>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>prepare-package</phase>
+                <goals>
+                  <goal>single</goal>
+                </goals>
+                <configuration>
+                  <appendAssemblyId>false</appendAssemblyId>
+                  <attach>false</attach>
+                  <finalName>${project.artifactId}-${project.version}</finalName>
+                  <descriptorRefs>
+                    <descriptorRef>hadoop-dtss</descriptorRef>
+                  </descriptorRefs>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>deplist</id>
+                <phase>compile</phase>
+                <goals>
+                  <goal>list</goal>
+                </goals>
+                <configuration>
+                  <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-builtin.txt</outputFile>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
diff --git a/hadoop-tools/hadoop-dtss/src/main/bin/dtssrun.sh b/hadoop-tools/hadoop-dtss/src/main/bin/dtssrun.sh
new file mode 100755
index 00000000000..fee1c2097a4
--- /dev/null
+++ b/hadoop-tools/hadoop-dtss/src/main/bin/dtssrun.sh
@@ -0,0 +1,95 @@
+#!/usr/bin/env bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+
+function hadoop_usage()
+{
+ echo "Usage: dtssrun.sh [-i run_id] [-m metrics_dir] tracelocation [redirect_path]"
+}
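+# Example invocation (illustrative values; these paths are not part of the patch):
+#   dtssrun.sh -i run-001 -m /tmp/dtss-metrics /tmp/dtss-conf/run-001.json /tmp/dtss-out/run-001.txt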
+
+function calculate_classpath
+{
+ hadoop_add_to_classpath_tools hadoop-dtss
+ hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/timelineservice"'/*'
+ hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/timelineservice/lib/*"
+}
+
+function run_simulation() {
+ hadoop_add_client_opts
+ hadoop_finalize
+
+ # shellcheck disable=SC2086
+ echo "Running simulation in dtssrun.sh!"
+ if [[ -z $redirect_path ]]; then
+ hadoop_java_exec dtss org.apache.hadoop.yarn.dtss.Runner "$@"
+ else
+ echo "Redirecting output to $redirect_path"
+ mkdir -p "$(dirname "$redirect_path")"
+ hadoop_java_exec dtss org.apache.hadoop.yarn.dtss.Runner "$@" > "$redirect_path" 2>&1
+ fi
+}
+
+while getopts "i:m:" opt; do
+ case $opt in
+ i)
+ id=${OPTARG}
+ ;;
+ m)
+ metric_path=${OPTARG}
+ ;;
+ \?)
+ echo "Invalid option: -$OPTARG" >&2
+ exit 1
+ ;;
+ esac
+done
+
+# let's locate libexec...
+if [[ -n "${HADOOP_HOME}" ]]; then
+ HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"
+else
+ this="${BASH_SOURCE-$0}"
+ bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
+ HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../../../../../libexec"
+fi
+
+HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"
+# shellcheck disable=SC2034
+HADOOP_NEW_CONFIG=true
+if [[ -f "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then
+ # shellcheck disable=SC1090
+ . "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"
+else
+ echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/hadoop-config.sh." >&2
+ exit 1
+fi
+
+calculate_classpath
+
+sim_args=( "${@:1:$OPTIND-1}" )
+shift $((OPTIND-1))
+
+if [[ $# = 0 ]] || [[ $# -gt 2 ]]; then
+ hadoop_exit_with_usage 1
+fi
+
+sim_args+=( "$1" )
+shift 1
+
+if [[ $# = 0 ]]; then
+ run_simulation "${sim_args[@]}"
+else
+ redirect_path=$1
+ run_simulation "${sim_args[@]}"
+fi
diff --git a/hadoop-tools/hadoop-dtss/src/main/bin/run-simulation.sh b/hadoop-tools/hadoop-dtss/src/main/bin/run-simulation.sh
new file mode 100755
index 00000000000..1961bb55973
--- /dev/null
+++ b/hadoop-tools/hadoop-dtss/src/main/bin/run-simulation.sh
@@ -0,0 +1,351 @@
+#!/usr/bin/env bash
+
+print_usage() {
+ echo "usage: run-simulation.sh [-h] [-b] [-p] [-n] [-o] [-t tmp_dir_base] sim_conf_path output_base_path"
+ echo ""
+
+ echo "This script should be executed from either \$HADOOP_HOME/share/hadoop/tools/dtss/bin/ OR"
+ echo "from \$HADOOP_SRC/hadoop-tools/hadoop-dtss/src/main/bin/."
+ echo "Note that we can only build if executed from the hadoop source path."
+
+ echo ""
+
+ echo "flags (optional):"
+ echo -e "\t-h\t\t\tPrints help message."
+ echo -e "\t-b\t\t\tBuild."
+ echo -e "\t-p\t\t\tPackage."
+ echo -e "\t-n\t\t\tPackage with native libraries."
+ echo -e "\t-t\t\t\tSpecifies the base directory for temporary hadoop setups. Requires an argument."
+ echo -e "\t-o\t\t\tSpecifies whether or not the output should be overwritten if exists."
+ echo ""
+
+ echo "positional arguments:"
+
+ echo -e "\tsim_conf_path\t\tThe simulation configuration path, can either be a directory containing many "
+ echo -e "\t\tconfiguration files or a single configuration file, where the name of the configuration file "
+ echo -e "\t\tspecifies the run ID. For each configuration file, there should be a directory of the same "
+ echo -e "\t\tfile name that specifies the YARN configurations used for the particular run within the same "
+ echo -e "\t\tdirectory as the configuration file."
+
+ echo -e "\toutput_base_path\tSpecifies the base directory for run outputs. A run directory will be created"
+ echo -e "\t\tfurther within output_base_path for each run of the simulation."
+}
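+# Example layout and invocation (illustrative names; not part of the patch):
+#   confs/run1.json   <- simulation configuration; the file name is the run ID
+#   confs/run1/       <- YARN configuration directory for the same run
+#   run-simulation.sh -b -p -o -t /tmp ./confs ./output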
+
+pushd () {
+ command pushd "$@" > /dev/null
+}
+
+popd () {
+ command popd "$@" > /dev/null
+}
+
+cleanup() {
+  # The trap passes the *names* of the caller's arrays; use distinct nameref
+  # names here to avoid a circular name reference.
+  local -n cleanup_pids=$1
+  local -n cleanup_dirs=$2
+
+  echo "Killing child processes ${cleanup_pids[*]}..."
+  for i in "${cleanup_pids[@]}"
+  do
+    echo "Killing child PID $i"
+    kill -9 "$i"
+  done
+
+  wait
+
+  echo "Cleaning up ${cleanup_dirs[*]}..."
+  for i in "${cleanup_dirs[@]}"
+  do
+    rm -rf "$i" &
+  done
+
+  wait
+  echo "Cleanup done!"
+}
+
+analyze_simulation() {
+ local run_id=$1
+ local output_base_path=$2
+ local output_path="$output_base_path"/"$run_id"
+
+ python_dir="$( cd "$script_dir/../python/" >/dev/null 2>&1 && pwd )"
+ if [ -e "$python_dir" ]; then
+ echo "Detected python script directory at $python_dir"
+ else
+ echo "Cannot detect python script directory, skipping analysis."
+ return
+ fi
+
+ pushd "$python_dir"
+ if [[ "$VIRTUAL_ENV" = "" ]]; then
+ in_venv=false
+ venv_dir="$python_dir"/venv
+ if [ ! -d "$venv_dir" ]; then
+ # Create the virtual environment directory
+ echo "Creating the virtual environment directory at $venv_dir"
+ python3 -m venv "$venv_dir"
+ fi
+
+ echo "Activating virtual environment..."
+ source "$venv_dir"/bin/activate
+ echo "Upgrading pip..."
+ pip3 install --upgrade pip
+ echo "Installing requirements..."
+ pip3 install -r requirements.txt
+ else
+ # Do not deactivate later if already in venv
+ in_venv=true
+ echo "Already in python virtual environment!"
+ fi
+
+ echo "Running python analysis on $output_path!"
+ python3 sim_analysis.py analyze -i "$run_id" "$output_path" "$output_base_path"
+ echo "Done!"
+ if [ "$in_venv" = false ]; then
+ deactivate
+ fi
+ popd
+}
+
+run_simulation() {
+ local tmp_dir=$1
+ local sim_conf_path=$2
+ local output_base_path=$3
+
+ local sim_conf_basedir="$( cd "$( dirname "$sim_conf_path" )" >/dev/null 2>&1 && pwd )"
+ local sim_conf_file="${sim_conf_path##*/}"
+ local sim_conf_id="${sim_conf_file%.*}"
+ local output_path="$output_base_path"/"$sim_conf_id"
+ local sim_conf_dir="$sim_conf_basedir"/"$sim_conf_id"
+ echo "Using simulation configuration directory $sim_conf_dir"
+ echo "Using output directory $output_path"
+
+ if [ -d "$output_path" ]; then
+ if [ "$overwrite" = false ]; then
+ echo "$output_path exists and overwrite is not turned on! Returning..."
+ return
+ else
+ echo "$output_path exists and overwrite is turned on! Deleting $output_path"
+ rm -rf "$output_path"
+ fi
+ fi
+
+ echo "Using temp directory $tmp_dir"
+ echo "Copying hadoop home to tmp dir to prevent overwriting..."
+ cp -r "$hadoop_home" "$tmp_dir"
+ local hadoop_basename="$(basename "$hadoop_home")"
+ local local_hadoop_home="$tmp_dir/$hadoop_basename"
+ echo "Using tmp hadoop home $local_hadoop_home"
+ local local_hadoop_conf_dir="$local_hadoop_home/etc/hadoop/"
+ cp -r "$sim_conf_dir"/* "$local_hadoop_conf_dir"
+
+ echo "Running simulation!"
+ local out_file="$output_path"/out.txt
+ echo "Writing output file to $out_file"
+ local local_dtss_run_path="$local_hadoop_home/share/hadoop/tools/dtss/bin/dtssrun.sh"
+ if [ ! -e "$local_dtss_run_path" ]; then
+ echo "The dtssrun.sh does not exist at $local_dtss_run_path!"
+ return
+ else
+ echo "Found dtssrun.sh at $local_dtss_run_path"
+ fi
+
+ "$local_dtss_run_path" -i "$sim_conf_id" -m "$output_base_path" "$sim_conf_path" "$out_file"
+ echo "Simulation run complete!"
+ echo "Analyzing simulation run metrics..."
+ analyze_simulation "$sim_conf_id" "$output_base_path"
+}
+
+if [[ $# = 0 ]]; then
+ print_usage
+ exit 1
+fi
+
+build=false
+pkg=false
+native=false
+from_src=false
+overwrite=false
+tmp_dir_base=""
+
+while getopts ":nbphot:" opt; do
+ case $opt in
+ o)
+ overwrite=true
+ ;;
+ t)
+ if [[ $OPTARG =~ ^-[nbphot]$ ]]
+ then
+ echo "-t requires an argument!"
+ print_usage
+ exit 1
+ fi
+ tmp_dir_base=$OPTARG
+
+ echo "Using $tmp_dir_base as temp base directory!"
+ if [ ! -d "$tmp_dir_base" ]; then
+ echo "$tmp_dir_base is not a valid directory!"
+ print_usage
+ exit 1
+ fi
+
+ if [ ! -w "$tmp_dir_base" ] || [ ! -r "$tmp_dir_base" ]; then
+ echo "$tmp_dir_base must be both readable and writable!"
+ exit 1
+ fi
+ ;;
+ n)
+ native=true
+ ;;
+ b)
+ build=true
+ ;;
+ p)
+ pkg=true
+ ;;
+ h)
+ print_usage
+ exit 0
+ ;;
+ \?)
+ echo "Invalid option: -$OPTARG" >&2
+ print_usage
+ exit 1
+ ;;
+ :)
+ echo "Invalid option: $OPTARG requires an argument" >&2
+ print_usage
+ exit 1
+ esac
+done
+
+script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+src_root_dir="$( cd "$script_dir/../../../../.." >/dev/null 2>&1 && pwd )"
+
+if [ -e "$src_root_dir"/pom.xml ] && [ -e "$src_root_dir"/LICENSE.txt ]; then
+ echo "Detected Hadoop source root directory at $src_root_dir"
+ from_src=true
+fi
+
+if [ $from_src = true ]; then
+ pushd "$src_root_dir"
+ # Only build/package if running from source directory
+ if [ $build = true ] ; then
+ echo "-b is turned on, building..." >&2
+ mvn clean install -DskipTests
+ if [ $? -ne 0 ]; then
+ echo "Maven build failed, exiting..."
+ exit 1
+ fi
+ fi
+
+ if [ $pkg = true ] ; then
+ if [ $native = true ] ; then
+ echo "-n and -p are turned on, packaging with native libraries..." >&2
+ mvn package -Pnative -Pdist -DskipTests -nsu
+ if [ $? -ne 0 ]; then
+ echo "Maven package failed, exiting..."
+ exit 1
+ fi
+ else
+ echo "-p is turned on, packaging..." >&2
+ mvn package -Pdist -DskipTests -nsu
+ if [ $? -ne 0 ]; then
+ echo "Maven package failed, exiting..."
+ exit 1
+ fi
+ fi
+ fi
+
+ hadoop_home_pattern="hadoop-dist/target/hadoop-*/"
+ files=( $hadoop_home_pattern )
+ hadoop_home_relative="${files[0]}"
+ if [ "$hadoop_home_relative" = "$hadoop_home_pattern" ] ; then
+ echo "Unable to find hadoop home! Exiting..."
+ exit 1
+ fi
+ hadoop_home_relative=$src_root_dir/$hadoop_home_relative
+ popd
+else
+ echo "Skipping build, since we are not running from the source directory."
+ hadoop_home_relative="$script_dir"/../../../../..
+fi
+
+build_or_pkg=false
+if [ "$build" = true ] || [ "$pkg" = true ]; then
+  build_or_pkg=true
+fi
+
+sim_conf_path_arg=${@:$OPTIND:1}
+if [ -z "$sim_conf_path_arg" ] ; then
+  if [ "$build_or_pkg" = false ]; then
+    echo "Must have a simulation configuration path!"
+    exit 1
+  else
+    echo "Done!"
+    exit 0
+  fi
+fi
+
+output_path_arg=${@:$(( OPTIND + 1 )):1}
+if [ -z "$output_path_arg" ] ; then
+  if [ "$build_or_pkg" = false ]; then
+    echo "Must have an output path!"
+    exit 1
+  else
+    echo "Done!"
+    exit 0
+  fi
+fi
+
+if [ ! -e "$sim_conf_path_arg" ]; then
+ echo "The simulation configuration path must exist!"
+ exit 1
+fi
+
+sim_conf_path="$(cd "$(dirname "$sim_conf_path_arg")"; pwd -P)/$(basename "$sim_conf_path_arg")"
+
+echo "Using simulation configuration path: $sim_conf_path"
+
+if [ ! -e "$output_path_arg" ]; then
+ echo "The output path must exist!"
+ exit 1
+fi
+
+output_base_path="$(cd "$(dirname "$output_path_arg")"; pwd -P)/$(basename "$output_path_arg")"
+echo "Using output path: $output_base_path"
+
+hadoop_home="$( cd "$hadoop_home_relative" >/dev/null 2>&1 && pwd )"
+echo "Located hadoop home directory at $hadoop_home"
+
+pushd "$hadoop_home"/share/hadoop/tools/dtss/bin
+ tmp_dirs=()
+ child_pids=()
+ if [ -f "$sim_conf_path" ] ; then
+ if [ -z "$tmp_dir_base" ] ; then
+ tmp_dir=$(mktemp -d -t sim-XXXXXXXXXX)
+ else
+ tmp_dir=$(mktemp -p "$tmp_dir_base" -d -t sim-XXXXXXXXXX)
+ fi
+ tmp_dirs+=("$tmp_dir")
+ run_simulation "$tmp_dir" "$sim_conf_path" "$output_base_path" &
+ child_pids+=("$!")
+ elif [ -d "$sim_conf_path" ]; then
+ for conf in "$sim_conf_path"/*; do
+ if [ ! -f "$conf" ]; then
+ echo "$conf is not a file, skipping..."
+ continue;
+ fi
+
+ if [ -z "$tmp_dir_base" ] ; then
+ tmp_dir=$(mktemp -d -t sim-XXXXXXXXXX)
+ else
+ tmp_dir=$(mktemp -p "$tmp_dir_base" -d -t sim-XXXXXXXXXX)
+ fi
+
+ tmp_dirs+=("$tmp_dir")
+ echo "Running simulation with configuration $conf!"
+ run_simulation "$tmp_dir" "$conf" "$output_base_path" &
+ child_pids+=("$!")
+ done
+ fi
+
+ trap 'cleanup child_pids tmp_dirs' EXIT
+ wait
+popd
diff --git a/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/Launcher.java b/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/Launcher.java
new file mode 100644
index 00000000000..6316af04632
--- /dev/null
+++ b/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/Launcher.java
@@ -0,0 +1,256 @@
+package org.apache.hadoop.yarn.dtss;
+
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import com.google.gson.GsonBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.dtss.cluster.ClusterState;
+import org.apache.hadoop.yarn.dtss.cluster.ClusterTopology;
+import org.apache.hadoop.yarn.dtss.cluster.ClusterTopologyBuilder;
+import org.apache.hadoop.yarn.dtss.cluster.NodeRepresentation;
+import org.apache.hadoop.yarn.dtss.config.parameters.IsUnmanaged;
+import org.apache.hadoop.yarn.dtss.lifecycle.LifeCycleState;
+import org.apache.hadoop.yarn.dtss.metrics.CapacitySchedulerWrapper;
+import org.apache.hadoop.yarn.dtss.metrics.FairSchedulerWrapper;
+import org.apache.hadoop.yarn.dtss.metrics.MetricsConfiguration;
+import org.apache.hadoop.yarn.dtss.nm.MockNM;
+import org.apache.hadoop.yarn.dtss.rm.MockRM;
+import org.apache.hadoop.yarn.dtss.time.Clock;
+import org.apache.hadoop.yarn.dtss.trace.TraceReader;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Launcher class for the discrete event-driven simulation.
+ */
+@InterfaceAudience.Private
+final class Launcher {
+ // How often we should check traces
+ private static final long TRACE_CHECK_SECONDS = 60;
+
+ // How often heartbeats between NMs and RMs should occur
+ private static final long MIN_RM_NM_HEARTBEAT_INTERVAL_SECONDS = 10;
+
+ private static final Logger LOG = Logger.getLogger(Launcher.class.getName());
+
+ private Launcher() {
+ }
+
+ /**
+ * Reads the YARN configuration and starts the simulation.
+ * @param configuration The combined simulation configuration
+ */
+ @VisibleForTesting
+ public static void launch(final Module configuration) {
+ final Injector injector = Guice.createInjector(configuration);
+
+ final Configuration yarnConf = injector.getInstance(Configuration.class);
+ final String schedulerClass = yarnConf.get(YarnConfiguration.RM_SCHEDULER);
+ LOG.log(Level.INFO, "RM scheduler class: " + schedulerClass);
+ boolean isUnmanaged = injector.getInstance(Key.get(Boolean.class, IsUnmanaged.class));
+ if (isUnmanaged) {
+ LOG.log(Level.INFO, "Using unmanaged AMs.");
+ } else {
+ LOG.log(Level.INFO, "Using managed AMs.");
+ }
+
+ // Only these schedulers are supported.
+ // Overwrite the scheduler class with the wrapper class.
+ try {
+ if (Class.forName(schedulerClass) == CapacityScheduler.class) {
+ yarnConf.set(YarnConfiguration.RM_SCHEDULER,
+ CapacitySchedulerWrapper.class.getName());
+ } else if(Class.forName(schedulerClass) == FairScheduler.class) {
+ yarnConf.set(YarnConfiguration.RM_SCHEDULER,
+ FairSchedulerWrapper.class.getName());
+ } else {
+ throw new YarnException(schedulerClass + " is not supported yet.");
+ }
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ final Clock clock = injector.getInstance(Clock.class);
+ // RM is created here
+ final MockRM rm = injector.getInstance(MockRM.class);
+
+ // Initialize the cluster
+ final ClusterTopologyBuilder clusterTopologyBuilder = injector.getInstance(ClusterTopologyBuilder.class);
+ final ClusterState clusterState = injector.getInstance(ClusterState.class);
+ final ClusterTopology clusterTopology;
+ final TraceReader traceReader = injector.getInstance(TraceReader.class);
+
+ try {
+ clusterTopology = clusterTopologyBuilder.getClusterTopology();
+
+ // Start
+ clock.start();
+ rm.start();
+ final UUID nmHeartbeatAlarmId = registerNodes(yarnConf, clusterTopology, clusterState, rm, clock);
+
+ // Start trace-reading and periodic checking
+ traceReader.init();
+ traceReader.start();
+
+ registerTraceCheckAlarm(clock, traceReader, clusterState, nmHeartbeatAlarmId);
+
+ LifeCycleState clockState = clock.pollNextAlarm();
+
+ // Run until either no more jobs/events, or until the end time is reached.
+ while (shouldContinueRunning(clockState)) {
+ clockState = clock.pollNextAlarm();
+ }
+
+ traceReader.onStop();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ traceReader.stop();
+ rm.stop();
+ clock.stop();
+ }
+
+ // Write metric results
+ // By now, the metrics manager has already stopped
+ if (rm.getMetricsManager().isMetricsOn()) {
+ assert rm.getMetricsManager().getState() == LifeCycleState.STOPPED;
+ rm.getMetricsManager().writeResults();
+ final CapacitySchedulerConfiguration capacitySchedulerConf;
+ if (rm.getResourceScheduler() instanceof CapacityScheduler) {
+ capacitySchedulerConf = ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
+ } else {
+ capacitySchedulerConf = null;
+ }
+ try {
+ writeConfigs(rm.getMetricsManager().getMetricsConfig(), yarnConf, capacitySchedulerConf, configuration);
+ } catch (final IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+
+ /**
+ * Registers a periodic alarm on the clock that checks with the
+ * {@link TraceReader} whether all jobs in the trace have completed,
+ * and unregisters the alarms once they have.
+ * @param clock The clock
+ * @param traceReader The trace reader
+ * @param clusterState The current cluster state to unregister NMs
+ * @param nmHeartbeatAlarmId The heartbeat ID for periodic NM-RM heartbeats
+ */
+ private static void registerTraceCheckAlarm(final Clock clock,
+ final TraceReader traceReader,
+ final ClusterState clusterState,
+ final UUID nmHeartbeatAlarmId) {
+ clock.schedulePeriodicAlarm(TRACE_CHECK_SECONDS, periodicClientAlarm -> {
+ if (traceReader.areTraceJobsDone()) {
+ LOG.log(Level.INFO, "Traces completed at " + clock.getInstant() + "!");
+ clock.unschedulePeriodicAlarm(nmHeartbeatAlarmId);
+ clock.unschedulePeriodicAlarm(periodicClientAlarm.getPeriodicAlarmId());
+ for (final MockNM nm : clusterState.getNMs()) {
+ try {
+ nm.unRegisterNode();
+ } catch(final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ clock.scheduleShutdown(1);
+ }
+ });
+ }
+
+ /**
+ * Registers NMs with the RM and schedules periodic heartbeats for each NM.
+ * @param conf The YARN configuration
+ * @param clusterTopology The cluster topology
+ * @param clusterState The state of the cluster
+ * @param rm The RM
+ * @param clock The discrete-event clock
+ * @return The periodic heartbeat alarm ID
+ * @throws Exception
+ */
+ private static UUID registerNodes(final Configuration conf,
+ final ClusterTopology clusterTopology,
+ final ClusterState clusterState,
+ final MockRM rm,
+ final Clock clock) throws Exception {
+ final long heartbeatIntervalMs = conf.getLong(
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+
+ final long heartbeatIntervalSeconds = Math.max(
+ MIN_RM_NM_HEARTBEAT_INTERVAL_SECONDS, (long) Math.ceil(heartbeatIntervalMs / 1000.0));
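+ // For example, with the default 1000 ms NM heartbeat interval this is
+ // max(10, ceil(1000 / 1000.0)) = 10 simulated seconds between heartbeats.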
+
+ for (final NodeRepresentation nodeRepresentation : clusterTopology.getNodeSet()) {
+ final MockNM nm = new MockNM(nodeRepresentation.getNodeHostName(),
+ nodeRepresentation.getContainersOnNode() * clusterTopology.getContainerMB(),
+ nodeRepresentation.getContainersOnNode() * clusterTopology.getContainerVCores(),
+ rm.getResourceTrackerService());
+
+ clusterState.addNM(nm);
+ nm.registerNode();
+ nm.nodeHeartbeat(true);
+ }
+
+ LOG.log(Level.INFO, "NM heartbeats every " + heartbeatIntervalSeconds + " seconds.");
+
+ return clock.schedulePeriodicAlarm(heartbeatIntervalSeconds, alarm -> {
+ clusterState.heartbeatAll();
+ });
+ }
+
+ private static boolean shouldContinueRunning(final LifeCycleState clockState) {
+ return clockState != LifeCycleState.STOPPED;
+ }
+
+ /**
+ * Write the new configs used for running simulations for diagnostic purposes.
+ * @param metricsConfig The metric configuration
+ * @param yarnConf The YARN configuration
+ * @param capacitySchedulerConf The CapacityScheduler configuration
+ * @param module the simulation configuration
+ * @throws IOException
+ */
+ private static void writeConfigs(
+ final MetricsConfiguration metricsConfig,
+ final Configuration yarnConf,
+ final CapacitySchedulerConfiguration capacitySchedulerConf,
+ final Module module) throws IOException {
+ final String yarnConfPath = metricsConfig.getMetricsFilePath("yarn-conf.xml");
+ assert yarnConfPath != null;
+ try (final FileWriter fw = new FileWriter(yarnConfPath)) {
+ yarnConf.writeXml(fw);
+ }
+
+ if (capacitySchedulerConf != null) {
+ final String capacityConfPath = metricsConfig.getMetricsFilePath("capacity-scheduler.xml");
+ assert capacityConfPath != null;
+ try (final FileWriter fw = new FileWriter(capacityConfPath)) {
+ capacitySchedulerConf.writeXml(fw);
+ }
+ }
+
+ final String simConfPath = metricsConfig.getMetricsFilePath("sim-conf.json");
+ assert simConfPath != null;
+ try (final FileWriter writer = new FileWriter(simConfPath)) {
+ new GsonBuilder().setPrettyPrinting().create().toJson(module, writer);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/Runner.java b/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/Runner.java
new file mode 100644
index 00000000000..6e985fb7fb9
--- /dev/null
+++ b/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/Runner.java
@@ -0,0 +1,157 @@
+package org.apache.hadoop.yarn.dtss;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+import com.google.inject.util.Modules;
+import org.apache.commons.cli.*;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.dtss.config.SimulatorModule;
+import org.apache.hadoop.yarn.dtss.config.parameters.runner.RunId;
+import org.apache.hadoop.yarn.dtss.config.parameters.runner.SimulationConfigPath;
+import org.apache.log4j.BasicConfigurator;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+
+/**
+ * Houses the main class to {@link Launcher}, which begins the simulation.
+ */
+@InterfaceAudience.Private
+public final class Runner {
+ private static final Logger LOG = Logger.getLogger(Runner.class.getName());
+ private static String runId = DateTimeFormatter.ofPattern(
+ "yyyy-MM-dd-HH-mm-ss").format(LocalDateTime.now());
+
+ @VisibleForTesting
+ public static final class RunnerModule extends AbstractModule {
+ private final String configPath;
+ private final String runId;
+
+ @VisibleForTesting
+ public static RunnerModule newModule(final String configPath, final String runId) {
+ return new RunnerModule(configPath, runId);
+ }
+
+ private RunnerModule(final String configPath, final String runId) {
+ this.configPath = configPath;
+ this.runId = runId;
+ }
+
+ @Override
+ protected void configure() {
+ bind(String.class).annotatedWith(SimulationConfigPath.class).toInstance(configPath);
+ bind(String.class).annotatedWith(RunId.class).toInstance(runId);
+ }
+ }
+
+ private static Options commandLineOptions() {
+ final Options options = new Options();
+ options
+ .addOption("i", "id", true, "The Run ID of the job.")
+ .addOption("m", "metrics", true, "The metrics output directory");
+
+ return options;
+ }
+
+ @InterfaceStability.Unstable
+ public static void runSimulator(
+ final SimulatorModule simulatorModule, final RunnerModule runnerModule){
+ LOG.log(Level.INFO, String.format(
+ "Read configuration\n%s",
+ ReflectionToStringBuilder.toString(simulatorModule, ToStringStyle.MULTI_LINE_STYLE)
+ ));
+
+ final Level logLevel = simulatorModule.getSimulatorLogLevel();
+ LOG.log(Level.INFO, "Log level set to " + logLevel + ".");
+
+ setSimulatorLogLevel(logLevel);
+
+ Launcher.launch(Modules.combine(simulatorModule, runnerModule));
+ }
+
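+ // Example arguments (illustrative values): -i run-001 -m /data/metrics /data/conf/run-001.json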
+ public static void main(final String[] args) {
+ // Sets the Apache logger level
+ org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.WARN);
+ BasicConfigurator.configure();
+
+ final String configPath;
+ final String metricsOutputDirectory;
+ final CommandLineParser parser = new BasicParser();
+ try {
+ final CommandLine cmd = parser.parse(commandLineOptions(), args);
+
+ if (cmd.hasOption('i')) {
+ runId = cmd.getOptionValue('i');
+ if (runId == null || runId.trim().isEmpty()) {
+ throw new IllegalArgumentException("Run ID cannot be set to empty!");
+ }
+ }
+
+ if (cmd.hasOption('m')) {
+ metricsOutputDirectory = cmd.getOptionValue('m');
+ if (metricsOutputDirectory == null || metricsOutputDirectory.trim().isEmpty()){
+ throw new IllegalArgumentException("Metrics output directory cannot be empty!");
+ }
+ } else {
+ metricsOutputDirectory = null;
+ }
+
+ final String[] restOfArgs = cmd.getArgs();
+
+ assert restOfArgs.length == 1 : "Should only have one command line argument holding " +
+ "the path to the configuration file.";
+ configPath = restOfArgs[0];
+ } catch (final ParseException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ LOG.log(Level.INFO, "Running simulation with ID " + runId + ".");
+ LOG.log(Level.INFO, String.format("Reading configuration from %s", configPath));
+
+ final SimulatorModule simulatorModule;
+ try {
+ simulatorModule = SimulatorModule.newReader().readConfiguration(configPath);
+ if (metricsOutputDirectory != null) {
+ LOG.log(Level.INFO, "Overwriting metrics output directory to " + metricsOutputDirectory);
+ simulatorModule.setMetricsOutputDirectory(metricsOutputDirectory);
+ }
+ } catch (final IOException e) {
+ LOG.log(
+ Level.SEVERE,
+ () -> String.format("Unable to read simulation configuration at %s", configPath)
+ );
+
+ throw new RuntimeException(e);
+ }
+
+ final RunnerModule runnerModule = new RunnerModule(configPath, runId);
+ try {
+ runSimulator(simulatorModule, runnerModule);
+ } catch (final AssertionError assertionError) {
+ LOG.log(Level.SEVERE,
+ "Assertion failed with message: " + assertionError);
+ throw assertionError;
+ } catch (final Throwable e) {
+ LOG.log(Level.SEVERE,
+ "Simulation failed with throwable: " + e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void setSimulatorLogLevel(final Level logLevel) {
+ final Logger rootLogger = LogManager.getLogManager().getLogger("");
+ rootLogger.setLevel(logLevel);
+ for (final Handler h : rootLogger.getHandlers()) {
+ h.setLevel(logLevel);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/am/MockAM.java b/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/am/MockAM.java
new file mode 100644
index 00000000000..0fa0c85b7b3
--- /dev/null
+++ b/hadoop-tools/hadoop-dtss/src/main/java/org/apache/hadoop/yarn/dtss/am/MockAM.java
@@ -0,0 +1,453 @@
+package org.apache.hadoop.yarn.dtss.am;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.*;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.dtss.nm.MockNM;
+import org.apache.hadoop.yarn.dtss.rm.MockRM;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class is based on the MockAM class used in the tests
+ * for YARN's resourcemanager project.
+ */
+public final class MockAM {
+
+ private static final Logger LOG = Logger.getLogger(MockAM.class);
+
+ private final AtomicLong allocationRequestId = new AtomicLong();
+ private volatile int responseId = 0;
+ private final ApplicationAttemptId attemptId;
+ private RMContext context;
+ private ApplicationMasterProtocol amRMProtocol;
+ private UserGroupInformation ugi;
+ private volatile AllocateResponse lastResponse;
+ private Map<Set<String>, PlacementConstraint> placementConstraints =
+ new HashMap<>();
+ private List<SchedulingRequest> schedulingRequests = new ArrayList<>();
+
+ private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
+ private final List<ContainerId> releases = new ArrayList<ContainerId>();
+
+ public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol,
+ ApplicationAttemptId attemptId) {
+ this.context = context;
+ this.amRMProtocol = amRMProtocol;
+ this.attemptId = attemptId;
+ }
+
+ public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol,
+ RMContext context) {
+ this.context = context;
+ this.amRMProtocol = amRMProtocol;
+ }
+
+ /**
+ * Wait until an attempt has reached a specified state.
+ * The timeout is 40 seconds.
+ * @param finalState the attempt state waited
+ * @throws InterruptedException
+ * if interrupted while waiting for the state transition
+ */
+ private void waitForState(RMAppAttemptState finalState)
+ throws InterruptedException {
+ RMApp app = context.getRMApps().get(attemptId.getApplicationId());
+ RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
+ MockRM.waitForState(attempt, finalState);
+ }
+
+ public RegisterApplicationMasterResponse registerAppAttempt()
+ throws Exception {
+ return registerAppAttempt(true);
+ }
+
+ public void addPlacementConstraint(Set<String> tags,
+ PlacementConstraint constraint) {
+ placementConstraints.put(tags, constraint);
+ }
+
+ public MockAM addSchedulingRequest(List<SchedulingRequest> reqs) {
+ schedulingRequests.addAll(reqs);
+ return this;
+ }
+
+ public RegisterApplicationMasterResponse registerAppAttempt(boolean wait)
+ throws Exception {
+ if (wait) {
+ waitForState(RMAppAttemptState.LAUNCHED);
+ }
+ responseId = 0;
+ final RegisterApplicationMasterRequest req =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ req.setHost("");
+ req.setRpcPort(1);
+ req.setTrackingUrl("");
+ if (!placementConstraints.isEmpty()) {
+ req.setPlacementConstraints(this.placementConstraints);
+ }
+ if (ugi == null) {
+ ugi = UserGroupInformation.createRemoteUser(
+ attemptId.toString());
+ Token<AMRMTokenIdentifier> token =
+ context.getRMApps().get(attemptId.getApplicationId())
+ .getRMAppAttempt(attemptId).getAMRMToken();
+ ugi.addTokenIdentifier(token.decodeIdentifier());
+ }
+ try {
+ return ugi
+ .doAs(
+ (PrivilegedExceptionAction<RegisterApplicationMasterResponse>) () ->
+ amRMProtocol.registerApplicationMaster(req));
+ } catch (UndeclaredThrowableException e) {
+ throw (Exception) e.getCause();
+ }
+ }
+
+ public int getResponseId() {
+ return responseId;
+ }
+
+ public void addRequests(String[] hosts, int memory, int priority,
+ int containers) throws Exception {
+ requests.addAll(createReq(hosts, memory, priority, containers));
+ }
+
+ public AllocateResponse schedule() throws Exception {
+ AllocateResponse response = allocate(requests, releases);
+ requests.clear();
+ releases.clear();
+ return response;
+ }
+
+ public void addContainerToBeReleased(ContainerId containerId) {
+ releases.add(containerId);
+ }
+
+ public AllocateResponse allocate(
+ String host, int memory, int numContainers,
+ List<ContainerId> releases) throws Exception {
+ return allocate(host, memory, numContainers, releases, null);
+ }
+
+ public AllocateResponse allocate(
+ String host, int memory, int numContainers,
+ List<ContainerId> releases, String labelExpression) throws Exception {
+ return allocate(host, memory, numContainers, 1, releases, labelExpression);
+ }
+
+ public AllocateResponse allocate(
+ String host, int memory, int numContainers, int priority,
+ List<ContainerId> releases, String labelExpression) throws Exception {
+ List<ResourceRequest> reqs =
+ createReq(new String[] { host }, memory, priority, numContainers, labelExpression);
+ return allocate(reqs, releases);
+ }
+
+ public AllocateResponse allocate(
+ String host, Resource cap, int numContainers,
+ List<ContainerId> rels, String labelExpression) throws Exception {
+ List<ResourceRequest> reqs = new ArrayList<>();
+ ResourceRequest oneReq =
+ createResourceReq(host, cap, numContainers,
+ labelExpression);
+ reqs.add(oneReq);
+ return allocate(reqs, rels);
+ }
+
+ public List<ResourceRequest> createReq(String[] hosts, int memory,
+ int priority, int containers) throws Exception {
+ return createReq(hosts, memory, priority, containers, null);
+ }
+
+ public List<ResourceRequest> createReq(String[] hosts, int memory,
+ int priority, int containers, String labelExpression) throws Exception {
+ List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
+ if (hosts != null) {
+ for (String host : hosts) {
+ final long allocId = allocationRequestId.getAndIncrement();
+ // only add host/rack request when asked host isn't ANY
+ if (!host.equals(ResourceRequest.ANY)) {
+ ResourceRequest hostReq =
+ createResourceReq(host, memory, priority, containers,
+ labelExpression);
+ hostReq.setAllocationRequestId(allocId);
+ reqs.add(hostReq);
+ ResourceRequest rackReq =
+ createResourceReq("/default-rack", memory, priority, containers,
+ labelExpression);
+ rackReq.setAllocationRequestId(allocId);
+ reqs.add(rackReq);
+ }
+ }
+ }
+
+ ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory,
+ priority, containers, labelExpression);
+ offRackReq.setAllocationRequestId(allocationRequestId.getAndIncrement());
+ reqs.add(offRackReq);
+ return reqs;
+ }
+
+ public ResourceRequest createResourceReq(String resource, int memory, int priority,
+ int containers) throws Exception {
+ return createResourceReq(resource, memory, priority, containers, null);
+ }
+
+ public ResourceRequest createResourceReq(String resource, int memory,
+ int priority, int containers, String labelExpression) throws Exception {
+ return createResourceReq(resource, memory, priority, containers,
+ labelExpression, ExecutionTypeRequest.newInstance());
+ }
+
+ public ResourceRequest createResourceReq(String resource, int memory,
+ int priority, int containers, String labelExpression,
+ ExecutionTypeRequest executionTypeRequest) throws Exception {
+ ResourceRequest req = Records.newRecord(ResourceRequest.class);
+ req.setResourceName(resource);
+ req.setNumContainers(containers);
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(priority);
+ req.setPriority(pri);
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemorySize(memory);
+ req.setCapability(capability);
+ if (labelExpression != null) {
+ req.setNodeLabelExpression(labelExpression);
+ }
+ req.setExecutionTypeRequest(executionTypeRequest);
+ return req;
+
+ }
+
+ public ResourceRequest createResourceReq(String host, Resource cap,
+ int containers, String labelExpression) throws Exception {
+ ResourceRequest req = Records.newRecord(ResourceRequest.class);
+ req.setResourceName(host);
+ req.setNumContainers(containers);
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(1);
+ req.setPriority(pri);
+ req.setCapability(cap);
+ if (labelExpression != null) {
+ req.setNodeLabelExpression(labelExpression);
+ }
+ req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance());
+ return req;
+
+ }
+
+ public AllocateResponse allocate(
+ List<ResourceRequest> resourceRequest, List<ContainerId> releases)
+ throws Exception {
+ final AllocateRequest req =
+ AllocateRequest.newInstance(0, 0F, resourceRequest,
+ releases, null);
+ if (!schedulingRequests.isEmpty()) {
+ req.setSchedulingRequests(schedulingRequests);
+ schedulingRequests.clear();
+ }
+ return allocate(req);
+ }
+
+ public AllocateResponse allocate(List<ResourceRequest> resourceRequest,
+ List<SchedulingRequest> newSchedulingRequests, List<ContainerId> releases)
+ throws Exception {
+ final AllocateRequest req =
+ AllocateRequest.newInstance(0, 0F, resourceRequest,
+ releases, null);
+ if (newSchedulingRequests != null) {
+ addSchedulingRequest(newSchedulingRequests);
+ }
+ if (!schedulingRequests.isEmpty()) {
+ req.setSchedulingRequests(schedulingRequests);
+ schedulingRequests.clear();
+ }
+ return allocate(req);
+ }
+
+ public AllocateResponse allocateIntraAppAntiAffinity(
+ ResourceSizing resourceSizing, Priority priority, long allocationId,
+ Set<String> allocationTags, String... targetTags) throws Exception {
+ return allocateAppAntiAffinity(resourceSizing, priority, allocationId,
+ null, allocationTags, targetTags);
+ }
+
+ public AllocateResponse allocateAppAntiAffinity(
+ ResourceSizing resourceSizing, Priority priority, long allocationId,
+ String namespace, Set<String> allocationTags, String... targetTags)
+ throws Exception {
+ return this.allocate(null,
+ Arrays.asList(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(allocationId).priority(priority)
+ .allocationTags(allocationTags).placementConstraintExpression(
+ PlacementConstraints
+ .targetNotIn(PlacementConstraints.NODE,
+ PlacementConstraints.PlacementTargets
+ .allocationTagWithNamespace(namespace, targetTags))
+ .build())
+ .resourceSizing(resourceSizing).build()), null);
+ }
+
+ public AllocateResponse allocateIntraAppAntiAffinity(
+ String nodePartition, ResourceSizing resourceSizing, Priority priority,
+ long allocationId, String... tags) throws Exception {
+ return this.allocate(null,
+ Arrays.asList(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(allocationId).priority(priority)
+ .placementConstraintExpression(PlacementConstraints
+ .targetNotIn(PlacementConstraints.NODE,
+ PlacementConstraints.PlacementTargets
+ .allocationTag(tags),
+ PlacementConstraints.PlacementTargets
+ .nodePartition(nodePartition)).build())
+ .resourceSizing(resourceSizing).build()), null);
+ }
+
+ public AllocateResponse sendContainerResizingRequest(
+ List<UpdateContainerRequest> updateRequests) throws Exception {
+ final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null,
+ updateRequests, null);
+ return allocate(req);
+ }
+
+ public AllocateResponse sendContainerUpdateRequest(
+ List<UpdateContainerRequest> updateRequests) throws Exception {
+ final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null,
+ updateRequests, null);
+ return allocate(req);
+ }
+
+ public AllocateResponse allocate(AllocateRequest allocateRequest)
+ throws Exception {
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(attemptId.toString());
+ Token<AMRMTokenIdentifier> token =
+ context.getRMApps().get(attemptId.getApplicationId())
+ .getRMAppAttempt(attemptId).getAMRMToken();
+ ugi.addTokenIdentifier(token.decodeIdentifier());
+ lastResponse = doAllocateAs(ugi, allocateRequest);
+ return lastResponse;
+ }
+
+ public AllocateResponse doAllocateAs(UserGroupInformation ugi,
+ final AllocateRequest req) throws Exception {
+ req.setResponseId(responseId);
+ try {
+ AllocateResponse response =
+ ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
+ @Override
+ public AllocateResponse run() throws Exception {
+ return amRMProtocol.allocate(req);
+ }
+ });
+ responseId = response.getResponseId();
+ return response;
+ } catch (UndeclaredThrowableException e) {
+ throw (Exception) e.getCause();
+ }
+ }
+
+ public AllocateResponse doHeartbeat() throws Exception {
+ return allocate(Collections.emptyList(), Collections.emptyList());
+ }
+
+ public void unregisterAppAttempt() throws Exception {
+ waitForState(RMAppAttemptState.RUNNING);
+ unregisterAppAttempt(true);
+ }
+
+ public void unregisterAppAttempt(boolean waitForStateRunning)
+ throws Exception {
+ final FinishApplicationMasterRequest req =
+ FinishApplicationMasterRequest.newInstance(
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ unregisterAppAttempt(req, waitForStateRunning);
+ }
+
+ public void unregisterAppAttempt(final FinishApplicationMasterRequest req,
+ boolean waitForStateRunning) throws Exception {
+ if (waitForStateRunning) {
+ waitForState(RMAppAttemptState.RUNNING);
+ }
+ if (ugi == null) {
+ ugi = UserGroupInformation.createRemoteUser(attemptId.toString());
+ Token<AMRMTokenIdentifier> token =
+ context.getRMApps()
+ .get(attemptId.getApplicationId())
+ .getRMAppAttempt(attemptId).getAMRMToken();
+ ugi.addTokenIdentifier(token.decodeIdentifier());
+ }
+ try {
+ ugi.doAs(new PrivilegedExceptionAction