diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bea08ee..f51d3bb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -385,6 +385,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     // a symbolic name to reference in the Hive source code. Properties with non-null
     // values will override any values set in the underlying Hadoop configuration.
     HADOOPBIN("hadoop.bin.path", findHadoopBinary(), "", true),
+    YARNBIN("yarn.bin.path", findYarnBinary(), "", true),
     HIVE_FS_HAR_IMPL("fs.har.impl", "org.apache.hadoop.hive.shims.HiveHarFileSystem",
         "The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop versions less than 0.20"),
     MAPREDMAXSPLITSIZE(FileInputFormat.SPLIT_MAXSIZE, 256000000L, "", true),
@@ -2559,16 +2560,27 @@ public String toString() {
   }
 
   private static String findHadoopBinary() {
+    String val = findHadoopHome();
+    // if can't find hadoop home we can at least try /usr/bin/hadoop
+    val = (val == null ? File.separator + "usr" : val)
+      + File.separator + "bin" + File.separator + "hadoop";
+    // Launch hadoop command file on windows.
+    return val + (Shell.WINDOWS ? ".cmd" : "");
+  }
+
+  private static String findYarnBinary() {
+    String val = findHadoopHome();
+    val = (val == null ? "yarn" : val + File.separator + "bin" + File.separator + "yarn");
+    return val + (Shell.WINDOWS ? ".cmd" : "");
+  }
+
+  private static String findHadoopHome() {
     String val = System.getenv("HADOOP_HOME");
     // In Hadoop 1.X and Hadoop 2.X HADOOP_HOME is gone and replaced with HADOOP_PREFIX
     if (val == null) {
       val = System.getenv("HADOOP_PREFIX");
     }
-    // and if all else fails we can at least try /usr/bin/hadoop
-    val = (val == null ? File.separator + "usr" : val)
-      + File.separator + "bin" + File.separator + "hadoop";
-    // Launch hadoop command file on windows.
-    return val + (Shell.WINDOWS ? ".cmd" : "");
+    return val;
   }
 
   public String getDefaultValue() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index fb0498a..6990e80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,10 +34,12 @@
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
 
   private RemoteSparkJobStatus sparkJobStatus;
+  private final HiveConf hiveConf;
 
   public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) {
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
+    this.hiveConf = hiveConf;
   }
 
   @Override
@@ -77,6 +79,7 @@ public int startMonitor() {
             Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+              printAppInfo();
               // print job stages.
               console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() +
                   "] stages:");
@@ -137,4 +140,16 @@ public int startMonitor() {
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     return rc;
   }
+
+  private void printAppInfo() {
+    String sparkMaster = hiveConf.get("spark.master");
+    if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
+      String appID = sparkJobStatus.getAppID();
+      if (appID != null) {
+        console.printInfo("Running with YARN Application = " + appID);
+        console.printInfo("Kill Command = " +
+            HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + " application -kill " + appID);
+      }
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index fa45ec8..7959089 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -29,6 +29,8 @@
  */
 public interface SparkJobStatus {
 
+  String getAppID();
+
   int getJobId();
 
   JobExecutionStatus getState() throws HiveException;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index ebc5c16..3c15521 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -66,6 +66,11 @@ public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
   }
 
   @Override
+  public String getAppID() {
+    return sparkContext.sc().applicationId();
+  }
+
+  @Override
   public int getJobId() {
     return jobId;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index e8d581f..d84c026 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -62,6 +62,17 @@ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle job
   }
 
   @Override
+  public String getAppID() {
+    Future<String> getAppID = sparkClient.run(new GetAppIDJob());
+    try {
+      return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.warn("Failed to get APP ID.", e);
+      return null;
+    }
+  }
+
+  @Override
   public int getJobId() {
     return jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : -1;
   }
@@ -268,4 +279,15 @@ public JobExecutionStatus status() {
       }
     };
   }
+
+  private static class GetAppIDJob implements Job<String> {
+
+    public GetAppIDJob() {
+    }
+
+    @Override
+    public String call(JobContext jc) throws Exception {
+      return jc.sc().sc().applicationId();
+    }
+  }
 }