From b18de38a07304a2d59df2400dbbbee99483703da Mon Sep 17 00:00:00 2001
From: Na Yang
Date: Fri, 8 Aug 2014 17:21:43 -0700
Subject: [PATCH] HIVE-7541: Support union all on Spark

---
 .../hadoop/hive/ql/exec/spark/GraphTran.java       | 115 +++++++++++++++++++++
 .../hadoop/hive/ql/exec/spark/SparkClient.java     |   6 +-
 .../hadoop/hive/ql/exec/spark/SparkPlan.java       |  32 ++----
 .../hive/ql/exec/spark/SparkPlanGenerator.java     |  99 +++++++++++-------
 .../hadoop/hive/ql/exec/spark/UnionTran.java       |  22 ++++
 .../hadoop/hive/ql/parse/spark/GenSparkUtils.java  |   4 +-
 6 files changed, 217 insertions(+), 61 deletions(-)
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
 create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
new file mode 100644
index 0000000..2ed330a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
@@ -0,0 +1,115 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class GraphTran {
+
+  private Set<SparkTran> rootTrans = new HashSet<SparkTran>();
+  private Set<SparkTran> leafTrans = new HashSet<SparkTran>();
+  private Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private Map<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>> unionInputs = new HashMap<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>>();
+  private Map<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>> mapInputs = new HashMap<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>>();
+
+  public void addTran(SparkTran tran) {
+    if (!rootTrans.contains(tran)) {
+      rootTrans.add(tran);
+      leafTrans.add(tran);
+      transGraph.put(tran, new LinkedList<SparkTran>());
+      invertedTransGraph.put(tran, new LinkedList<SparkTran>());
+    }
+  }
+
+  public void addTranInput(SparkTran tran, JavaPairRDD<BytesWritable, BytesWritable> input) {
+    mapInputs.put(tran, input);
+  }
+
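+  // Executes the transformation graph. Starting from each root (every root is
+  // expected to be a MapTran fed by a Hadoop RDD), walk down the chain of child
+  // transformations. RDDs flowing into a UnionTran are cached in unionInputs until
+  // the last parent branch reaches the union; the cached RDDs are then unioned
+  // with the current branch's RDD and the walk continues below the union.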
+  public void execute() throws Exception {
+    JavaPairRDD<BytesWritable, BytesWritable> resultRDD = null;
+    for (SparkTran tran : rootTrans) {
+      // make sure all the root trans are MapTran
+      assert(tran instanceof MapTran);
+      JavaPairRDD<BytesWritable, BytesWritable> input = mapInputs.get(tran);
+      if (input == null) {
+        throw new Exception("input is missing for transformation!");
+      }
+      JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
+
+      while (getChildren(tran).size() > 0) {
+        SparkTran childTran = getChildren(tran).get(0);
+        if (childTran instanceof UnionTran) {
+          List<JavaPairRDD<BytesWritable, BytesWritable>> unionInputList = unionInputs.get(childTran);
+          if (unionInputList == null) { // process the first union input RDD, cache it in the hash map
+            unionInputList = new LinkedList<JavaPairRDD<BytesWritable, BytesWritable>>();
+            unionInputList.add(rdd);
+            unionInputs.put(childTran, unionInputList);
+            break;
+          } else if (unionInputList.size() < this.getParents(childTran).size() - 1) { // not the last input RDD yet, continue caching it in the hash map
+            unionInputList.add(rdd);
+            break;
+          } else if (unionInputList.size() == this.getParents(childTran).size() - 1) { // process the last input RDD
+            for (JavaPairRDD<BytesWritable, BytesWritable> inputRDD : unionInputList) {
+              ((UnionTran) childTran).setOtherInput(inputRDD);
+              rdd = childTran.transform(rdd);
+            }
+          }
+        } else {
+          rdd = childTran.transform(rdd);
+        }
+        tran = childTran;
+      }
+      resultRDD = rdd;
+    }
+    if (resultRDD != null) {
+      resultRDD.foreach(HiveVoidFunction.getInstance());
+    }
+  }
+
+  public void connect(SparkTran a, SparkTran b) {
+    transGraph.get(a).add(b);
+    invertedTransGraph.get(b).add(a);
+    rootTrans.remove(b);
+    leafTrans.remove(a);
+  }
+
+  public Set<SparkTran> getRootTrans() {
+    return new HashSet<SparkTran>(rootTrans);
+  }
+
+  public Set<SparkTran> getLeafTrans() {
+    return new HashSet<SparkTran>(leafTrans);
+  }
+
+  public List<SparkTran> getParents(SparkTran tran) {
+    assert invertedTransGraph.containsKey(tran)
+        && invertedTransGraph.get(tran) != null;
+    return new LinkedList<SparkTran>(invertedTransGraph.get(tran));
+  }
+
+  public List<SparkTran> getChildren(SparkTran tran) {
+    assert transGraph.containsKey(tran)
+        && transGraph.get(tran) != null;
+    return new LinkedList<SparkTran>(transGraph.get(tran));
+  }
+
+  public void disconnect(SparkTran a, SparkTran b) {
+    transGraph.get(a).remove(b);
+    invertedTransGraph.get(b).remove(a);
+
+    if (getChildren(a).isEmpty()) {
+      leafTrans.add(a);
+    }
+    if (getParents(b).isEmpty()) {
+      rootTrans.add(b);
+    }
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index 743717d..3722da6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -149,7 +149,11 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
 
     // Execute generated plan.
     // TODO: we should catch any exception and return more meaningful error code.
-    plan.execute();
+    try {
+      plan.execute();
+    } catch (Exception e) {
+      // TODO: handle the exception
+    }
 
     return 0;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
index b24f3d0..3666fad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
@@ -18,31 +18,19 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.spark.api.java.JavaPairRDD;
-
 public class SparkPlan {
-  private JavaPairRDD<BytesWritable, BytesWritable> input;
-  private SparkTran tran;
-
-  public void execute() {
-    JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
-    rdd.foreach(HiveVoidFunction.getInstance());
-  }
 
-  public SparkTran getTran() {
-    return tran;
-  }
+  private GraphTran tran;
 
-  public void setTran(SparkTran tran) {
-    this.tran = tran;
-  }
+  public void execute() throws Exception {
+    tran.execute();
+  }
 
-  public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
-    return input;
-  }
+  public void setTran(GraphTran tran) {
+    this.tran = tran;
+  }
 
-  public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
-    this.input = input;
-  }
+  public GraphTran getTran() {
+    return tran;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 45eff67..a4e259d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -19,8 +19,9 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.BytesWritable;
@@ -50,6 +52,7 @@
   private JobConf jobConf;
   private Context context;
   private Path scratchDir;
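+  // Maps each UnionWork to the single UnionTran created for it, so that every
+  // parent work connects to the same union transformation.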
+  private Map<BaseWork, SparkTran> unionWorkTrans = new HashMap<BaseWork, SparkTran>();
 
   public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir) {
     this.sc = sc;
@@ -60,56 +63,75 @@ public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf,
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
     SparkPlan plan = new SparkPlan();
-    List<SparkTran> trans = new ArrayList<SparkTran>();
+    GraphTran trans = new GraphTran();
     Set<BaseWork> roots = sparkWork.getRoots();
-    assert(roots != null && roots.size() == 1);
-    BaseWork w = roots.iterator().next();
-    MapWork mapWork = (MapWork) w;
-    trans.add(generate(w));
-    while (sparkWork.getChildren(w).size() > 0) {
-      ReduceWork child = (ReduceWork) sparkWork.getChildren(w).get(0);
-      SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
-      SparkShuffler st = generate(edge);
-      ReduceTran rt = generate(child);
-      rt.setShuffler(st);
-      rt.setNumPartitions(edge.getNumPartitions());
-      trans.add(rt);
-      w = child;
+    for (BaseWork w : roots) {
+      JobConf newJobConf = new JobConf(this.jobConf);
+      MapWork mapWork = (MapWork) w;
+      SparkTran tran = generate(w, newJobConf);
+      trans.addTran(tran);
+      JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork, newJobConf);
+      trans.addTranInput(tran, input);
+
+      while (sparkWork.getChildren(w).size() > 0) {
+        BaseWork child = sparkWork.getChildren(w).get(0);
+        if (child instanceof ReduceWork) {
+          SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
+          SparkShuffler st = generate(edge);
+          ReduceTran rt = generate((ReduceWork) child);
+          rt.setShuffler(st);
+          rt.setNumPartitions(edge.getNumPartitions());
+          trans.addTran(rt);
+          trans.connect(tran, rt);
+          w = child;
+          tran = rt;
+        } else if (child instanceof UnionWork) {
+          if (unionWorkTrans.get(child) != null) {
+            trans.connect(tran, unionWorkTrans.get(child));
+            break;
+          } else {
+            SparkTran ut = generate((UnionWork) child);
+            unionWorkTrans.put(child, ut);
+            trans.addTran(ut);
+            trans.connect(tran, ut);
+            w = child;
+            tran = ut;
+          }
+        }
+      }
     }
-    ChainedTran chainedTran = new ChainedTran(trans);
-    plan.setTran(chainedTran);
-    JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
-    plan.setInput(input);
+    unionWorkTrans.clear();
+    plan.setTran(trans);
     return plan;
-  }
+}
 
-  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
-    List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
-    Utilities.setInputPaths(jobConf, inputPaths);
-    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
+  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork, JobConf conf) throws Exception {
+    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork, scratchDir, context, false);
+    Utilities.setInputPaths(conf, inputPaths);
+    Utilities.setMapWork(conf, mapWork, scratchDir, true);
     Class ifClass = HiveInputFormat.class;
     // The mapper class is expected by the HiveInputFormat.
-    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
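+    // conf is the per-MapWork clone created in generate(SparkWork), so each union
+    // branch keeps its own input paths and serialized MapWork instead of sharing
+    // the single jobConf.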
+    conf.set("mapred.mapper.class", ExecMapper.class.getName());
+    return sc.hadoopRDD(conf, ifClass, WritableComparable.class, Writable.class);
   }
 
-  private SparkTran generate(BaseWork bw) throws IOException, HiveException {
+  private SparkTran generate(BaseWork bw, JobConf conf) throws IOException, HiveException {
     // initialize stats publisher if necessary
     if (bw.isGatheringStats()) {
       StatsPublisher statsPublisher;
-      StatsFactory factory = StatsFactory.newFactory(jobConf);
+      StatsFactory factory = StatsFactory.newFactory(conf);
       if (factory != null) {
         statsPublisher = factory.getStatsPublisher();
-        if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
-          if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+        if (!statsPublisher.init(conf)) { // creating stats table if not exists
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
             throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
           }
         }
       }
     }
     if (bw instanceof MapWork) {
-      return generate((MapWork)bw);
+      return generate((MapWork)bw, conf);
     } else if (bw instanceof ReduceWork) {
       return generate((ReduceWork)bw);
     } else {
@@ -117,12 +139,12 @@ private SparkTran generate(BaseWork bw) throws IOException, HiveException {
     }
   }
 
-  private MapTran generate(MapWork mw) throws IOException {
+  private MapTran generate(MapWork mw, JobConf conf) throws IOException {
     MapTran result = new MapTran();
-    Utilities.setMapWork(jobConf, mw, scratchDir, true);
-    Utilities.createTmpDirs(jobConf, mw);
-    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+    Utilities.setMapWork(conf, mw, scratchDir, true);
+    Utilities.createTmpDirs(conf, mw);
+    conf.set("mapred.mapper.class", ExecMapper.class.getName());
+    byte[] confBytes = KryoSerializer.serializeJobConf(conf);
     HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
     result.setMapFunction(mapFunc);
     return result;
@@ -144,5 +166,10 @@ private ReduceTran generate(ReduceWork rw) throws IOException {
     result.setReduceFunction(mapFunc);
     return result;
   }
+
+  private UnionTran generate(UnionWork uw) {
+    UnionTran result = new UnionTran();
+    return result;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
new file mode 100644
index 0000000..15a234a
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
@@ -0,0 +1,22 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class UnionTran implements SparkTran {
+  JavaPairRDD<BytesWritable, BytesWritable> otherInput;
+
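+  // otherInput is set by GraphTran.execute() once the RDD from the other parent
+  // branch is available; transform() then unions it with this branch's RDD.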
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.union(otherInput);
+  }
+
+  public void setOtherInput(JavaPairRDD<BytesWritable, BytesWritable> otherInput) {
+    this.otherInput = otherInput;
+  }
+
+  public JavaPairRDD<BytesWritable, BytesWritable> getOtherInput() {
+    return this.otherInput;
+  }
+}
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index e4cb6f3..cf1bead 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -231,8 +231,8 @@ public void removeUnionOperators(Configuration conf, GenSparkProcContext context
         linked = context.linkedFileSinks.get(path);
         linked.add(desc);
 
-        desc.setDirName(new Path(path, ""+linked.size()));
-        desc.setLinkedFileSinkDesc(linked);
+//        desc.setDirName(new Path(path, ""+linked.size()));
+//        desc.setLinkedFileSinkDesc(linked);
       }
 
       if (current instanceof UnionOperator) {
-- 
1.8.5.2 (Apple Git-48)