commit fa00549d747a354c33e899b8adab7a85db6a852f
Author: Sahil Takiar
Date:   Fri Oct 13 13:15:26 2017 -0700

    HIVE-16601: Display Session Id, Query Name / Id, and Dag Id in Spark UI

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4ee50ac08..e4189ff619 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1509,7 +1509,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Whether to grant access to the hs2/hive user for queries"),
     HIVEQUERYNAME ("hive.query.name", null,
         "This named is used by Tez to set the dag name. This name in turn will appear on \n" +
-        "the Tez UI representing the work that was done."),
+        "the Tez UI representing the work that was done. Used by Spark to set the query name, will show up in the\n" +
+        "Spark UI."),

     HIVEOPTIMIZEBUCKETINGSORTING("hive.optimize.bucketingsorting", true,
         "Don't create a reducer for enforcing \n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cbf0561dfe..9c11bd7e43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -131,6 +131,8 @@ public class Driver implements CommandProcessor {
+  public static final String MAPREDUCE_WORKFLOW_NODE_NAME = "mapreduce.workflow.node.name";
+
   static final private String CLASS_NAME = Driver.class.getName();
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private LogHelper console = new LogHelper(LOG);
@@ -2217,9 +2219,9 @@ private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId,
     }
     if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
       if (noName) {
-        conf.set(MRJobConfig.JOB_NAME, jobname + "(" + tsk.getId() + ")");
+        conf.set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
       }
-      conf.set("mapreduce.workflow.node.name", tsk.getId());
+      conf.set(MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());
       Utilities.setWorkflowAdjacencies(conf, plan);
       cxt.incCurJobNo(1);
       console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
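As a quick aside, the Driver change above only swaps a string literal for the new Driver.MAPREDUCE_WORKFLOW_NODE_NAME constant and fixes the spacing of the job name. A minimal, self-contained sketch of the resulting behaviour (not Hive code; the jobname and task id values are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class WorkflowNodeNameSketch {
  // Same key that the patch extracts into Driver.MAPREDUCE_WORKFLOW_NODE_NAME.
  static final String MAPREDUCE_WORKFLOW_NODE_NAME = "mapreduce.workflow.node.name";

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    String jobname = "select count(*) from src";  // hypothetical query/job name
    String taskId = "Stage-1";                     // hypothetical task id

    // What launchTask now stamps onto each map-reduce task's configuration.
    conf.set(MRJobConfig.JOB_NAME, jobname + " (" + taskId + ")");
    conf.set(MAPREDUCE_WORKFLOW_NODE_NAME, taskId);

    System.out.println(conf.get(MRJobConfig.JOB_NAME));          // select count(*) from src (Stage-1)
    System.out.println(conf.get(MAPREDUCE_WORKFLOW_NODE_NAME));  // Stage-1
  }
}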

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
new file mode 100644
index 0000000000..aed1b2cf53
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+
+
+public class DagUtils {
+
+  public static String getQueryName(Configuration conf) {
+    String name = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYNAME);
+    if (Strings.isNullOrEmpty(name)) {
+      return conf.get(MRJobConfig.JOB_NAME);
+    } else {
+      return name + " (" + conf.get(Driver.MAPREDUCE_WORKFLOW_NODE_NAME) + ")";
+    }
+  }
+}
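To make the two branches of the new helper concrete, here is a hedged usage sketch of DagUtils.getQueryName, assuming this patch is applied; the configuration values below ("nightly-load", "Stage-1", the job name) are invented for illustration. With hive.query.name unset the helper falls back to the MapReduce job name; with it set, the workflow node name is appended in parentheses.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.DagUtils;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class DagUtilsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set(MRJobConfig.JOB_NAME, "insert into dst select * from src (Stage-1)");
    conf.set(Driver.MAPREDUCE_WORKFLOW_NODE_NAME, "Stage-1");

    // hive.query.name unset: fall back to the MapReduce job name.
    System.out.println(DagUtils.getQueryName(conf));
    // -> insert into dst select * from src (Stage-1)

    // hive.query.name set: "<query name> (<workflow node name>)".
    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYNAME, "nightly-load");
    System.out.println(DagUtils.getQueryName(conf));
    // -> nightly-load (Stage-1)
  }
}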
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 194585e0f0..83578778c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -61,8 +61,9 @@
   private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
   private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";

-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
-    Map<String, String> sparkConf = initiateSparkConf(hiveconf);
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId);
+
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
     String master = sparkConf.get("spark.master");
@@ -74,7 +75,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
     }
   }

-  public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) {
     Map<String, String> sparkConf = new HashMap<String, String>();
     HBaseConfiguration.addHbaseResources(hiveConf);
@@ -82,8 +83,15 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     final String appNameKey = "spark.app.name";
     String appName = hiveConf.get(appNameKey);
+    final String sessionIdString = " (sessionId = " + sessionId + ")";
     if (appName == null) {
-      appName = SPARK_DEFAULT_APP_NAME;
+      if (sessionId == null) {
+        appName = SPARK_DEFAULT_APP_NAME;
+      } else {
+        appName = SPARK_DEFAULT_APP_NAME + sessionIdString;
+      }
+    } else {
+      appName = appName + sessionIdString;
     }
     sparkConf.put(appNameKey, appName);
     sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 102e41b9c4..3088dce156 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
@@ -350,6 +351,8 @@ public Serializable call(JobContext jc) throws Exception {
           new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
       SparkPlan plan = gen.generate(localSparkWork);

+      jc.sc().setJobGroup(localSparkWork.getQueryId(), DagUtils.getQueryName(localJobConf));
+
       // Execute generated plan.
       JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
       // We use Spark RDD async action to submit job as it's the only way to get jobId now.
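The setJobGroup call above is what makes the query visible in the Spark UI: Spark attaches the group id to every job submitted from the current thread and shows the description string on its job list. A self-contained sketch outside Hive, assuming a local Spark master; the session id, query id, query name, and app name below are placeholders, not values taken from this patch.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class JobGroupSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        // Placeholder app name in the new "<app name> (sessionId = <id>)" format from HiveSparkClientFactory.
        .setAppName("Hive on Spark (sessionId = 1234-abcd)");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Group id: the Hive query id; description: the query name from DagUtils.getQueryName.
    // Both show up in the Spark UI for every job submitted from this thread.
    sc.setJobGroup("hive_20171013131526_example", "nightly-load (Stage-1)");

    long count = sc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
    System.out.println("count = " + count);

    sc.stop();
  }
}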
"Failed to create Spark client for Spark session " + sessionId : diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java index 3c2f0e6e96..7722a0addd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java @@ -79,7 +79,7 @@ public void setup(HiveConf hiveConf) throws HiveException { synchronized (this) { if (!inited) { LOG.info("Setting up the session manager."); - Map conf = HiveSparkClientFactory.initiateSparkConf(hiveConf); + Map conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null); try { SparkClientFactory.initialize(conf); inited = true; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java index fda7080493..8332272143 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java @@ -50,7 +50,8 @@ public class SparkWork extends AbstractOperatorDesc { private static final AtomicInteger counter = new AtomicInteger(1); - private final String name; + private final String dagName; + private final String queryId; private final Set roots = new LinkedHashSet(); private final Set leaves = new LinkedHashSet<>(); @@ -66,15 +67,19 @@ private Map cloneToWork; - public SparkWork(String name) { - this.name = name + ":" + counter.getAndIncrement(); + public SparkWork(String queryId) { + this.queryId = queryId; + this.dagName = queryId + ":" + counter.getAndIncrement(); cloneToWork = new HashMap(); } - @Explain(displayName = "DagName") public String getName() { - return name; + return this.dagName; + } + + public String getQueryId() { + return this.queryId; } /** diff --git a/ql/src/test/queries/clientpositive/parallel.q b/ql/src/test/queries/clientpositive/parallel.q index b8c0445ef9..f2f0d3551c 100644 --- a/ql/src/test/queries/clientpositive/parallel.q +++ b/ql/src/test/queries/clientpositive/parallel.q @@ -1,5 +1,5 @@ set hive.explain.user=false; -set mapred.job.name='test_parallel'; +set hive.query.name='test_parallel'; set hive.exec.parallel=true; set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;