commit f3e51447ec138c84f00805b2bf3ebe5d1fc3a313
Author: Sahil Takiar
Date:   Fri Oct 13 13:15:26 2017 -0700

    HIVE-16601: Display Session Id, Query Name / Id, and Dag Id in Spark UI

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4ee50ac08..e4189ff619 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1509,7 +1509,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         "Whether to grant access to the hs2/hive user for queries"),
     HIVEQUERYNAME ("hive.query.name", null,
         "This named is used by Tez to set the dag name. This name in turn will appear on \n" +
-        "the Tez UI representing the work that was done."),
+        "the Tez UI representing the work that was done. Used by Spark to set the query name, will show up in the\n" +
+        "Spark UI."),

     HIVEOPTIMIZEBUCKETINGSORTING("hive.optimize.bucketingsorting", true,
         "Don't create a reducer for enforcing \n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cbf0561dfe..396fc2aa0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2217,7 +2217,7 @@ private TaskRunner launchTask(Task tsk, String queryId,
     }
     if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
       if (noName) {
-        conf.set(MRJobConfig.JOB_NAME, jobname + "(" + tsk.getId() + ")");
+        conf.set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
       }
       conf.set("mapreduce.workflow.node.name", tsk.getId());
       Utilities.setWorkflowAdjacencies(conf, plan);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
new file mode 100644
index 0000000000..97c494b9e6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
@@ -0,0 +1,20 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+
+
+public class DagUtils {
+
+  public static String getQueryName(Configuration conf) {
+    String name = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYNAME);
+    if (Strings.isNullOrEmpty(name)) {
+      return conf.get(MRJobConfig.JOB_NAME);
+    } else {
+      return name + " (" + conf.get("mapreduce.workflow.node.name") + ")";
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 194585e0f0..83578778c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -61,8 +61,9 @@
   private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
   private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";

-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
-    Map sparkConf = initiateSparkConf(hiveconf);
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
+    Map sparkConf = initiateSparkConf(hiveconf, sessionId);
+
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
     String master = sparkConf.get("spark.master");
@@ -74,7 +75,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
     }
   }

-  public static Map initiateSparkConf(HiveConf hiveConf) {
+  public static Map initiateSparkConf(HiveConf hiveConf, String sessionId) {
     Map sparkConf = new HashMap();

     HBaseConfiguration.addHbaseResources(hiveConf);
@@ -82,8 +83,15 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     final String appNameKey = "spark.app.name";
     String appName = hiveConf.get(appNameKey);
+    final String sessionIdString = " (sessionId = " + sessionId + ")";
     if (appName == null) {
-      appName = SPARK_DEFAULT_APP_NAME;
+      if (sessionId == null) {
+        appName = SPARK_DEFAULT_APP_NAME;
+      } else {
+        appName = SPARK_DEFAULT_APP_NAME + sessionIdString;
+      }
+    } else {
+      appName = appName + sessionIdString;
     }
     sparkConf.put(appNameKey, appName);
     sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 102e41b9c4..3f72d2cc52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.TimeoutException;

 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
@@ -350,6 +352,8 @@ public Serializable call(JobContext jc) throws Exception {
           new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
       SparkPlan plan = gen.generate(localSparkWork);

+      jc.sc().setJobGroup(localSparkWork.getQueryId(), DagUtils.getQueryName(localJobConf));
+
       // Execute generated plan.
       JavaPairRDD finalRDD = plan.generateGraph();
       // We use Spark RDD async action to submit job as it's the only way to get jobId now.
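
Illustrative example (not part of the commit): the new DagUtils.getQueryName() helper is what the RemoteHiveSparkClient change above passes to setJobGroup() as the job description. The sketch below shows its two naming paths; the class name and the sample query/stage values are made up, and it assumes hive-common, hive-exec, and hadoop-mapreduce-client-core on the classpath.

// QueryNameExample.java -- a minimal, hypothetical sketch; not part of this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.DagUtils;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class QueryNameExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Case 1: hive.query.name is unset, so getQueryName() falls back to mapreduce.job.name.
    conf.set(MRJobConfig.JOB_NAME, "select count(*) from src (Stage-1)");
    System.out.println(DagUtils.getQueryName(conf));   // -> select count(*) from src (Stage-1)

    // Case 2: hive.query.name is set; the workflow node name (the task id) is appended.
    // This is the description that setJobGroup() hands to the Spark UI.
    conf.set(HiveConf.ConfVars.HIVEQUERYNAME.varname, "nightly-etl");
    conf.set("mapreduce.workflow.node.name", "Stage-1");
    System.out.println(DagUtils.getQueryName(conf));   // -> nightly-etl (Stage-1)
  }
}
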
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index ca19fd013c..ed1da27fa3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -183,7 +183,7 @@ public static String getWorkId(BaseWork work) {

   public static SparkTask createSparkTask(HiveConf conf) {
     return (SparkTask) TaskFactory.get(
-        new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+        new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), conf), conf);
   }

   public static SparkTask createSparkTask(SparkWork work, HiveConf conf) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 54d2ceca6e..608b040634 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -60,7 +60,7 @@ public void open(HiveConf conf) throws HiveException {
     this.conf = conf;
     isOpen = true;
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf);
+      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, sessionId);
     } catch (Throwable e) {
       // It's possible that user session is closed while creating Spark client.
       String msg = isOpen ? "Failed to create Spark client for Spark session " + sessionId :
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 3c2f0e6e96..7722a0addd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -79,7 +79,7 @@ public void setup(HiveConf hiveConf) throws HiveException {
     synchronized (this) {
       if (!inited) {
         LOG.info("Setting up the session manager.");
-        Map conf = HiveSparkClientFactory.initiateSparkConf(hiveConf);
+        Map conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
         try {
           SparkClientFactory.initialize(conf);
           inited = true;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index dbf4b8da01..56eb30122a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1337,7 +1337,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
       cplan.setName("File Merge");
       ((TezWork) work).add(cplan);
     } else if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), conf);
       cplan.setName("Spark Merge File Work");
       ((SparkWork) work).add(cplan);
     } else {
@@ -1350,7 +1350,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
       cplan.setName("File Merge");
       ((TezWork)work).add(cplan);
     } else if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), conf);
       cplan.setName("Spark Merge File Work");
       ((SparkWork) work).add(cplan);
     } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 4e31ba6743..60c830da9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -222,7 +222,7 @@ public static void processSkewJoin(JoinOperator joinOp,
     Task skewJoinMapJoinTask = TaskFactory.get(sparkWork, hiveConf);
     skewJoinMapJoinTask.setFetchSource(currTask.isFetchSource());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index 628726b1ac..f950e68df7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -135,7 +135,7 @@ private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork)
     } else {
       // Create a new SparkWork for all the small tables of this work
       SparkWork parentWork =
-          new SparkWork(physicalContext.conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+          new SparkWork(physicalContext.conf.getVar(HiveConf.ConfVars.HIVEQUERYID), physicalContext.conf);
       // copy cloneToWork to ensure RDD cache still works
       parentWork.setCloneToWork(sparkWork.getCloneToWork());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
index e7633025c6..28b44d1f48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
@@ -115,7 +115,7 @@ private static void splitTask(SparkTask currentTask, ReduceWork reduceWork,
       currentWork.disconnect(reduceWork, childWork);
       // move works following the current reduce work into a new spark work
       SparkWork newWork =
-          new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID));
+          new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID), parseContext.getConf());
       newWork.add(childWork);
       copyWorkGraph(currentWork, newWork, childWork);
       // remove them from current spark work
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
index fda7080493..4356dc4dd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
@@ -34,6 +34,9 @@
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.hadoop.conf.Configuration;
+
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -50,7 +53,8 @@ public class SparkWork extends AbstractOperatorDesc {

   private static final AtomicInteger counter = new AtomicInteger(1);
-  private final String name;
+  private final String dagName;
+  private final String queryId;
   private final Set roots = new LinkedHashSet();
   private final Set leaves = new LinkedHashSet<>();
@@ -66,15 +70,23 @@
   private Map cloneToWork;

-  public SparkWork(String name) {
-    this.name = name + ":" + counter.getAndIncrement();
-    cloneToWork = new HashMap();
+  public SparkWork(String queryId) {
+    this(queryId, null);
   }
+  public SparkWork(String queryId, Configuration conf) {
+    this.queryId = queryId;
+    this.dagName = queryId + ":" + counter.getAndIncrement();
+    cloneToWork = new HashMap();
+  }

   @Explain(displayName = "DagName")
   public String getName() {
-    return name;
+    return this.dagName;
+  }
+
+  public String getQueryId() {
+    return this.queryId;
   }

   /**
diff --git a/ql/src/test/queries/clientpositive/parallel.q b/ql/src/test/queries/clientpositive/parallel.q
index b8c0445ef9..f2f0d3551c 100644
--- a/ql/src/test/queries/clientpositive/parallel.q
+++ b/ql/src/test/queries/clientpositive/parallel.q
@@ -1,5 +1,5 @@
 set hive.explain.user=false;
-set mapred.job.name='test_parallel';
+set hive.query.name='test_parallel';
 set hive.exec.parallel=true;
 set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
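
Illustrative example (not part of the commit): for context on how these identifiers surface at runtime, the standalone sketch below exercises the plain Spark API the patch relies on. initiateSparkConf() appends a "(sessionId = ...)" suffix to spark.app.name, and setJobGroup() tags every job submitted afterwards with the query id and query name, which the Spark UI's Jobs page then displays. The class name, session id, query id, and query name are hypothetical; only spark-core is assumed on the classpath.

// JobGroupExample.java -- a minimal sketch of the underlying Spark API; not part of this patch.
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JobGroupExample {
  public static void main(String[] args) {
    // The session id suffix mirrors what initiateSparkConf() now appends to spark.app.name.
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("Hive on Spark (sessionId = 1234-abcd)");

    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Mirrors the RemoteHiveSparkClient change: group id = query id, description = query name.
      jsc.setJobGroup("hive_20171013131526_0001", "nightly-etl (Stage-1)");

      // Any job submitted after setJobGroup() is tagged with that group, so the Spark UI
      // shows the Hive query id and name next to the job.
      long n = jsc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
      System.out.println("count = " + n);
    }
  }
}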