Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java	(working copy)
@@ -33,4 +33,10 @@
     return input.groupByKey();
   }
 
+  /** Returns the fixed label "GroupBy" identifying this shuffler. */
+  @Override
+  public String getName() {
+    return "GroupBy";
+  }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java	(working copy)
@@ -36,6 +36,7 @@
   private JavaPairRDD hadoopRDD;
   private boolean toCache;
   private final SparkPlan sparkPlan;
+  private String name = "MapInput";
 
   public MapInput(SparkPlan sparkPlan, JavaPairRDD hadoopRDD) {
     this(sparkPlan, hadoopRDD, false);
@@ -85,4 +86,22 @@
     }
 
   }
+
+  /** Returns the current display name of this tran ("MapInput" until renamed). */
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /** Returns the value of the {@code toCache} flag; never {@code null}. */
+  @Override
+  public Boolean isCacheEnable() {
+    return Boolean.valueOf(toCache);
+  }
+
+  /** Replaces the display name returned by {@link #getName()}. */
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java	(working copy)
@@ -25,6 +25,8 @@
 public class MapTran implements SparkTran {
   private HiveMapFunction mapFunc;
 
+  private String name = "MapTran";
+
   @Override
   public JavaPairRDD transform(
       JavaPairRDD input) {
@@ -35,4 +37,23 @@
     this.mapFunc = mapFunc;
   }
 
+  /** Returns the current display name of this tran ("MapTran" until renamed). */
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /** Always returns {@code null}: this tran holds no cache flag to report. */
+  @Override
+  public Boolean isCacheEnable() {
+    // null (rather than FALSE) lets a null-checking caller skip the cache annotation.
+    return null;
+  }
+
+  /** Replaces the display name returned by {@link #getName()}. */
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java	(working copy)
@@ -24,6 +24,8 @@
 
 public class ReduceTran implements SparkTran, HiveKey, BytesWritable> {
   private HiveReduceFunction reduceFunc;
+
+  private String name = "Reduce";
 
   @Override
   public JavaPairRDD transform(
@@ -34,4 +36,23 @@
   public void setReduceFunction(HiveReduceFunction redFunc) {
     this.reduceFunc = redFunc;
   }
+
+  /** Returns the current display name of this tran ("Reduce" until renamed). */
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /** Always returns {@code null}: this tran holds no cache flag to report. */
+  @Override
+  public Boolean isCacheEnable() {
+    // null (rather than FALSE) lets a null-checking caller skip the cache annotation.
+    return null;
+  }
+
+  /** Replaces the display name returned by {@link #getName()}. */
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java	(working copy)
@@ -28,6 +28,7 @@
   private final int numOfPartitions;
   private final boolean toCache;
   private final SparkPlan sparkPlan;
+  private String name = "Shuffle";
 
   public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) {
     this(sparkPlan, sf, n, false);
@@ -49,4 +50,33 @@
     }
     return result;
   }
+
+  /** Returns {@code numOfPartitions}, the partition count held by this tran. */
+  public int getNoOfPartitions() {
+    return numOfPartitions;
+  }
+
+  /** Returns the current display name of this tran ("Shuffle" until renamed). */
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /** Returns the value of the {@code toCache} flag; never {@code null}. */
+  @Override
+  public Boolean isCacheEnable() {
+    return Boolean.valueOf(toCache);
+  }
+
+  /** Replaces the display name returned by {@link #getName()}. */
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /** Returns the {@code shuffler} held by this tran. */
+  public SparkShuffler getShuffler() {
+    return shuffler;
+  }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java	(working copy)
@@ -121,4 +121,10 @@
     }
   }
 
