commit 637da9add0e194fdcbd0a31a0ab83ff12f2713c4
Author: Sahil Takiar
Date:   Thu Jan 4 10:18:06 2018 -0800

    HIVE-18368: Improve Spark Debug RDD Graph

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index 26cfebdfb5..9f2b7c0ee8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -36,17 +36,18 @@
   private JavaPairRDD hadoopRDD;
   private boolean toCache;
   private final SparkPlan sparkPlan;
-  private String name = "MapInput";
+  private final String name;
 
   public MapInput(SparkPlan sparkPlan, JavaPairRDD hadoopRDD) {
-    this(sparkPlan, hadoopRDD, false);
+    this(sparkPlan, hadoopRDD, false, "MapInput");
   }
 
   public MapInput(SparkPlan sparkPlan,
-      JavaPairRDD hadoopRDD, boolean toCache) {
+      JavaPairRDD hadoopRDD, boolean toCache, String name) {
     this.hadoopRDD = hadoopRDD;
     this.toCache = toCache;
     this.sparkPlan = sparkPlan;
+    this.name = name;
   }
 
   public void setToCache(boolean toCache) {
@@ -66,6 +67,7 @@ public void setToCache(boolean toCache) {
     } else {
       result = hadoopRDD;
     }
+    result.setName(this.name);
     return result;
   }
 
@@ -96,9 +98,4 @@ public String getName() {
   public Boolean isCacheEnable() {
     return new Boolean(toCache);
   }
-
-  @Override
-  public void setName(String name) {
-    this.name = name;
-  }
 }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 2a189915d9..ac5db3e662 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -24,20 +24,23 @@
 public class MapTran extends CacheTran {
   private HiveMapFunction mapFunc;
-  private String name = "MapTran";
+  private String name;
 
   public MapTran() {
-    this(false);
+    this(false, "MapTran");
   }
 
-  public MapTran(boolean cache) {
+  public MapTran(boolean cache, String name) {
     super(cache);
+    this.name = name;
   }
 
   @Override
   public JavaPairRDD doTransform(
       JavaPairRDD input) {
-    return input.mapPartitionsToPair(mapFunc);
+    JavaPairRDD rdd = input.mapPartitionsToPair(mapFunc);
+    rdd.setName(this.name + " (" + rdd.getNumPartitions() + (isCacheEnable() ? ", cached)" : ")"));
+    return rdd;
   }
 
   public void setMapFunction(HiveMapFunction mapFunc) {
@@ -48,9 +51,4 @@ public void setMapFunction(HiveMapFunction mapFunc) {
   public String getName() {
     return name;
   }
-
-  @Override
-  public void setName(String name) {
-    this.name = name;
-  }
 }
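The two files above establish the pattern the rest of the patch follows: each Tran assigns a human-readable name to the RDD it produces via JavaPairRDD.setName(), so Spark's built-in lineage dump becomes self-describing. A minimal, self-contained sketch of that mechanism, runnable outside Hive (the class name and sample data are hypothetical, not from the patch):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class RddNameDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("rdd-name-demo");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // Stand-in for the HadoopRDD that MapInput wraps.
      JavaPairRDD<String, Integer> input =
          sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
      input.setName("MapInput (demo_table, 2)");
      // Stand-in for MapTran.doTransform().
      JavaPairRDD<String, Integer> mapped =
          input.mapToPair(t -> new Tuple2<>(t._1, t._2 + 1));
      mapped.setName("Map 1 (" + mapped.getNumPartitions() + ")");
      // toDebugString() prints the lineage with the assigned names attached.
      System.out.println(mapped.toDebugString());
    }
  }
}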
", cached)" : ")")); + return rdd; } public void setReduceFunction(HiveReduceFunction redFunc) { @@ -48,9 +51,4 @@ public void setReduceFunction(HiveReduceFunction redFunc) { public String getName() { return name; } - - @Override - public void setName(String name) { - this.name = name; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java index 2aac2b416b..0170bb92c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java @@ -28,17 +28,18 @@ private final int numOfPartitions; private final boolean toCache; private final SparkPlan sparkPlan; - private String name = "Shuffle"; + private final String name; public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) { - this(sparkPlan, sf, n, false); + this(sparkPlan, sf, n, false, "Shuffle"); } - public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache) { + public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache, String name) { shuffler = sf; numOfPartitions = n; this.toCache = toCache; this.sparkPlan = sparkPlan; + this.name = name; } @Override @@ -48,7 +49,7 @@ public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache sparkPlan.addCachedRDDId(result.id()); result = result.persist(StorageLevel.MEMORY_AND_DISK()); } - return result; + return result.setName(this.name); } public int getNoOfPartitions() { @@ -65,11 +66,6 @@ public Boolean isCacheEnable() { return new Boolean(toCache); } - @Override - public void setName(String name) { - this.name = name; - } - public SparkShuffler getShuffler() { return shuffler; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index 9a2ab5120d..8857183831 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -68,6 +68,7 @@ rdd = prevRDD; } else { rdd = rdd.union(prevRDD); + rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")"); } } rdd = tran.transform(rdd); @@ -76,8 +77,6 @@ tranToOutputRDDMap.put(tran, rdd); } - logSparkPlan(); - JavaPairRDD finalRDD = null; for (SparkTran leafTran : leafTrans) { JavaPairRDD rdd = tranToOutputRDDMap.get(leafTran); @@ -85,156 +84,15 @@ finalRDD = rdd; } else { finalRDD = finalRDD.union(rdd); + rdd.setName("UnionRDD (" + finalRDD.getNumPartitions() + ")"); } } perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH); - if (LOG.isDebugEnabled()) { - LOG.info("print generated spark rdd graph:\n" + SparkUtilities.rddGraphToString(finalRDD)); - } - return finalRDD; - } - - private void addNumberToTrans() { - int i = 1; - String name = null; - - // Traverse leafTran & transGraph add numbers to trans - for (SparkTran leaf : leafTrans) { - name = leaf.getName() + " " + i++; - leaf.setName(name); - } - Set sparkTrans = transGraph.keySet(); - for (SparkTran tran : sparkTrans) { - name = tran.getName() + " " + i++; - tran.setName(name); - } - } - private void logSparkPlan() { - addNumberToTrans(); - ArrayList leafTran = new ArrayList(); - leafTran.addAll(leafTrans); + LOG.info("\n\nSpark RDD Graph:\n\n" + finalRDD.toDebugString() + "\n"); - for (SparkTran leaf : leafTrans) { - collectLeafTrans(leaf, leafTran); - } - - // Start Traverse from the leafTrans and get parents of each leafTrans till - // the end - 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index 9a2ab5120d..8857183831 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -68,6 +68,7 @@
         rdd = prevRDD;
       } else {
         rdd = rdd.union(prevRDD);
+        rdd.setName("UnionRDD (" + rdd.getNumPartitions() + ")");
       }
     }
     rdd = tran.transform(rdd);
@@ -76,8 +77,6 @@
       tranToOutputRDDMap.put(tran, rdd);
     }
 
-    logSparkPlan();
-
     JavaPairRDD finalRDD = null;
     for (SparkTran leafTran : leafTrans) {
       JavaPairRDD rdd = tranToOutputRDDMap.get(leafTran);
@@ -85,156 +84,15 @@
         finalRDD = rdd;
       } else {
         finalRDD = finalRDD.union(rdd);
+        rdd.setName("UnionRDD (" + finalRDD.getNumPartitions() + ")");
       }
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
 
-    if (LOG.isDebugEnabled()) {
-      LOG.info("print generated spark rdd graph:\n" + SparkUtilities.rddGraphToString(finalRDD));
-    }
-    return finalRDD;
-  }
-
-  private void addNumberToTrans() {
-    int i = 1;
-    String name = null;
-
-    // Traverse leafTran & transGraph add numbers to trans
-    for (SparkTran leaf : leafTrans) {
-      name = leaf.getName() + " " + i++;
-      leaf.setName(name);
-    }
-    Set sparkTrans = transGraph.keySet();
-    for (SparkTran tran : sparkTrans) {
-      name = tran.getName() + " " + i++;
-      tran.setName(name);
-    }
-  }
-
-  private void logSparkPlan() {
-    addNumberToTrans();
-    ArrayList leafTran = new ArrayList();
-    leafTran.addAll(leafTrans);
+    LOG.info("\n\nSpark RDD Graph:\n\n" + finalRDD.toDebugString() + "\n");
 
-    for (SparkTran leaf : leafTrans) {
-      collectLeafTrans(leaf, leafTran);
-    }
-
-    // Start Traverse from the leafTrans and get parents of each leafTrans till
-    // the end
-    StringBuilder sparkPlan = new StringBuilder(
-        "\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n");
-    for (SparkTran leaf : leafTran) {
-      sparkPlan.append(leaf.getName());
-      getSparkPlan(leaf, sparkPlan);
-      sparkPlan.append("\n");
-    }
-    sparkPlan
-        .append(" \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ");
-    LOG.info(sparkPlan.toString());
-  }
-
-  private void collectLeafTrans(SparkTran leaf, List reduceTrans) {
-    List parents = getParents(leaf);
-    if (parents.size() > 0) {
-      SparkTran nextLeaf = null;
-      for (SparkTran leafTran : parents) {
-        if (leafTran instanceof ReduceTran) {
-          reduceTrans.add(leafTran);
-        } else {
-          if (getParents(leafTran).size() > 0)
-            nextLeaf = leafTran;
-        }
-      }
-      if (nextLeaf != null)
-        collectLeafTrans(nextLeaf, reduceTrans);
-    }
-  }
-
-  private void getSparkPlan(SparkTran tran, StringBuilder sparkPlan) {
-    List parents = getParents(tran);
-    List nextLeaf = new ArrayList();
-    if (parents.size() > 0) {
-      sparkPlan.append(" <-- ");
-      boolean isFirst = true;
-      for (SparkTran leaf : parents) {
-        if (isFirst) {
-          sparkPlan.append("( " + leaf.getName());
-          if (leaf instanceof ShuffleTran) {
-            logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
-          } else {
-            logCacheStatus(leaf, sparkPlan);
-          }
-          isFirst = false;
-        } else {
-          sparkPlan.append("," + leaf.getName());
-          if (leaf instanceof ShuffleTran) {
-            logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
-          } else {
-            logCacheStatus(leaf, sparkPlan);
-          }
-        }
-        // Leave reduceTran it will be expanded in the next line
-        if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) {
-          nextLeaf.add(leaf);
-        }
-      }
-      sparkPlan.append(" ) ");
-      if (nextLeaf.size() > 1) {
-        logLeafTran(nextLeaf, sparkPlan);
-      } else {
-        if (nextLeaf.size() != 0)
-          getSparkPlan(nextLeaf.get(0), sparkPlan);
-      }
-    }
-  }
-
-  private void logLeafTran(List parent, StringBuilder sparkPlan) {
-    sparkPlan.append(" <-- ");
-    boolean isFirst = true;
-    for (SparkTran sparkTran : parent) {
-      List parents = getParents(sparkTran);
-      SparkTran leaf = parents.get(0);
-      if (isFirst) {
-        sparkPlan.append("( " + leaf.getName());
-        if (leaf instanceof ShuffleTran) {
-          logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
-        } else {
-          logCacheStatus(leaf, sparkPlan);
-        }
-        isFirst = false;
-      } else {
-        sparkPlan.append("," + leaf.getName());
-        if (leaf instanceof ShuffleTran) {
-          logShuffleTranStatus((ShuffleTran) leaf, sparkPlan);
-        } else {
-          logCacheStatus(leaf, sparkPlan);
-        }
-      }
-    }
-    sparkPlan.append(" ) ");
-  }
-
-  private void logShuffleTranStatus(ShuffleTran leaf, StringBuilder sparkPlan) {
-    int noOfPartitions = leaf.getNoOfPartitions();
-    sparkPlan.append(" ( Partitions " + noOfPartitions);
-    SparkShuffler shuffler = leaf.getShuffler();
-    sparkPlan.append(", " + shuffler.getName());
-    if (leaf.isCacheEnable()) {
-      sparkPlan.append(", Cache on");
-    } else {
-      sparkPlan.append(", Cache off");
-    }
-  }
-
-  private void logCacheStatus(SparkTran sparkTran, StringBuilder sparkPlan) {
-    if (sparkTran.isCacheEnable() != null) {
-      if (sparkTran.isCacheEnable().booleanValue()) {
-        sparkPlan.append(" (cache on) ");
-      } else {
-        sparkPlan.append(" (cache off) ");
-      }
-    }
+    return finalRDD;
   }
 
   public void addTran(SparkTran tran) {
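The net effect in SparkPlan.java: roughly 150 lines of hand-rolled plan traversal (addNumberToTrans, logSparkPlan, and their helpers) are replaced by a single call to Spark's own RDD.toDebugString(), which already walks the dependency graph, including unions and shuffle boundaries, and prints each RDD with its id, name, and storage level. A standalone sketch of why that suffices (names and data are hypothetical):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class UnionLineageDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("union-lineage-demo");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      JavaPairRDD<String, Integer> a =
          sc.parallelizePairs(Arrays.asList(new Tuple2<>("x", 1)));
      JavaPairRDD<String, Integer> b =
          sc.parallelizePairs(Arrays.asList(new Tuple2<>("y", 2)));
      // Mirrors the union + setName added in generateGraph() above.
      JavaPairRDD<String, Integer> unioned = a.union(b);
      unioned.setName("UnionRDD (" + unioned.getNumPartitions() + ")");
      // reduceByKey introduces a shuffle boundary in the lineage.
      JavaPairRDD<String, Integer> reduced = unioned.reduceByKey(Integer::sum);
      reduced.setName("Reducer 1 (" + reduced.getNumPartitions() + ")");
      // The dump shows the union's parents and the shuffle dependency,
      // each line carrying the assigned name.
      System.out.println(reduced.toDebugString());
    }
  }
}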
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -51,6 +53,7 @@
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -138,9 +141,12 @@ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork,
       result = generateMapInput(sparkPlan, (MapWork)work);
       sparkPlan.addTran(result);
     } else if (work instanceof ReduceWork) {
+      boolean toCache = cloneToWork.containsKey(work);
       List parentWorks = sparkWork.getParents(work);
-      result = generate(sparkPlan,
-          sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
+      SparkEdgeProperty sparkEdgeProperty = sparkWork.getEdgeProperty(parentWorks.get(0), work);
+      String rddName = work.getName() + " (" + sparkEdgeProperty.getShuffleType() + ", "
+          + sparkEdgeProperty.getNumPartitions() + (toCache ? ", cached)" : ")");
+      result = generate(sparkPlan, sparkEdgeProperty, toCache, rddName);
       sparkPlan.addTran(result);
       for (BaseWork parentWork : parentWorks) {
         sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -198,12 +204,20 @@ private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork)
       hadoopRDD = sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
     }
 
+    boolean toCache = false/*cloneToWork.containsKey(mapWork)*/;
+
+    String rddName = mapWork.getName() + " (" + mapWork.getAllRootOperators().stream()
+        .filter(op -> op instanceof TableScanOperator)
+        .map(ts -> ((TableScanDesc) ts.getConf()).getAlias())
+        .collect(Collectors.joining(", "))
+        + ", " + hadoopRDD.getNumPartitions() + (toCache ? ", cached)" : ")");
+
     // Caching is disabled for MapInput due to HIVE-8920
-    MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/);
+    MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName);
     return result;
   }
 
-  private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) {
+  private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache, String name) {
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
@@ -214,7 +228,7 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea
     } else {
       shuffler = new GroupByShuffler();
     }
-    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache);
+    return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name);
   }
 
   private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception {
@@ -238,12 +252,12 @@ private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception
             "Can't make path " + outputPath + " : " + e.getMessage());
         }
       }
-      MapTran mapTran = new MapTran(caching);
+      MapTran mapTran = new MapTran(caching, work.getName());
       HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter);
       mapTran.setMapFunction(mapFunc);
       return mapTran;
     } else if (work instanceof ReduceWork) {
-      ReduceTran reduceTran = new ReduceTran(caching);
+      ReduceTran reduceTran = new ReduceTran(caching, work.getName());
      HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter);
       reduceTran.setReduceFunction(reduceFunc);
       return reduceTran;
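The rddName built in generateMapInput() joins the aliases of all TableScanOperators at the roots of the MapWork into one comma-separated string. A minimal illustration of that stream pipeline using plain strings in place of Hive operators (all names and data below are hypothetical):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class AliasJoinDemo {
  public static void main(String[] args) {
    // Stand-ins for mapWork.getAllRootOperators(): two "table scans" and one other op.
    List<Object> rootOps = Arrays.asList("ts:store_sales", "ts:date_dim", 42);
    // Analogous to: .filter(op -> op instanceof TableScanOperator)
    //               .map(ts -> ((TableScanDesc) ts.getConf()).getAlias())
    String aliases = rootOps.stream()
        .filter(op -> op instanceof String)
        .map(op -> ((String) op).substring(3))  // strip the "ts:" tag
        .collect(Collectors.joining(", "));
    System.out.println("Map 1 (" + aliases + ", 2)");  // -> "Map 1 (store_sales, date_dim, 2)"
  }
}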
", cached)" : ")"); + // Caching is disabled for MapInput due to HIVE-8920 - MapInput result = new MapInput(sparkPlan, hadoopRDD, false/*cloneToWork.containsKey(mapWork)*/); + MapInput result = new MapInput(sparkPlan, hadoopRDD, toCache, rddName); return result; } - private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache) { + private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolean toCache, String name) { Preconditions.checkArgument(!edge.isShuffleNone(), "AssertionError: SHUFFLE_NONE should only be used for UnionWork."); SparkShuffler shuffler; @@ -214,7 +228,7 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea } else { shuffler = new GroupByShuffler(); } - return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache); + return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache, name); } private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception { @@ -238,12 +252,12 @@ private SparkTran generate(BaseWork work, SparkWork sparkWork) throws Exception "Can't make path " + outputPath + " : " + e.getMessage()); } } - MapTran mapTran = new MapTran(caching); + MapTran mapTran = new MapTran(caching, work.getName()); HiveMapFunction mapFunc = new HiveMapFunction(confBytes, sparkReporter); mapTran.setMapFunction(mapFunc); return mapTran; } else if (work instanceof ReduceWork) { - ReduceTran reduceTran = new ReduceTran(caching); + ReduceTran reduceTran = new ReduceTran(caching, work.getName()); HiveReduceFunction reduceFunc = new HiveReduceFunction(confBytes, sparkReporter); reduceTran.setReduceFunction(reduceFunc); return reduceTran; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java index 671c9837e0..c3c48a04fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java @@ -28,7 +28,5 @@ public String getName(); - public void setName(String name); - public Boolean isCacheEnable(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index ca19fd013c..73302e634e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.spark; -import java.io.File; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; @@ -42,12 +40,8 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.BytesWritable; import org.apache.hive.spark.client.SparkClientUtilities; -import org.apache.spark.Dependency; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.rdd.RDD; -import org.apache.spark.rdd.UnionRDD; -import scala.collection.JavaConversions; + /** * Contains utilities methods used as part of Spark tasks. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 21238dbeb7..82c2cf8aeb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -415,7 +415,7 @@ public int hashCode() {
         combine.createPool(job, f);
         poolMap.put(combinePathInputFormat, f);
       } else {
-        LOG.info("CombineHiveInputSplit: pool is already created for " + path +
+        LOG.debug("CombineHiveInputSplit: pool is already created for " + path +
             "; using filter path " + filterPath);
         f.addPath(filterPath);
       }
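This last hunk is an unrelated tidy-up: the per-path pool message is demoted from INFO to DEBUG since it fires for every input path. With slf4j, the concatenation cost could also be avoided entirely via parameterized logging; a sketch of that alternative, not what the patch does:

// Equivalent message using slf4j placeholders: the string is only built
// when DEBUG is enabled.
LOG.debug("CombineHiveInputSplit: pool is already created for {}; using filter path {}",
    path, filterPath);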