diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index b8e36cb..34c795b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -33,4 +33,9 @@
     return input.groupByKey();
   }
 
+  @Override
+  public String getName() {
+    return "GroupBy";
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
index 8d18885..6ae3081 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
@@ -36,6 +36,7 @@
   private JavaPairRDD hadoopRDD;
   private boolean toCache;
   private final SparkPlan sparkPlan;
+  private String name;
 
   public MapInput(SparkPlan sparkPlan, JavaPairRDD hadoopRDD) {
     this(sparkPlan, hadoopRDD, false);
@@ -85,4 +86,23 @@ public void setToCache(boolean toCache) {
     }
   }
 
+
+  @Override
+  public String getName() {
+    if (name != null) {
+      return name;
+    } else {
+      return "MapInput";
+    }
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    return Boolean.valueOf(toCache);
+  }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
index 638c387..ba6fe64 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
@@ -25,6 +25,8 @@
 public class MapTran implements SparkTran {
   private HiveMapFunction mapFunc;
 
+  private String name;
+
   @Override
   public JavaPairRDD transform(
       JavaPairRDD input) {
@@ -35,4 +37,19 @@
   public void setMapFunction(HiveMapFunction mapFunc) {
     this.mapFunc = mapFunc;
   }
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index dbc614b..d73ae62 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -24,6 +24,8 @@
 public class ReduceTran implements SparkTran, HiveKey, BytesWritable> {
 
   private HiveReduceFunction reduceFunc;
+
+  private String name;
 
   @Override
   public JavaPairRDD transform(
@@ -34,4 +36,20 @@
   public void setReduceFunction(HiveReduceFunction redFunc) {
     this.reduceFunc = redFunc;
   }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index 4a597ee..e05a53b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -28,6 +28,7 @@
   private final int numOfPartitions;
   private final boolean toCache;
   private final SparkPlan sparkPlan;
+  private String name;
 
   public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n) {
     this(sparkPlan, sf, n, false);
@@ -49,4 +50,28 @@ public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache
     }
     return result;
   }
+
+  public int getNoOfPartitions(){
+    return numOfPartitions;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Boolean isCacheEnable() {
+    return Boolean.valueOf(toCache);
+  }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public SparkShuffler getShuffler() {
+    return shuffler;
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 2545a9d..cf537ce 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -24,6 +24,7 @@
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+
 import scala.Tuple2;
 
 import java.util.*;
@@ -121,4 +122,9 @@ public void remove() {
       }
     }
 
+  @Override
+  public String getName() {
+    return "SortBy";
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b45494d..7ed53f2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -21,13 +21,17 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -36,6 +40,7 @@
 @SuppressWarnings("rawtypes")
 public class SparkPlan {
   private static final String CLASS_NAME = SparkPlan.class.getName();
+  private static final Log LOG = LogFactory.getLog(SparkPlan.class);
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private final Set rootTrans = new HashSet();
@@ -72,6 +77,25 @@
       tranToOutputRDDMap.put(tran, rdd);
     }
 
+//    Iterator iterator = leafTrans.iterator();
+//    for(;iterator.hasNext();){
+//      SparkTran next = iterator.next();
+//      LOG.info(next.getClass().getName());
+//    }
+//
+//    Iterator iterator2 = rootTrans.iterator();
+//    for(;iterator2.hasNext();){
+//      LOG.info(iterator2.next());
+//    }
+//
+//    LOG.info("------------------------------ Spark Plan -----------------------------");
+//    Set keySet = invertedTransGraph.keySet();
+//    for (SparkTran leaf : keySet) {
+//      LOG.info(leaf.getClass().getName());
+//    }
+
+    logSparkPlan();
+
     JavaPairRDD finalRDD = null;
     for (SparkTran leafTran : leafTrans) {
       JavaPairRDD rdd = tranToOutputRDDMap.get(leafTran);
@@ -86,6 +110,146 @@
     return finalRDD;
   }
 
+  private void addNumbersToTrans() {
+    Set keySet = invertedTransGraph.keySet();
+    int i = 1;
+    for (SparkTran leaf : keySet) {
+      String name = null;
+      if (leaf instanceof ReduceTran) {
+        name = "Reduce";
+      } else if (leaf instanceof ShuffleTran) {
+        name = "Shuffle";
+      } else if (leaf instanceof MapTran) {
+        name = "MapTran";
+      } else {
+        name = "MapInput";
+      }
+      name = name + " " + i;
+      i++;
+      leaf.setName(name);
+    }
+
+    Iterator iterator = rootTrans.iterator();
+    for (; iterator.hasNext();) {
+      SparkTran next = iterator.next();
+      if (next instanceof MapInput)
+        next.setName("MapInput " + i);
+      i++;
+    }
+  }
+
+  private void logSparkPlan() {
+    addNumbersToTrans();
+    Set keySet = invertedTransGraph.keySet();
+    String sparkPlan = "\n\t !!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!! \n";
+    for (SparkTran sparkTran : keySet) {
+      if (sparkTran instanceof ReduceTran) {
+        sparkPlan = sparkPlan + " " + sparkTran.getName();
+        sparkPlan = getSparkPlan(sparkTran, sparkPlan);
+        sparkPlan = sparkPlan + "\n";
+      }
+    }
+    sparkPlan = sparkPlan + " !!!!!!!!!!!!!!!!!!!!!!!!!! Spark Plan !!!!!!!!!!!!!!!!!!!!!!!!!!! ";
+    LOG.info(sparkPlan);
+  }
+
+  private String getSparkPlan(SparkTran leaf, String sparkPlanMsg) {
+    if (leaf != null) {
+      List parents = getParents(leaf);
+      List parent = new ArrayList();
+      if (parents.size() > 0) {
+        sparkPlanMsg = sparkPlanMsg + " <-- ";
+        boolean isFirst = true;
+        for (SparkTran sparkTran : parents) {
+          if (isFirst) {
+            sparkPlanMsg = sparkPlanMsg + "( " + sparkTran.getName();
+            if (sparkTran instanceof ShuffleTran) {
+              sparkPlanMsg = logShuffleTranStatus(sparkPlanMsg, (ShuffleTran) sparkTran);
+            } else {
+              sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran);
+            }
+            isFirst = false;
+          } else {
+            sparkPlanMsg = sparkPlanMsg + "," + sparkTran.getName();
+            if (sparkTran instanceof ShuffleTran) {
+              sparkPlanMsg = logShuffleTranStatus(sparkPlanMsg, (ShuffleTran) sparkTran);
+            } else {
+              sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran);
+            }
+          }
+          if (getParents(sparkTran).size() > 0 && !(sparkTran instanceof ReduceTran)) {
+            parent.add(sparkTran);
+          }
+        }
+        sparkPlanMsg = sparkPlanMsg + " ) ";
+        if (parent.size() > 1) {
+          return logLeafTran(parent, sparkPlanMsg);
+        } else {
+          if (parent.size() != 0)
+            return getSparkPlan(parent.get(0), sparkPlanMsg);
+        }
+      } else {
+        return sparkPlanMsg;
+      }
+    }
+    return sparkPlanMsg;
+  }
+
+  private String logLeafTran(List parent, String sparkPlanMsg) {
+    sparkPlanMsg = sparkPlanMsg + " <-- ";
+    boolean isFirst = true;
+    for (SparkTran sparkTran : parent) {
+      List parents = getParents(sparkTran);
+      SparkTran leaf = parents.get(0);
+      if (isFirst) {
+        sparkPlanMsg = sparkPlanMsg + "( " + leaf.getName();
+        if (leaf instanceof ShuffleTran) {
+          sparkPlanMsg = logShuffleTranStatus(sparkPlanMsg, (ShuffleTran) leaf);
+        } else {
+          sparkPlanMsg = logCacheStatus(sparkPlanMsg, leaf);
+        }
+        isFirst = false;
+      } else {
+        sparkPlanMsg = sparkPlanMsg + "," + leaf.getName();
+        if (leaf instanceof ShuffleTran) {
+          sparkPlanMsg = logShuffleTranStatus(sparkPlanMsg, (ShuffleTran) leaf);
+        } else {
+          sparkPlanMsg = logCacheStatus(sparkPlanMsg, leaf);
+        }
+      }
+    }
+    sparkPlanMsg = sparkPlanMsg + " ) ";
+    return sparkPlanMsg;
+  }
+
+  private String logShuffleTranStatus(String sparkPlanMsg, ShuffleTran leaf) {
+    int noOfPartitions = leaf.getNoOfPartitions();
+    sparkPlanMsg = sparkPlanMsg + " ( Partitions " + noOfPartitions;
+    SparkShuffler shuffler = leaf.getShuffler();
+    sparkPlanMsg = sparkPlanMsg + ", " + shuffler.getName();
+    if (leaf.isCacheEnable()) {
+      sparkPlanMsg = sparkPlanMsg + ", Cache ON";
+    } else {
+      sparkPlanMsg = sparkPlanMsg + ", Cache OFF";
+    }
+    return sparkPlanMsg;
+  }
+
+  private void logLeafTran(List parent) {
+
+  }
+
+  private String logCacheStatus(String sparkPlanMsg, SparkTran sparkTran) {
+    if (sparkTran.isCacheEnable() != null) {
+      if (sparkTran.isCacheEnable().booleanValue()) {
+        sparkPlanMsg = sparkPlanMsg + " (cache on) ";
+      } else {
+        sparkPlanMsg = sparkPlanMsg + " (cache off) ";
+      }
+    }
+    return sparkPlanMsg;
+  }
+
   public void addTran(SparkTran tran) {
     rootTrans.add(tran);
     leafTrans.add(tran);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
index 53845a0..2e33ef9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
@@ -27,4 +27,5 @@
   JavaPairRDD> shuffle(
       JavaPairRDD input, int numPartitions);
 
+  public String getName();
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
index 4daa61e..b4b86d2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
@@ -25,4 +25,10 @@
 public interface SparkTran {
   JavaPairRDD transform(
       JavaPairRDD input);
+
+  public String getName();
+
+  public void setName(String name);
+
+  public Boolean isCacheEnable();
 }