diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java index b8e36cb..e128dd2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java @@ -33,4 +33,8 @@ return input.groupByKey(); } + @Override + public String getName() { + return "GroupBy"; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java index 157e4d8..26cfebd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java @@ -36,6 +36,7 @@ private JavaPairRDD hadoopRDD; private boolean toCache; private final SparkPlan sparkPlan; + private String name = "MapInput"; public MapInput(SparkPlan sparkPlan, JavaPairRDD hadoopRDD) { this(sparkPlan, hadoopRDD, false); @@ -88,11 +89,16 @@ public void setToCache(boolean toCache) { @Override public String getName() { - return "MapInput"; + return name; } @Override public Boolean isCacheEnable() { return new Boolean(toCache); } + + @Override + public void setName(String name) { + this.name = name; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java index f6a4d77..2170243 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java @@ -24,6 +24,7 @@ public class MapTran implements SparkTran { private HiveMapFunction mapFunc; + private String name = "MapTran"; @Override public JavaPairRDD transform( @@ -37,12 +38,16 @@ public void setMapFunction(HiveMapFunction mapFunc) { @Override public String getName() { - return "MapTran"; + return name; } @Override public Boolean isCacheEnable() { - // TODO Auto-generated method stub return null; } + + @Override + public void setName(String name) { + this.name = name; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java index fd6b31c..e60dfac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java @@ -24,6 +24,7 @@ public class ReduceTran implements SparkTran, HiveKey, BytesWritable> { private HiveReduceFunction reduceFunc; + private String name = "Reduce"; @Override public JavaPairRDD transform( @@ -37,12 +38,16 @@ public void setReduceFunction(HiveReduceFunction redFunc) { @Override public String getName() { - return "Reduce"; + return name; } @Override public Boolean isCacheEnable() { - // TODO Auto-generated method stub return null; } + + @Override + public void setName(String name) { + this.name = name; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java index 6cdab20..a774395 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java @@ -28,6 +28,7 @@ private final int numOfPartitions; private final boolean toCache; private final SparkPlan sparkPlan; + private String name = "Shuffle"; public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) { this(sparkPlan, sf, n, false); @@ -50,13 +51,26 @@ public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache return result; } + public int getNoOfPartitions() { + return numOfPartitions; + } + @Override public String getName() { - return "Shuffle"; + return name; } @Override public Boolean isCacheEnable() { return new Boolean(toCache); } + + @Override + public void setName(String name) { + this.name = name; + } + + public SparkShuffler getShuffler() { + return shuffler; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java index 2545a9d..766813c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java @@ -56,6 +56,11 @@ public SortByShuffler(boolean totalOrder) { return rdd.mapPartitionsToPair(new ShuffleFunction()); } + @Override + public String getName() { + return "SortBy"; + } + private static class ShuffleFunction implements PairFlatMapFunction>, HiveKey, Iterable> { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index 81b7e85..0d931a2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -46,6 +46,8 @@ private final Map> transGraph = new HashMap>(); private final Map> invertedTransGraph = new HashMap>(); private final Set cachedRDDIds = new HashSet(); + private StringBuilder sparkPlan = new StringBuilder(); + private String arrow = " <-- "; @SuppressWarnings("unchecked") public JavaPairRDD generateGraph() { @@ -81,9 +83,9 @@ for (SparkTran leafTran : leafTrans) { JavaPairRDD rdd = tranToOutputRDDMap.get(leafTran); if (finalRDD == null) { - finalRDD = rdd; + finalRDD = rdd; } else { - finalRDD = finalRDD.union(rdd); + finalRDD = finalRDD.union(rdd); } } @@ -91,57 +93,146 @@ return finalRDD; } + private void addNumberToTrans() { + int i = 1; + String name = null; + + // Traverse leafTran & transGraph add numbers to trans + for (SparkTran leaf : leafTrans) { + name = leaf.getName() + " " + i++; + leaf.setName(name); + } + Set sparkTrans = transGraph.keySet(); + for (SparkTran tran : sparkTrans) { + name = tran.getName() + " " + i++; + tran.setName(name); + } + } + private void logSparkPlan() { - LOG.info("------------------------------ Spark Plan -----------------------------"); - Set keySet = invertedTransGraph.keySet(); - for (SparkTran sparkTran : keySet) { - if (sparkTran instanceof ReduceTran) { - String sparkPlan = " " + sparkTran.getName(); - sparkPlan = getSparkPlan(sparkTran, sparkPlan); - LOG.info(sparkPlan); + addNumberToTrans(); + ArrayList leafTran = new ArrayList(); + leafTran.addAll(leafTrans); + + for (SparkTran leaf : leafTrans) { + collectLeafTrans(leaf, leafTran); + } + + // Start Traverse from the leafTrans and get parents of each leafTrans till + // the end + sparkPlan + .append("\n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \n\n"); + for (SparkTran leaf : leafTran) { + sparkPlan.append(leaf.getName()); + getSparkPlan(leaf); + sparkPlan.append("\n"); + } + sparkPlan + .append(" \n\t!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! "); + LOG.info(sparkPlan); + } + + private void collectLeafTrans(SparkTran leaf, List reduceTrans) { + List parents = getParents(leaf); + if (parents.size() > 0) { + SparkTran nextLeaf = null; + for (SparkTran leafTran : parents) { + if (leafTran instanceof ReduceTran) { + reduceTrans.add(leafTran); + } else { + if (getParents(leafTran).size() > 0) + nextLeaf = leafTran; + } } + if (nextLeaf != null) + collectLeafTrans(nextLeaf, reduceTrans); } - LOG.info("------------------------------ Spark Plan -----------------------------"); - } - - private String getSparkPlan(SparkTran leaf, String sparkPlanMsg) { - if (leaf != null) { - List parents = getParents(leaf); - if (parents.size() > 0) { - sparkPlanMsg = sparkPlanMsg + " <-- "; - boolean isFirst = true; - SparkTran parent = null; - for (SparkTran sparkTran : parents) { - if (isFirst) { - sparkPlanMsg = sparkPlanMsg + "( " + sparkTran.getName(); - sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran); - isFirst = false; + } + + private void getSparkPlan(SparkTran tran) { + List parents = getParents(tran); + List nextLeaf = new ArrayList(); + if (parents.size() > 0) { + sparkPlan.append(arrow); + boolean isFirst = true; + for (SparkTran leaf : parents) { + if (isFirst) { + sparkPlan.append("( " + leaf.getName()); + if (leaf instanceof ShuffleTran) { + logShuffleTranStatus((ShuffleTran) leaf); } else { - sparkPlanMsg = sparkPlanMsg + "," + sparkTran.getName(); - sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran); + logCacheStatus(leaf); } - if (getParents(sparkTran).size() > 0 && !(sparkTran instanceof ReduceTran)) { - parent = sparkTran; + isFirst = false; + } else { + sparkPlan.append("," + leaf.getName()); + if (leaf instanceof ShuffleTran) { + logShuffleTranStatus((ShuffleTran) leaf); + } else { + logCacheStatus(leaf); } } - sparkPlanMsg = sparkPlanMsg + " ) "; - return getSparkPlan(parent, sparkPlanMsg); + // Leave reduceTran it will be expanded in the next line + if (getParents(leaf).size() > 0 && !(leaf instanceof ReduceTran)) { + nextLeaf.add(leaf); + } + } + sparkPlan.append(" ) "); + if (nextLeaf.size() > 1) { + logLeafTran(nextLeaf); + } else { + if (nextLeaf.size() != 0) + getSparkPlan(nextLeaf.get(0)); + } + } + } + + private void logLeafTran(List parent) { + sparkPlan.append(arrow); + boolean isFirst = true; + for (SparkTran sparkTran : parent) { + List parents = getParents(sparkTran); + SparkTran leaf = parents.get(0); + if (isFirst) { + sparkPlan.append("( " + leaf.getName()); + if (leaf instanceof ShuffleTran) { + logShuffleTranStatus((ShuffleTran) leaf); + } else { + logCacheStatus(leaf); + } + isFirst = false; } else { - return sparkPlanMsg; + sparkPlan.append("," + leaf.getName()); + if (leaf instanceof ShuffleTran) { + logShuffleTranStatus((ShuffleTran) leaf); + } else { + logCacheStatus(leaf); + } } } - return sparkPlanMsg; + sparkPlan.append(" ) "); } - private String logCacheStatus(String sparkPlanMsg, SparkTran sparkTran) { + private void logShuffleTranStatus(ShuffleTran leaf) { + int noOfPartitions = leaf.getNoOfPartitions(); + sparkPlan.append(" ( Partitions " + noOfPartitions); + SparkShuffler shuffler = leaf.getShuffler(); + sparkPlan.append(", " + shuffler.getName()); + if (leaf.isCacheEnable()) { + sparkPlan.append(", Cache on"); + } else { + sparkPlan.append(", Cache off"); + } + } + + private void logCacheStatus(SparkTran sparkTran) { if (sparkTran.isCacheEnable() != null) { if (sparkTran.isCacheEnable().booleanValue()) { - sparkPlanMsg = sparkPlanMsg + " (cache on) "; + sparkPlan.append(" (cache on) "); } else { - sparkPlanMsg = sparkPlanMsg + " (cache off) "; + sparkPlan.append(" (cache off) "); } } - return sparkPlanMsg; } public void addTran(SparkTran tran) { @@ -227,4 +318,8 @@ public void connect(SparkTran parent, SparkTran child) { return transGraph.get(tran); } + @Override + public String toString() { + return sparkPlan.toString(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java index 53845a0..40e251f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java @@ -27,4 +27,6 @@ JavaPairRDD> shuffle( JavaPairRDD input, int numPartitions); + public String getName(); + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java index c3c48a0..671c983 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java @@ -28,5 +28,7 @@ public String getName(); + public void setName(String name); + public Boolean isCacheEnable(); }