From 19a4fcdf620ea7f453c918eca8067ad671a30eb9 Mon Sep 17 00:00:00 2001 From: Na Yang Date: Tue, 16 Sep 2014 16:51:29 -0700 Subject: [PATCH] HIVE-8141:Refactor the GraphTran code by moving union handling logic to UnionTran [Spark Branch] --- .../hadoop/hive/ql/exec/spark/GraphTran.java | 27 ++++------------------ .../hadoop/hive/ql/exec/spark/UnionTran.java | 18 ++++++++++----- 2 files changed, 17 insertions(+), 28 deletions(-) diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java index 93674c1..acd42be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java @@ -67,29 +67,12 @@ public void execute() throws IllegalStateException { while (getChildren(tran).size() > 0) { SparkTran childTran = getChildren(tran).get(0); - if (childTran instanceof UnionTran) { - List> unionInputList = unionInputs - .get(childTran); - if (unionInputList == null) { - // process the first union input RDD, cache it in the hash map - unionInputList = new LinkedList>(); - unionInputList.add(rdd); - unionInputs.put(childTran, unionInputList); - break; - } else if (unionInputList.size() < this.getParents(childTran).size() - 1) { - // not the last input RDD yet, continue caching it in the hash map - unionInputList.add(rdd); - break; - } else if (unionInputList.size() == this.getParents(childTran).size() - 1) { // process - // process the last input RDD - for (JavaPairRDD inputRDD : unionInputList) { - ((UnionTran) childTran).setOtherInput(inputRDD); - rdd = childTran.transform(rdd); - } - } - } else { - rdd = childTran.transform(rdd); + if (childTran instanceof UnionTran && + this.getParents(childTran).size() > ((UnionTran)childTran).getOtherInputList().size() + 1) { + ((UnionTran) childTran).addOtherInput(rdd); + break; } + rdd = childTran.transform(rdd); tran = childTran; } // if the current transformation is a leaf tran and it has not got processed yet, cache its corresponding RDD diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java index 40f22a0..546b448 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java @@ -18,24 +18,30 @@ package org.apache.hadoop.hive.ql.exec.spark; +import java.util.LinkedList; +import java.util.List; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; public class UnionTran implements SparkTran { - JavaPairRDD otherInput; + List> otherInputsList = new LinkedList>(); @Override public JavaPairRDD transform( JavaPairRDD input) { - return input.union(otherInput); + JavaPairRDD result = input; + for (JavaPairRDD otherInput : otherInputsList) { + result = result.union(otherInput); + } + return result; } - public void setOtherInput(JavaPairRDD otherInput) { - this.otherInput = otherInput; + public void addOtherInput(JavaPairRDD input) { + otherInputsList.add(input); } - public JavaPairRDD getOtherInput() { - return this.otherInput; + public List> getOtherInputList() { + return this.otherInputsList; } } -- 1.8.5.2 (Apple Git-48)