diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
new file mode 100644
index 0000000..efc79b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DagUtils.java
@@ -0,0 +1,23 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+
+public class DagUtils {
+
+  public static String createDagName(Configuration conf, String queryId) {
+    String name = getUserSpecifiedDagName(conf);
+    if (name == null) {
+      name = queryId;
+    }
+
+    assert name != null;
+    return name;
+  }
+
+  public static String getUserSpecifiedDagName(Configuration conf) {
+    String name = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYNAME);
+    return (name != null) ? name : conf.get("mapred.job.name");
+  }
+}
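A minimal usage sketch of the new helper, not part of the patch; the query id value below is hypothetical. Resolution order follows the code above: hive.query.name first, then mapred.job.name, then the caller-supplied query id.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.DagUtils;

    // Illustrative only: resolving a display name for the DAG.
    final class DagNameExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.HIVEQUERYNAME, "nightly-etl"); // a user-specified name wins
        String queryId = "hive_query_001";                           // hypothetical query id
        System.out.println(DagUtils.createDagName(conf, queryId));   // prints "nightly-etl"
      }
    }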
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 6e9ba7c..92b54ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -61,8 +61,8 @@
   private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
   private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
 
-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
-    Map<String, String> sparkConf = initiateSparkConf(hiveconf);
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf, String sessionId) throws Exception {
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf, sessionId);
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
     String master = sparkConf.get("spark.master");
@@ -74,7 +74,7 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
     }
   }
 
-  public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf, String sessionId) {
     Map<String, String> sparkConf = new HashMap<String, String>();
     HBaseConfiguration.addHbaseResources(hiveConf);
@@ -83,7 +83,11 @@ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Ex
     final String appNameKey = "spark.app.name";
     String appName = hiveConf.get(appNameKey);
     if (appName == null) {
-      appName = SPARK_DEFAULT_APP_NAME;
+      if (sessionId == null) {
+        appName = SPARK_DEFAULT_APP_NAME;
+      } else {
+        appName = SPARK_DEFAULT_APP_NAME + " " + sessionId;
+      }
     }
     sparkConf.put(appNameKey, appName);
     sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 4ca8f93..62b08fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
@@ -345,6 +346,8 @@ public Serializable call(JobContext jc) throws Exception {
           new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
       SparkPlan plan = gen.generate(localSparkWork);
 
+      jc.sc().setJobGroup(DagUtils.createDagName(localJobConf, localSparkWork.getQueryId()), localSparkWork.getDagId());
+
       // Execute generated plan.
       JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
       // We use Spark RDD async action to submit job as it's the only way to get jobId now.
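The setJobGroup call above tags every Spark job submitted for this query's plan, so the jobs can be grouped in the Spark UI and cancelled together. A rough sketch of the JavaSparkContext calls involved, assuming a caller that already holds the job conf and the SparkWork; the class and method names here are illustrative, not part of the patch.

    import org.apache.hadoop.hive.ql.exec.DagUtils;
    import org.apache.hadoop.hive.ql.plan.SparkWork;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Illustrative helper, not part of the patch.
    final class JobGroupExample {
      static void tagJobs(JavaSparkContext sc, JobConf jobConf, SparkWork work) {
        String group = DagUtils.createDagName(jobConf, work.getQueryId());
        // First argument is the group id, second is the description shown in the Spark UI;
        // jobs submitted from this thread afterwards inherit the group.
        sc.setJobGroup(group, work.getDagId());
      }

      static void cancelGroup(JavaSparkContext sc, JobConf jobConf, SparkWork work) {
        // Cancels every active job that was tagged with this group id.
        sc.cancelJobGroup(DagUtils.createDagName(jobConf, work.getQueryId()));
      }
    }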
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 51c6715..5049f20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -59,7 +59,7 @@ public void open(HiveConf conf) throws HiveException {
     this.conf = conf;
     isOpen = true;
     try {
-      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf);
+      hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf, this.sessionId);
     } catch (Throwable e) {
       throw new HiveException("Failed to create spark client.", e);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
index 3c2f0e6..7722a0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
@@ -79,7 +79,7 @@ public void setup(HiveConf hiveConf) throws HiveException {
     synchronized (this) {
       if (!inited) {
         LOG.info("Setting up the session manager.");
-        Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf);
+        Map<String, String> conf = HiveSparkClientFactory.initiateSparkConf(hiveConf, null);
         try {
           SparkClientFactory.initialize(conf);
           inited = true;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 6497495..dda3e67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -1285,21 +1285,6 @@ private void setupQuickStart(TezEdgeProperty edgeProp, Vertex v)
     }
   }
 
-  public String createDagName(Configuration conf, QueryPlan plan) {
-    String name = getUserSpecifiedDagName(conf);
-    if (name == null) {
-      name = plan.getQueryId();
-    }
-
-    assert name != null;
-    return name;
-  }
-
-  public static String getUserSpecifiedDagName(Configuration conf) {
-    String name = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYNAME);
-    return (name != null) ? name : conf.get("mapred.job.name");
-  }
-
   private DagUtils() {
     // don't instantiate
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 1c84c6a..f613d6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -350,7 +350,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
     FileSystem fs = scratchDir.getFileSystem(conf);
 
     // the name of the dag is what is displayed in the AM/Job UI
-    String dagName = utils.createDagName(conf, queryPlan);
+    String dagName = org.apache.hadoop.hive.ql.exec.DagUtils.createDagName(conf, queryPlan.getQueryId());
 
     LOG.info("Dag name: " + dagName);
     DAG dag = DAG.create(dagName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 88bf829..f54cd35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1324,7 +1324,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
       cplan.setName("File Merge");
       ((TezWork) work).add(cplan);
     } else if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), conf);
       cplan.setName("Spark Merge File Work");
       ((SparkWork) work).add(cplan);
     } else {
@@ -1337,7 +1337,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
       cplan.setName("File Merge");
       ((TezWork)work).add(cplan);
     } else if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), conf);
       cplan.setName("Spark Merge File Work");
       ((SparkWork) work).add(cplan);
     } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index c970611..f5f6e79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -222,7 +222,7 @@ public static void processSkewJoin(JoinOperator joinOp,
     Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(sparkWork, hiveConf);
     skewJoinMapJoinTask.setFetchSource(currTask.isFetchSource());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index a3ec990..8437660 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -162,7 +162,7 @@ private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork)
       } else {
         // Create a new SparkWork for all the small tables of this work
         SparkWork parentWork =
-            new SparkWork(physicalContext.conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+            new SparkWork(physicalContext.conf.getVar(HiveConf.ConfVars.HIVEQUERYID), physicalContext.conf);
         // copy cloneToWork to ensure RDD cache still works
         parentWork.setCloneToWork(sparkWork.getCloneToWork());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
index abc9fcf..45bd5aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
@@ -115,7 +115,7 @@ private static void splitTask(SparkTask currentTask, ReduceWork reduceWork,
     currentWork.disconnect(reduceWork, childWork);
     // move works following the current reduce work into a new spark work
     SparkWork newWork =
-        new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID));
+        new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID), parseContext.getConf());
     newWork.add(childWork);
     copyWorkGraph(currentWork, newWork, childWork);
     // remove them from current spark work
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
index 9d46cac..e9c9e4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
@@ -33,6 +33,10 @@
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -48,7 +52,8 @@ vectorization = Vectorization.SUMMARY_PATH)
 public class SparkWork extends AbstractOperatorDesc {
   private static int counter;
-  private final String name;
+  private final String queryName;
+  private final String queryId;
   private final Set<BaseWork> roots = new LinkedHashSet<BaseWork>();
   private final Set<BaseWork> leaves = new LinkedHashSet<>();
@@ -59,20 +64,38 @@
       new HashMap<BaseWork, List<BaseWork>>();
   protected final Map<Pair<BaseWork, BaseWork>, SparkEdgeProperty> edgeProperties =
       new HashMap<Pair<BaseWork, BaseWork>, SparkEdgeProperty>();
+  private final String dagId;
 
   private Map<String, List<String>> requiredCounterPrefix;
 
   private Map<BaseWork, BaseWork> cloneToWork;
 
-  public SparkWork(String name) {
-    this.name = name + ":" + (++counter);
-    cloneToWork = new HashMap<BaseWork, BaseWork>();
+  public SparkWork(String queryId) {
+    this(queryId, null);
   }
 
+  public SparkWork(String queryId, Configuration conf) {
+    this.queryId = queryId;
+    this.dagId = queryId + ":" + (++counter);
+    String queryName = (conf != null) ? DagUtils.getUserSpecifiedDagName(conf) : null;
+    if (queryName == null) {
+      queryName = this.dagId;
+    }
+    this.queryName = queryName;
+    cloneToWork = new HashMap<BaseWork, BaseWork>();
+  }
 
   @Explain(displayName = "DagName")
   public String getName() {
-    return name;
+    return queryName;
+  }
+
+  public String getDagId() {
+    return dagId;
+  }
+
+  public String getQueryId() {
+    return this.queryId;
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index a037ea3..6cc0208 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -36,7 +36,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
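A rough usage sketch of the reworked SparkWork naming above, not part of the patch; the query id value is hypothetical. With the two-argument constructor, the explain-visible name prefers a user-specified query name, while getDagId() keeps the old queryId:counter form now used for job grouping.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.plan.SparkWork;

    // Illustrative only: how the displayed name and the dag id differ after this change.
    final class SparkWorkNamingExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.HIVEQUERYNAME, "nightly-etl");

        SparkWork work = new SparkWork("hive_query_001" /* hypothetical query id */, conf);
        System.out.println(work.getName());    // user-specified name, here "nightly-etl"
        System.out.println(work.getDagId());   // "hive_query_001:<counter>", as before
        System.out.println(work.getQueryId()); // "hive_query_001"
      }
    }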