+  /** Returns the fixed label "SortBy" identifying this shuffler. */
+  @Override
+  public String getName() {
+    return "SortBy";
+  }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java	(working copy)
@@ -21,11 +21,14 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.io.BytesWritable;
@@ -36,6 +39,7 @@
 @SuppressWarnings("rawtypes")
 public class SparkPlan {
   private static final String CLASS_NAME = SparkPlan.class.getName();
+  private static final Log LOG = LogFactory.getLog(SparkPlan.class);
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private final Set rootTrans = new HashSet();
@@ -43,6 +47,8 @@
   private final Map> transGraph = new HashMap>();
   private final Map> invertedTransGraph = new HashMap>();
   private final Set cachedRDDIds = new HashSet();
+  private String sparkPlan;
+  private static final String ARROW = " <-- ";
 
   @SuppressWarnings("unchecked")
   public JavaPairRDD generateGraph() {
@@ -72,6 +78,8 @@
       tranToOutputRDDMap.put(tran, rdd);
     }
 
+    logSparkPlan();
+
     JavaPairRDD finalRDD = null;
     for (SparkTran leafTran : leafTrans) {
       JavaPairRDD rdd = tranToOutputRDDMap.get(leafTran);
@@ -86,6 +94,163 @@
     return finalRDD;
   }
 
+  /**
+   * Appends a running number to the name of every leaf tran and of every tran that is a key
+   * of {@code transGraph}, so that entries of the logged plan can be told apart.
+   * NOTE(review): each call appends one more number to the names; assumes one call per plan.
+   */
+  private void addNumberToTrans() {
+    int i = 1;
+    String name = null;
+
+    // Number the leaf trans first, then the keys of transGraph, using one shared counter.
+    for (SparkTran leaf : leafTrans) {
+      name = leaf.getName() + " " + i++;
+      leaf.setName(name);
+    }
+    Set<SparkTran> sparkTrans = transGraph.keySet();
+    for (SparkTran tran : sparkTrans) {
+      name = tran.getName() + " " + i++;
+      tran.setName(name);
+    }
+  }
+
+  /**
+   * Builds a textual description of the plan, stores it in {@code sparkPlan} and logs it at
+   * INFO level. One line is written per leaf tran and per ReduceTran collected from the
+   * ancestors of the leaves; each line names the tran followed by its chain of parents.
+   */
+  private void logSparkPlan() {
+    addNumberToTrans();
+    List<SparkTran> leafTran = new ArrayList<SparkTran>();
+    leafTran.addAll(leafTrans);
+
+    for (SparkTran leaf : leafTrans) {
+      collectLeafTrans(leaf, leafTran);
+    }
+
+    // Walk from each collected tran towards its parents until no parent is left.
+    sparkPlan = "\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n";
+    for (SparkTran leaf : leafTran) {
+      sparkPlan = sparkPlan + " " + leaf.getName();
+      getSparkPlan(leaf);
+      sparkPlan = sparkPlan + "\n";
+    }
+    sparkPlan = sparkPlan
+      + " \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ";
+    LOG.info(sparkPlan);
+  }
+
+  /**
+   * Adds every ReduceTran found among the parents of {@code leaf} to {@code reduceTrans},
+   * then repeats the search from the last non-reduce parent that itself has parents.
+   */
+  private void collectLeafTrans(SparkTran leaf, List<SparkTran> reduceTrans) {
+    List<SparkTran> parents = getParents(leaf);
+    if (parents.size() > 0) {
+      SparkTran nextLeaf = null;
+      for (SparkTran leafTran : parents) {
+        if (leafTran instanceof ReduceTran) {
+          reduceTrans.add(leafTran);
+        } else if (getParents(leafTran).size() > 0) {
+          nextLeaf = leafTran;
+        }
+      }
+      if (nextLeaf != null) {
+        collectLeafTrans(nextLeaf, reduceTrans);
+      }
+    }
+  }
+
+  /**
+   * Appends the parents of {@code tran} to {@code sparkPlan} as one parenthesised,
+   * comma-separated group, then continues with the next level of parents.
+   */
+  private void getSparkPlan(SparkTran tran) {
+    List<SparkTran> parents = getParents(tran);
+    List<SparkTran> nextLeaf = new ArrayList<SparkTran>();
+    if (parents.size() > 0) {
+      sparkPlan = sparkPlan + ARROW;
+      boolean isFirst = true;
+      for (SparkTran leaf : parents) {
+        appendTran(leaf, isFirst);
+        isFirst = false;
+        // A ReduceTran is not followed here; it is expanded on a line of its own.
+        if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) {
+          nextLeaf.add(leaf);
+        }
+      }
+      sparkPlan = sparkPlan + " ) ";
+      if (nextLeaf.size() > 1) {
+        logLeafTran(nextLeaf);
+      } else if (nextLeaf.size() != 0) {
+        getSparkPlan(nextLeaf.get(0));
+      }
+    }
+  }
+
+  /**
+   * Appends the first parent of each tran in {@code parent} to {@code sparkPlan} as one
+   * parenthesised, comma-separated group. Every tran passed in must have a parent.
+   */
+  private void logLeafTran(List<SparkTran> parent) {
+    sparkPlan = sparkPlan + ARROW;
+    boolean isFirst = true;
+    for (SparkTran sparkTran : parent) {
+      List<SparkTran> parents = getParents(sparkTran);
+      SparkTran leaf = parents.get(0);
+      appendTran(leaf, isFirst);
+      isFirst = false;
+    }
+    sparkPlan = sparkPlan + " ) ";
+  }
+
+  /**
+   * Appends one tran to the group being written: "( " before the first entry, "," before
+   * later ones, then the tran name and its shuffle or cache annotation.
+   */
+  private void appendTran(SparkTran leaf, boolean isFirst) {
+    sparkPlan = sparkPlan + (isFirst ? "( " : ",") + leaf.getName();
+    if (leaf instanceof ShuffleTran) {
+      logShuffleTranStatus((ShuffleTran) leaf);
+    } else {
+      logCacheStatus(leaf);
+    }
+  }
+
+  /**
+   * Appends the partition count, shuffler name and cache flag of {@code leaf} to
+   * {@code sparkPlan} as a single parenthesised annotation.
+   */
+  private void logShuffleTranStatus(ShuffleTran leaf) {
+    int noOfPartitions = leaf.getNoOfPartitions();
+    sparkPlan = sparkPlan + " ( Partitions " + noOfPartitions;
+    SparkShuffler shuffler = leaf.getShuffler();
+    sparkPlan = sparkPlan + ", " + shuffler.getName();
+    if (leaf.isCacheEnable()) {
+      sparkPlan = sparkPlan + ", Cache on";
+    } else {
+      sparkPlan = sparkPlan + ", Cache off";
+    }
+    // Close the parenthesis opened before "Partitions" so the annotation is balanced.
+    sparkPlan = sparkPlan + " )";
+  }
+
+  /**
+   * Appends " (cache on) " or " (cache off) " for {@code sparkTran}; appends nothing when
+   * the tran reports {@code null} from {@code isCacheEnable()}.
+   */
+  private void logCacheStatus(SparkTran sparkTran) {
+    Boolean cacheEnabled = sparkTran.isCacheEnable();
+    if (cacheEnabled != null) {
+      if (cacheEnabled.booleanValue()) {
+        sparkPlan = sparkPlan + " (cache on) ";
+      } else {
+        sparkPlan = sparkPlan + " (cache off) ";
+      }
+    }
+  }
+
   public void addTran(SparkTran tran) {
     rootTrans.add(tran);
     leafTrans.add(tran);
@@ -168,5 +333,11 @@
 
     return transGraph.get(tran);
   }
+
+  /** Returns the plan text built by logSparkPlan(), or "" when none has been built yet. */
+  @Override
+  public String toString() {
+    return sparkPlan == null ? "" : sparkPlan;
+  }
 
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java	(working copy)
@@ -27,4 +27,6 @@
   JavaPairRDD> shuffle(
       JavaPairRDD input, int numPartitions);
 
+  /** Returns a short label identifying this shuffler. */
+  String getName();
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java	(revision 1672320)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java	(working copy)
@@ -25,4 +25,16 @@
 public interface SparkTran {
   JavaPairRDD transform(
       JavaPairRDD input);
+
+  /** Returns the display name of this tran. */
+  String getName();
+
+  /** Replaces the display name returned by {@link #getName()}. */
+  void setName(String name);
+
+  /**
+   * Returns whether caching is enabled for this tran, or {@code null} when the tran has no
+   * cache flag; callers must null-check the result before unboxing it.
+   */
+  Boolean isCacheEnable();
 }