commit 3b8d96294e9c2da781e2b27b6e3562161e8e5ec8
Author: Sahil Takiar
Date:   Wed Feb 21 16:42:21 2018 -0800

    HIVE-18525: Add explain plan to Hive on Spark Web UI

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index 0a6e17ab93..c69c55a32d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -609,7 +609,7 @@ else if (ent.getValue() != null) {
   }
 
   private JSONArray outputList(List l, PrintStream out, boolean hasHeader,
-      boolean extended, boolean jsonOutput, int indent) throws Exception {
+      boolean extended, boolean jsonOutput, int indent, boolean inTest) throws Exception {
 
     boolean first_el = true;
     boolean nl = false;
@@ -633,7 +633,7 @@ private JSONArray outputList(List l, PrintStream out, boolean hasHeader,
           out.println();
         }
         JSONObject jsonOut = outputPlan(o, out, extended,
-            jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent));
+            jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent), "", inTest);
         if (jsonOutput) {
           outputArray.put(jsonOut);
         }
@@ -671,10 +671,13 @@ private JSONObject outputPlan(Object work,
   @VisibleForTesting
   JSONObject outputPlan(Object work, PrintStream out,
       boolean extended, boolean jsonOutput, int indent, String appendToHeader) throws Exception {
+    return outputPlan(work, out, extended, jsonOutput, indent, appendToHeader,
+        queryState.getConf().getBoolVar(ConfVars.HIVE_IN_TEST));
+  }
 
-    // Are we running tests?
-    final boolean inTest = queryState.getConf().getBoolVar(ConfVars.HIVE_IN_TEST);
-
+  public JSONObject outputPlan(Object work, PrintStream out,
+      boolean extended, boolean jsonOutput, int indent,
+      String appendToHeader, boolean inTest) throws Exception {
     // Check if work has an explain annotation
     Annotation note = AnnotationUtils.getAnnotation(work.getClass(), Explain.class);
 
@@ -772,7 +775,7 @@ JSONObject outputPlan(Object work, PrintStream out,
         if (operator.getConf() != null) {
           String appender = isLogical ? " (" + operator.getOperatorId() + ")" : "";
           JSONObject jsonOut = outputPlan(operator.getConf(), out, extended,
-              jsonOutput, jsonOutput ? 0 : indent, appender);
+              jsonOutput, jsonOutput ? 0 : indent, appender, inTest);
           if (this.work != null && (this.work.isUserLevelExplain() || this.work.isFormatted())) {
             if (jsonOut != null && jsonOut.length() > 0) {
               ((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put("OperatorId:",
@@ -794,7 +797,7 @@ JSONObject outputPlan(Object work, PrintStream out,
         if (operator.getChildOperators() != null) {
           int cindent = jsonOutput ? 0 : indent + 2;
           for (Operator op : operator.getChildOperators()) {
-            JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, cindent);
+            JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, cindent, "", inTest);
             if (jsonOutput) {
               ((JSONObject) json.get(JSONObject.getNames(json)[0])).accumulate("children", jsonOut);
             }
@@ -970,7 +973,8 @@ JSONObject outputPlan(Object work, PrintStream out,
             out.print(header);
           }
 
-          JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended, jsonOutput, ind);
+          JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended,
+              jsonOutput, ind, inTest);
 
           if (jsonOutput && !l.isEmpty()) {
             json.put(header, jsonOut);
@@ -984,7 +988,7 @@ JSONObject outputPlan(Object work, PrintStream out,
           if (!skipHeader && out != null) {
             out.println(header);
           }
-          JSONObject jsonOut = outputPlan(val, out, extended, jsonOutput, ind);
+          JSONObject jsonOut = outputPlan(val, out, extended, jsonOutput, ind, "", inTest);
           if (jsonOutput && jsonOut != null && jsonOut.length() != 0) {
             if (!skipHeader) {
               json.put(header, jsonOut);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
index 4b77ac9b00..770a1000a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/CacheTran.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
@@ -28,10 +29,12 @@
   private boolean caching = false;
   private JavaPairRDD cachedRDD;
   protected final String name;
+  private final BaseWork baseWork;
 
-  protected CacheTran(boolean cache, String name) {
+  protected CacheTran(boolean cache, String name, BaseWork baseWork) {
     this.caching = cache;
     this.name = name;
+    this.baseWork = baseWork;
   }
 
   @Override
@@ -59,4 +62,9 @@ public Boolean isCacheEnable() {
   public String getName() {
     return name;
   }
+
+  @Override
+  public BaseWork getBaseWork() {
+    return baseWork;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index b1a0d55367..b242f57db8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
@@ -37,17 +38,20 @@
   private boolean toCache;
   private final SparkPlan sparkPlan;
   private final String name;
+  private final BaseWork baseWork;
 
   public MapInput(SparkPlan sparkPlan, JavaPairRDD hadoopRDD) {
-    this(sparkPlan, hadoopRDD, false, "MapInput");
+    this(sparkPlan, hadoopRDD, false, "MapInput", null);
   }
 
   public MapInput(SparkPlan sparkPlan,
-      JavaPairRDD hadoopRDD, boolean toCache, String name) {
+      JavaPairRDD hadoopRDD, boolean toCache, String
+      name, BaseWork baseWork) {
     this.hadoopRDD = hadoopRDD;
     this.toCache = toCache;
     this.sparkPlan = sparkPlan;
     this.name = name;
+    this.baseWork = baseWork;
   }
 
   public void setToCache(boolean toCache) {
@@ -98,4 +102,9 @@ public String getName() {
   public Boolean isCacheEnable() {
     return new Boolean(toCache);
   }
+
+  @Override
+  public BaseWork getBaseWork() {
+    return baseWork;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index b102f5147d..7e95b1201b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -26,11 +27,11 @@
   private HiveMapFunction mapFunc;
 
   public MapTran() {
-    this(false, "MapTran");
+    this(false, "MapTran", null);
   }
 
-  public MapTran(boolean cache, String name) {
-    super(cache, name);
+  public MapTran(boolean cache, String name, BaseWork baseWork) {
+    super(cache, name, baseWork);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 3b34c78a7a..4bafcb9ae7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -26,11 +27,11 @@
   private HiveReduceFunction reduceFunc;
 
   public ReduceTran() {
-    this(false, "Reduce");
+    this(false, "Reduce", null);
   }
 
-  public ReduceTran(boolean caching, String name) {
-    super(caching, name);
+  public ReduceTran(boolean caching, String name, BaseWork baseWork) {
+    super(caching, name, baseWork);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index 40ff01ab7a..f69807954b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -31,19 +32,21 @@
   private final SparkPlan sparkPlan;
   private final String name;
   private final SparkEdgeProperty edge;
+  private final BaseWork baseWork;
 
   public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) {
-    this(sparkPlan, sf, n, false, "Shuffle", null);
+    this(sparkPlan, sf, n, false, "Shuffle", null, null);
   }
 
   public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache, String name,
-      SparkEdgeProperty edge) {
+      SparkEdgeProperty edge, BaseWork baseWork) {
     shuffler = sf;
     numOfPartitions = n;
     this.toCache = toCache;
     this.sparkPlan = sparkPlan;
     this.name = name;
     this.edge = edge;
+    this.baseWork = baseWork;
   }
 
   @Override
@@ -71,6 +74,11 @@ public Boolean isCacheEnable() {
     return new Boolean(toCache);
   }
 
+  @Override
+  public BaseWork getBaseWork() {
+    return baseWork;
+  }
+
   public SparkShuffler getShuffler() {
     return shuffler;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b21e3865f6..109ebf1b25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -26,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.spark.SparkContext;
 import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
@@ -68,7 +71,7 @@
         // Root tran, it must be MapInput
         Preconditions.checkArgument(tran instanceof MapInput,
             "AssertionError: tran must be an instance of MapInput");
-        sc.setCallSite(CallSite.apply(tran.getName(), ""));
+        sc.setCallSite(CallSite.apply(tran.getName(), getLongFormCallSite(tran)));
         rdd = tran.transform(null);
       } else {
         for (SparkTran parent : parents) {
@@ -82,7 +85,7 @@
           rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")");
         }
       }
-      sc.setCallSite(CallSite.apply(tran.getName(), ""));
+      sc.setCallSite(CallSite.apply(tran.getName(), getLongFormCallSite(tran)));
       rdd = tran.transform(rdd);
     }
@@ -104,11 +107,26 @@
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
 
-    LOG.info("\n\nSpark RDD Graph:\n\n" + finalRDD.toDebugString() + "\n");
+    LOG.info("Spark RDD Graph:\n\n" + finalRDD.toDebugString() + "\n");
 
     return finalRDD;
   }
 
+  private String getLongFormCallSite(SparkTran tran) {
+    ExplainTask explainTask = new ExplainTask();
+    String explainOutput = "";
+    try {
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      explainTask.outputPlan(tran.getBaseWork(), new PrintStream(outputStream), false, false, 0,
+          null, true);
+      explainOutput = tran.getName() + " Explain Plan:\n\n" + outputStream.toString();
+      LOG.info(explainOutput);
+    } catch (Exception e) {
+      LOG.error("Error while generating explain plan for " + tran.getName(), e);
+    }
+    return explainOutput;
+  }
+
   public void addTran(SparkTran tran) {
     rootTrans.add(tran);
     leafTrans.add(tran);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index c9a3196126..59f1f78e75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -145,7 +145,7 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
       boolean toCache = cloneToWork.containsKey(work);
       List parentWorks = sparkWork.getParents(work);
       SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0), work);
-      result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName());
+      result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName(), work);
      sparkPlan.addTran(result);
      for (BaseWork parentWork : parentWorks) {
        sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -216,12 +216,12 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork)
        (toCache ? ", cached)" : ")");
     // Caching is disabled for MapInput due to HIVE-8920
-    MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName);
+    MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName, mapWork);
     return result;
   }
 
   private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache,
-      String name) {
+      String name, BaseWork work) {
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
@@ -233,7 +233,7 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea
     } else {
       shuffler = new GroupByShuffler();
     }
-    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge);
+    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge, work);
   }
 
   private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception {
@@ -257,12 +257,12 @@ private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception
             "Can't make path " + outputPath + " : " + e.getMessage());
       }
     }
-    MapTran mapTran = new MapTran(caching, work.getName());
+    MapTran mapTran = new MapTran(caching, work.getName(), work);
     HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
     mapTran.setMapFunction(mapFunc);
     return mapTran;
   } else if (work instanceof ReduceWork) {
-    ReduceTran reduceTran = new ReduceTran(caching, work.getName());
+    ReduceTran reduceTran = new ReduceTran(caching, work.getName(), work);
     HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter);
     reduceTran.setReduceFunction(reduceFunc);
     return reduceTran;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index f9057b9254..29f8b3e7d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
 
@@ -28,5 +29,7 @@
   public String getName();
 
+  public BaseWork getBaseWork();
+
   public Boolean isCacheEnable();
 }