diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java index 8d18885..157e4d8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java @@ -85,4 +85,14 @@ public void setToCache(boolean toCache) { } } + + @Override + public String getName() { + return "MapInput"; + } + + @Override + public Boolean isCacheEnable() { + return new Boolean(toCache); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java index 638c387..f6a4d77 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java @@ -35,4 +35,14 @@ public void setMapFunction(HiveMapFunction mapFunc) { this.mapFunc = mapFunc; } + @Override + public String getName() { + return "MapTran"; + } + + @Override + public Boolean isCacheEnable() { + // TODO Auto-generated method stub + return null; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java index dbc614b..fd6b31c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java @@ -34,4 +34,15 @@ public void setReduceFunction(HiveReduceFunction redFunc) { this.reduceFunc = redFunc; } + + @Override + public String getName() { + return "Reduce"; + } + + @Override + public Boolean isCacheEnable() { + // TODO Auto-generated method stub + return null; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java index 4a597ee..6cdab20 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java @@ -49,4 +49,14 @@ public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache } return result; } + + @Override + public String getName() { + return "Shuffle"; + } + + @Override + public Boolean isCacheEnable() { + return new Boolean(toCache); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index b45494d..81b7e85 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.io.BytesWritable; @@ -36,6 +38,7 @@ @SuppressWarnings("rawtypes") public class SparkPlan { private static final String CLASS_NAME = SparkPlan.class.getName(); + private static final Log LOG = LogFactory.getLog(SparkPlan.class); private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); private final Set rootTrans = new HashSet(); @@ -72,6 +75,8 @@ tranToOutputRDDMap.put(tran, rdd); } + logSparkPlan(); + JavaPairRDD finalRDD = null; for (SparkTran leafTran : leafTrans) { JavaPairRDD rdd = tranToOutputRDDMap.get(leafTran); @@ -86,6 +91,59 @@ return finalRDD; } + private void logSparkPlan() { + LOG.info("------------------------------ Spark Plan -----------------------------"); + Set keySet = invertedTransGraph.keySet(); + for (SparkTran sparkTran : keySet) { + if (sparkTran instanceof ReduceTran) { + String sparkPlan = " " + sparkTran.getName(); + sparkPlan = getSparkPlan(sparkTran, sparkPlan); + LOG.info(sparkPlan); + } + } + LOG.info("------------------------------ Spark Plan -----------------------------"); + } + + private String getSparkPlan(SparkTran leaf, String sparkPlanMsg) { + if (leaf != null) { + List parents = getParents(leaf); + if (parents.size() > 0) { + sparkPlanMsg = sparkPlanMsg + " <-- "; + boolean isFirst = true; + SparkTran parent = null; + for (SparkTran sparkTran : parents) { + if (isFirst) { + sparkPlanMsg = sparkPlanMsg + "( " + sparkTran.getName(); + sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran); + isFirst = false; + } else { + sparkPlanMsg = sparkPlanMsg + "," + sparkTran.getName(); + sparkPlanMsg = logCacheStatus(sparkPlanMsg, sparkTran); + } + if (getParents(sparkTran).size() > 0 && !(sparkTran instanceof ReduceTran)) { + parent = sparkTran; + } + } + sparkPlanMsg = sparkPlanMsg + " ) "; + return getSparkPlan(parent, sparkPlanMsg); + } else { + return sparkPlanMsg; + } + } + return sparkPlanMsg; + } + + private String logCacheStatus(String sparkPlanMsg, SparkTran sparkTran) { + if (sparkTran.isCacheEnable() != null) { + if (sparkTran.isCacheEnable().booleanValue()) { + sparkPlanMsg = sparkPlanMsg + " (cache on) "; + } else { + sparkPlanMsg = sparkPlanMsg + " (cache off) "; + } + } + return sparkPlanMsg; + } + public void addTran(SparkTran tran) { rootTrans.add(tran); leafTrans.add(tran); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java index 4daa61e..c3c48a0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java @@ -25,4 +25,8 @@ public interface SparkTran { JavaPairRDD transform( JavaPairRDD input); + + public String getName(); + + public Boolean isCacheEnable(); }