commit 9880a843120432a024257fcbb32f030584d37348 Author: Sahil Takiar Date: Thu Jan 4 10:18:06 2018 -0800 HIVE-18368: Improve Spark Debug RDD Graph diff --git a/data/conf/perf-reg/spark/hive-site.xml b/data/conf/perf-reg/spark/hive-site.xml index 497a61f1fd..acc09ade6c 100644 --- a/data/conf/perf-reg/spark/hive-site.xml +++ b/data/conf/perf-reg/spark/hive-site.xml @@ -32,6 +32,11 @@ A base for other temporary directories. + + spark.eventLog.enabled + true + + hive.exec.scratchdir ${test.tmp.dir}/scratchdir @@ -216,7 +221,7 @@ spark.master - local-cluster[1,2,1024] + local[*] diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java index cab97a0fb3..f43b44968b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.DagUtils; import org.apache.hive.spark.client.SparkClientUtilities; +import org.apache.spark.util.CallSite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -160,8 +162,12 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr // Execute generated plan. JavaPairRDD finalRDD = plan.generateGraph(); + + sc.setJobGroup("queryId = " + sparkWork.getQueryId(), DagUtils.getQueryName(jobConf)); + // We use Spark RDD async action to submit job as it's the only way to get jobId now. JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); + // As we always use foreach action to submit RDD graph, it would only trigger one job. int jobId = future.jobIds().get(0); LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java index d240d18d1c..b1a0d55367 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java @@ -36,17 +36,18 @@ private JavaPairRDD hadoopRDD; private boolean toCache; private final SparkPlan sparkPlan; - private String name = "MapInput"; + private final String name; public MapInput(SparkPlan sparkPlan, JavaPairRDD hadoopRDD) { - this(sparkPlan, hadoopRDD, false); + this(sparkPlan, hadoopRDD, false, "MapInput"); } public MapInput(SparkPlan sparkPlan, - JavaPairRDD hadoopRDD, boolean toCache) { + JavaPairRDD hadoopRDD, boolean toCache, String name) { this.hadoopRDD = hadoopRDD; this.toCache = toCache; this.sparkPlan = sparkPlan; + this.name = name; } public void setToCache(boolean toCache) { @@ -66,6 +67,7 @@ public void setToCache(boolean toCache) { } else { result = hadoopRDD; } + result.setName(this.name); return result; } @@ -96,9 +98,4 @@ public String getName() { public Boolean isCacheEnable() { return new Boolean(toCache); } - - @Override - public void setName(String name) { - this.name = name; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java index 2cc6845aa6..ae332cdcef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java @@ -24,20 +24,23 @@ public class MapTran extends CacheTran { private HiveMapFunction mapFunc; - private String name = 
"MapTran"; + private final String name; public MapTran() { - this(false); + this(false, "MapTran"); } - public MapTran(boolean cache) { + public MapTran(boolean cache, String name) { super(cache); + this.name = name; } @Override public JavaPairRDD doTransform( JavaPairRDD input) { - return input.mapPartitionsToPair(mapFunc); + JavaPairRDD rdd = input.mapPartitionsToPair(mapFunc); + rdd.setName(this.name + " (" + rdd.getNumPartitions() + (isCacheEnable() ? ", cached)" : ")")); + return rdd; } public void setMapFunction(HiveMapFunction mapFunc) { @@ -48,9 +51,4 @@ public void setMapFunction(HiveMapFunction mapFunc) { public String getName() { return name; } - - @Override - public void setName(String name) { - this.name = name; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java index 9045e0590a..fa1d72971d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java @@ -24,20 +24,23 @@ public class ReduceTran extends CacheTran { private HiveReduceFunction reduceFunc; - private String name = "Reduce"; + private final String name; public ReduceTran() { - this(false); + this(false, "Reduce"); } - public ReduceTran(boolean caching) { + public ReduceTran(boolean caching, String name) { super(caching); + this.name = name; } @Override public JavaPairRDD doTransform( JavaPairRDD input) { - return input.mapPartitionsToPair(reduceFunc); + JavaPairRDD rdd = input.mapPartitionsToPair(reduceFunc); + rdd.setName(this.name + " (" + rdd.getNumPartitions() + (isCacheEnable() ? ", cached)" : ")")); + return rdd; } public void setReduceFunction(HiveReduceFunction redFunc) { @@ -48,9 +51,4 @@ public void setReduceFunction(HiveReduceFunction redFunc) { public String getName() { return name; } - - @Override - public void setName(String name) { - this.name = name; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java index aec96bc2ef..40ff01ab7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.spark; import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.storage.StorageLevel; @@ -28,17 +29,21 @@ private final int numOfPartitions; private final boolean toCache; private final SparkPlan sparkPlan; - private String name = "Shuffle"; + private final String name; + private final SparkEdgeProperty edge; public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) { - this(sparkPlan, sf, n, false); + this(sparkPlan, sf, n, false, "Shuffle", null); } - public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache) { + public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache, String name, + SparkEdgeProperty edge) { shuffler = sf; numOfPartitions = n; this.toCache = toCache; this.sparkPlan = sparkPlan; + this.name = name; + this.edge = edge; } @Override @@ -48,7 +53,8 @@ public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache sparkPlan.addCachedRDDId(result.id()); result = result.persist(StorageLevel.MEMORY_AND_DISK()); } - return result; + return 
result.setName(this.name + " (" + edge.getShuffleType() + ", " + numOfPartitions + + (toCache ? ", cached)" : ")")); } public int getNoOfPartitions() { @@ -65,11 +71,6 @@ public Boolean isCacheEnable() { return new Boolean(toCache); } - @Override - public void setName(String name) { - this.name = name; - } - public SparkShuffler getShuffler() { return shuffler; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index 5d27692317..38cd3b6eb0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.Set; +import org.apache.spark.SparkContext; +import org.apache.spark.util.CallSite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -48,6 +50,12 @@ private final Map> invertedTransGraph = new HashMap>(); private final Set cachedRDDIds = new HashSet(); + private final SparkContext sc; + + SparkPlan(SparkContext sc) { + this.sc = sc; + } + @SuppressWarnings("unchecked") public JavaPairRDD generateGraph() { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH); @@ -60,6 +68,7 @@ // Root tran, it must be MapInput Preconditions.checkArgument(tran instanceof MapInput, "AssertionError: tran must be an instance of MapInput"); + sc.setCallSite(CallSite.apply(tran.getName(), "")); rdd = tran.transform(null); } else { for (SparkTran parent : parents) { @@ -67,174 +76,36 @@ if (rdd == null) { rdd = prevRDD; } else { + sc.setCallSite(CallSite.apply("UnionRDD (" + rdd.name() + ", " + + prevRDD.name() + ")", "")); rdd = rdd.union(prevRDD); + rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")"); } } + sc.setCallSite(CallSite.apply(tran.getName(), "")); rdd = tran.transform(rdd); } tranToOutputRDDMap.put(tran, rdd); } - logSparkPlan(); - JavaPairRDD finalRDD = null; for (SparkTran leafTran : leafTrans) { JavaPairRDD rdd = tranToOutputRDDMap.get(leafTran); if (finalRDD == null) { finalRDD = rdd; } else { + sc.setCallSite(CallSite.apply("UnionRDD", "")); finalRDD = finalRDD.union(rdd); + finalRDD.setName("UnionRDD (" + finalRDD.getNumPartitions() + ")"); } } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH); - if (LOG.isDebugEnabled()) { - LOG.info("print generated spark rdd graph:\n" + SparkUtilities.rddGraphToString(finalRDD)); - } - return finalRDD; - } - - private void addNumberToTrans() { - int i = 1; - String name = null; - - // Traverse leafTran & transGraph add numbers to trans - for (SparkTran leaf : leafTrans) { - name = leaf.getName() + " " + i++; - leaf.setName(name); - } - Set sparkTrans = transGraph.keySet(); - for (SparkTran tran : sparkTrans) { - name = tran.getName() + " " + i++; - tran.setName(name); - } - } - - private void logSparkPlan() { - addNumberToTrans(); - ArrayList leafTran = new ArrayList(); - leafTran.addAll(leafTrans); - - for (SparkTran leaf : leafTrans) { - collectLeafTrans(leaf, leafTran); - } - - // Start Traverse from the leafTrans and get parents of each leafTrans till - // the end - StringBuilder sparkPlan = new StringBuilder( - "\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n"); - for (SparkTran leaf : leafTran) { - sparkPlan.append(leaf.getName()); - getSparkPlan(leaf, sparkPlan); - sparkPlan.append("\n"); - } - sparkPlan - .append(" \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
"); - LOG.info(sparkPlan.toString()); - } - - private void collectLeafTrans(SparkTran leaf, List reduceTrans) { - List parents = getParents(leaf); - if (parents.size() > 0) { - SparkTran nextLeaf = null; - for (SparkTran leafTran : parents) { - if (leafTran instanceof ReduceTran) { - reduceTrans.add(leafTran); - } else { - if (getParents(leafTran).size() > 0) - nextLeaf = leafTran; - } - } - if (nextLeaf != null) - collectLeafTrans(nextLeaf, reduceTrans); - } - } - - private void getSparkPlan(SparkTran tran, StringBuilder sparkPlan) { - List parents = getParents(tran); - List nextLeaf = new ArrayList(); - if (parents.size() > 0) { - sparkPlan.append(" <-- "); - boolean isFirst = true; - for (SparkTran leaf : parents) { - if (isFirst) { - sparkPlan.append("( " + leaf.getName()); - if (leaf instanceof ShuffleTran) { - logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); - } else { - logCacheStatus(leaf, sparkPlan); - } - isFirst = false; - } else { - sparkPlan.append("," + leaf.getName()); - if (leaf instanceof ShuffleTran) { - logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); - } else { - logCacheStatus(leaf, sparkPlan); - } - } - // Leave reduceTran it will be expanded in the next line - if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) { - nextLeaf.add(leaf); - } - } - sparkPlan.append(" ) "); - if (nextLeaf.size() > 1) { - logLeafTran(nextLeaf, sparkPlan); - } else { - if (nextLeaf.size() != 0) - getSparkPlan(nextLeaf.get(0), sparkPlan); - } - } - } - - private void logLeafTran(List parent, StringBuilder sparkPlan) { - sparkPlan.append(" <-- "); - boolean isFirst = true; - for (SparkTran sparkTran : parent) { - List parents = getParents(sparkTran); - SparkTran leaf = parents.get(0); - if (isFirst) { - sparkPlan.append("( " + leaf.getName()); - if (leaf instanceof ShuffleTran) { - logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); - } else { - logCacheStatus(leaf, sparkPlan); - } - isFirst = false; - } else { - sparkPlan.append("," + leaf.getName()); - if (leaf instanceof ShuffleTran) { - logShuffleTranStatus((ShuffleTran) leaf, sparkPlan); - } else { - logCacheStatus(leaf, sparkPlan); - } - } - } - sparkPlan.append(" ) "); - } - private void logShuffleTranStatus(ShuffleTran leaf, StringBuilder sparkPlan) { - int noOfPartitions = leaf.getNoOfPartitions(); - sparkPlan.append(" ( Partitions " + noOfPartitions); - SparkShuffler shuffler = leaf.getShuffler(); - sparkPlan.append(", " + shuffler.getName()); - if (leaf.isCacheEnable()) { - sparkPlan.append(", Cache on"); - } else { - sparkPlan.append(", Cache off"); - } - } + LOG.info("\n\nSpark RDD Graph:\n\n" + finalRDD.toDebugString() + "\n"); - private void logCacheStatus(SparkTran sparkTran, StringBuilder sparkPlan) { - if (sparkTran.isCacheEnable() != null) { - if (sparkTran.isCacheEnable().booleanValue()) { - sparkPlan.append(" (cache on) "); - } else { - sparkPlan.append(" (cache off) "); - } - } + return finalRDD; } public void addTran(SparkTran tran) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index c52692d546..c9a3196126 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -23,7 +23,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import org.apache.spark.util.CallSite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
@@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; @@ -51,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.stats.StatsCollectionContext; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.hive.ql.stats.StatsPublisher; @@ -98,7 +102,7 @@ public SparkPlanGenerator( public SparkPlan generate(SparkWork sparkWork) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN); - SparkPlan sparkPlan = new SparkPlan(); + SparkPlan sparkPlan = new SparkPlan(this.sc.sc()); cloneToWork = sparkWork.getCloneToWork(); workToTranMap.clear(); workToParentWorkTranMap.clear(); @@ -138,9 +142,10 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, result = generateMapInput(sparkPlan, (MapWork)work); sparkPlan.addTran(result); } else if (work instanceof ReduceWork) { + boolean toCache = cloneToWork.containsKey(work); List parentWorks = sparkWork.getParents(work); - result = generate(sparkPlan, - sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work)); + SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0), work); + result = generate(sparkPlan, sparkEdgeProperty, toCache, work.getName()); sparkPlan.addTran(result); for (BaseWork parentWork : parentWorks) { sparkPlan.connect(workToTranMap.get(parentWork), result); @@ -189,6 +194,8 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) JobConf jobConf = cloneJobConf(mapWork); Class ifClass = getInputFormat(jobConf, mapWork); + sc.sc().setCallSite(CallSite.apply(mapWork.getName(), "")); + JavaPairRDD hadoopRDD; if (mapWork.getNumMapTasks() != null) { jobConf.setNumMapTasks(mapWork.getNumMapTasks()); @@ -198,12 +205,24 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork) hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); } + boolean toCache = false/*cloneToWork.containsKey(mapWork)*/; + + String tables = mapWork.getAllRootOperators().stream() + .filter(op -> op instanceof TableScanOperator) + .map(ts -> ((TableScanDesc) ts.getConf()).getAlias()) + .collect(Collectors.joining(", ")); + + String rddName = mapWork.getName() + " (" + tables + ", " + hadoopRDD.getNumPartitions() + + (toCache ? 
", cached)" : ")"); + // Caching is disabled for MapInput due to HIVE-8920 - MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName); return result; } - private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) { + private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache, + String name) { + Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); SparkShuffler shuffler; @@ -214,7 +233,7 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea } else { shuffler = new GroupByShuffler(); } - return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache); + return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name, edge); } private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception { @@ -238,12 +257,12 @@ private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception "Can't make path " + outputPath + " : " + e.getMessage()); } } - MapTran mapTran = new MapTran(caching); + MapTran mapTran = new MapTran(caching, work.getName()); HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter); mapTran.setMapFunction(mapFunc); return mapTran; } else if (work instanceof ReduceWork) { - ReduceTran reduceTran = new ReduceTran(caching); + ReduceTran reduceTran = new ReduceTran(caching, work.getName()); HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter); reduceTran.setReduceFunction(reduceFunc); return reduceTran; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java index 037efe1f99..f9057b9254 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java @@ -28,7 +28,5 @@ public String getName(); - public void setName(String name); - public Boolean isCacheEnable(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index f332790a56..943a4ee00a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -50,12 +50,8 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.BytesWritable; import org.apache.hive.spark.client.SparkClientUtilities; -import org.apache.spark.Dependency; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.rdd.RDD; -import org.apache.spark.rdd.UnionRDD; -import scala.collection.JavaConversions; + /** * Contains utilities methods used as part of Spark tasks. 
@@ -138,36 +134,6 @@ public static SparkSession getSparkSession(HiveConf conf, return sparkSession; } - - public static String rddGraphToString(JavaPairRDD rdd) { - StringBuilder sb = new StringBuilder(); - rddToString(rdd.rdd(), sb, ""); - return sb.toString(); - } - - private static void rddToString(RDD rdd, StringBuilder sb, String offset) { - sb.append(offset).append(rdd.getClass().getCanonicalName()).append("[").append(rdd.hashCode()).append("]"); - if (rdd.getStorageLevel().useMemory()) { - sb.append("(cached)"); - } - sb.append("\n"); - Collection dependencies = JavaConversions.asJavaCollection(rdd.dependencies()); - if (dependencies != null) { - offset += "\t"; - for (Dependency dependency : dependencies) { - RDD parentRdd = dependency.rdd(); - rddToString(parentRdd, sb, offset); - } - } else if (rdd instanceof UnionRDD) { - UnionRDD unionRDD = (UnionRDD) rdd; - offset += "\t"; - Collection parentRdds = JavaConversions.asJavaCollection(unionRDD.rdds()); - for (RDD parentRdd : parentRdds) { - rddToString(parentRdd, sb, offset); - } - } - } - /** * Generate a temporary path for dynamic partition pruning in Spark branch * TODO: no longer need this if we use accumulator! diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index e4dfc009d9..fea2158a4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -415,7 +415,7 @@ public int hashCode() { combine.createPool(job, f); poolMap.put(combinePathInputFormat, f); } else { - LOG.info("CombineHiveInputSplit: pool is already created for " + path + + LOG.debug("CombineHiveInputSplit: pool is already created for " + path + "; using filter path " + filterPath); f.addPath(filterPath); } diff --git a/ql/src/test/queries/clientpositive/perf/query3.q b/ql/src/test/queries/clientpositive/perf/query3.q index a70a62fd88..ca5c30ae7b 100644 --- a/ql/src/test/queries/clientpositive/perf/query3.q +++ b/ql/src/test/queries/clientpositive/perf/query3.q @@ -1,6 +1,5 @@ set hive.mapred.mode=nonstrict; -- start query 1 in stream 0 using template query3.tpl and seed 2031708268 -explain select dt.d_year ,item.i_brand_id brand_id ,item.i_brand brand diff --git a/ql/src/test/results/clientpositive/perf/spark/query3.q.out b/ql/src/test/results/clientpositive/perf/spark/query3.q.out index 7fdd478b4b..eba3311836 100644 --- a/ql/src/test/results/clientpositive/perf/spark/query3.q.out +++ b/ql/src/test/results/clientpositive/perf/spark/query3.q.out @@ -1,5 +1,4 @@ -PREHOOK: query: explain -select dt.d_year +PREHOOK: query: select dt.d_year ,item.i_brand_id brand_id ,item.i_brand brand ,sum(ss_ext_sales_price) sum_agg @@ -18,8 +17,11 @@ select dt.d_year ,brand_id limit 100 PREHOOK: type: QUERY -POSTHOOK: query: explain -select dt.d_year +PREHOOK: Input: default@date_dim +PREHOOK: Input: default@item +PREHOOK: Input: default@store_sales +#### A masked pattern was here #### +POSTHOOK: query: select dt.d_year ,item.i_brand_id brand_id ,item.i_brand brand ,sum(ss_ext_sales_price) sum_agg @@ -38,146 +40,7 @@ select dt.d_year ,brand_id limit 100 POSTHOOK: type: QUERY -STAGE DEPENDENCIES: - Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 - -STAGE PLANS: - Stage: Stage-1 - Spark - Edges: - Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 400), Map 6 (PARTITION-LEVEL SORT, 400) - Reducer 3 <- Map 7 (PARTITION-LEVEL SORT, 438), Reducer 2 (PARTITION-LEVEL SORT, 438) - 
Reducer 4 <- Reducer 3 (GROUP, 481) - Reducer 5 <- Reducer 4 (SORT, 1) +POSTHOOK: Input: default@date_dim +POSTHOOK: Input: default@item +POSTHOOK: Input: default@store_sales #### A masked pattern was here #### - Vertices: - Map 1 - Map Operator Tree: - TableScan - alias: store_sales - Statistics: Num rows: 575995635 Data size: 50814502088 Basic stats: COMPLETE Column stats: NONE - Filter Operator - predicate: (ss_item_sk is not null and ss_sold_date_sk is not null) (type: boolean) - Statistics: Num rows: 575995635 Data size: 50814502088 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: ss_sold_date_sk (type: int), ss_item_sk (type: int), ss_ext_sales_price (type: decimal(7,2)) - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 575995635 Data size: 50814502088 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col1 (type: int) - sort order: + - Map-reduce partition columns: _col1 (type: int) - Statistics: Num rows: 575995635 Data size: 50814502088 Basic stats: COMPLETE Column stats: NONE - value expressions: _col0 (type: int), _col2 (type: decimal(7,2)) - Map 6 - Map Operator Tree: - TableScan - alias: item - Statistics: Num rows: 462000 Data size: 663560457 Basic stats: COMPLETE Column stats: NONE - Filter Operator - predicate: ((i_manufact_id = 436) and i_item_sk is not null) (type: boolean) - Statistics: Num rows: 231000 Data size: 331780228 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: i_item_sk (type: int), i_brand_id (type: int), i_brand (type: string) - outputColumnNames: _col0, _col1, _col2 - Statistics: Num rows: 231000 Data size: 331780228 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 231000 Data size: 331780228 Basic stats: COMPLETE Column stats: NONE - value expressions: _col1 (type: int), _col2 (type: string) - Map 7 - Map Operator Tree: - TableScan - alias: dt - Statistics: Num rows: 73049 Data size: 81741831 Basic stats: COMPLETE Column stats: NONE - Filter Operator - predicate: ((d_moy = 12) and d_date_sk is not null) (type: boolean) - Statistics: Num rows: 36524 Data size: 40870356 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: d_date_sk (type: int), d_year (type: int) - outputColumnNames: _col0, _col1 - Statistics: Num rows: 36524 Data size: 40870356 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 36524 Data size: 40870356 Basic stats: COMPLETE Column stats: NONE - value expressions: _col1 (type: int) - Reducer 2 - Reduce Operator Tree: - Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col1 (type: int) - 1 _col0 (type: int) - outputColumnNames: _col0, _col2, _col4, _col5 - Statistics: Num rows: 633595212 Data size: 55895953508 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 633595212 Data size: 55895953508 Basic stats: COMPLETE Column stats: NONE - value expressions: _col2 (type: decimal(7,2)), _col4 (type: int), _col5 (type: string) - Reducer 3 - Reduce Operator Tree: - Join Operator - condition map: - Inner Join 0 to 1 - keys: - 0 _col0 (type: int) - 1 _col0 (type: int) - outputColumnNames: _col2, 
_col4, _col5, _col8 - Statistics: Num rows: 696954748 Data size: 61485550191 Basic stats: COMPLETE Column stats: NONE - Group By Operator - aggregations: sum(_col2) - keys: _col8 (type: int), _col4 (type: int), _col5 (type: string) - mode: hash - outputColumnNames: _col0, _col1, _col2, _col3 - Statistics: Num rows: 696954748 Data size: 61485550191 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int), _col1 (type: int), _col2 (type: string) - sort order: +++ - Map-reduce partition columns: _col0 (type: int), _col1 (type: int), _col2 (type: string) - Statistics: Num rows: 696954748 Data size: 61485550191 Basic stats: COMPLETE Column stats: NONE - value expressions: _col3 (type: decimal(17,2)) - Reducer 4 - Reduce Operator Tree: - Group By Operator - aggregations: sum(VALUE._col0) - keys: KEY._col0 (type: int), KEY._col1 (type: int), KEY._col2 (type: string) - mode: mergepartial - outputColumnNames: _col0, _col1, _col2, _col3 - Statistics: Num rows: 348477374 Data size: 30742775095 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int), _col3 (type: decimal(17,2)), _col1 (type: int) - sort order: +-+ - Statistics: Num rows: 348477374 Data size: 30742775095 Basic stats: COMPLETE Column stats: NONE - TopN Hash Memory Usage: 0.1 - value expressions: _col2 (type: string) - Reducer 5 - Reduce Operator Tree: - Select Operator - expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey2 (type: int), VALUE._col0 (type: string), KEY.reducesinkkey1 (type: decimal(17,2)) - outputColumnNames: _col0, _col1, _col2, _col3 - Statistics: Num rows: 348477374 Data size: 30742775095 Basic stats: COMPLETE Column stats: NONE - Limit - Number of rows: 100 - Statistics: Num rows: 100 Data size: 8800 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 100 Data size: 8800 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - - Stage: Stage-0 - Fetch Operator - limit: 100 - Processor Tree: - ListSink -
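
The patch above replaces Hive's hand-rolled Spark-plan printer with Spark's own naming and lineage facilities: setJobGroup ties all jobs of one query together, setCallSite and setName label each RDD as it is created, and toDebugString prints the named lineage that SparkPlan.generateGraph() now logs. Below is a minimal, self-contained sketch of those Spark APIs. It is not part of the patch, and every identifier and value in it (class name, query id, RDD names, partition counts) is illustrative only.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.CallSite;

/**
 * Hypothetical standalone sketch (not part of HIVE-18368) of the Spark APIs the
 * patch relies on: setJobGroup, setCallSite, setName and toDebugString.
 */
public class RddNamingSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("rdd-naming-sketch").setMaster("local[*]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Group all jobs for this "query" under one id and description,
      // as LocalHiveSparkClient now does before submitting the RDD graph.
      jsc.setJobGroup("queryId = hypothetical-query-id", "hypothetical query name");

      // Set the call site before creating an RDD so the Spark UI shows a readable
      // scope, mirroring sc.setCallSite(CallSite.apply(tran.getName(), "")).
      jsc.sc().setCallSite(CallSite.apply("Map 1", ""));
      JavaRDD<Integer> input = jsc.parallelize(Arrays.asList(1, 2, 3, 4));
      input.setName("Map 1 (example_table, " + input.getNumPartitions() + ")");

      jsc.sc().setCallSite(CallSite.apply("Reducer 2", ""));
      JavaRDD<Integer> shuffled = input.repartition(2);
      shuffled.setName("Reducer 2 (REPARTITION, 2)");

      // The named lineage replaces the old hand-built "Spark Plan" log output.
      System.out.println(shuffled.toDebugString());
    }
  }
}

Run against a local master, toDebugString prints the assigned names ("Map 1 (example_table, ...)", "Reducer 2 (REPARTITION, 2)") inside the lineage, and the call-site and job-group strings appear in the Spark UI; this is the debug output the patched SparkPlan logs for real queries.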