diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index ceb7b6c..0cda183 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -222,7 +222,8 @@ public Object process(Node nd, Stack<Node> stack,
       // finally hook everything up
       LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
       SparkEdgeProperty edgeProp = new SparkEdgeProperty(0/*EdgeType.CONTAINS*/);
-      sparkWork.connect(unionWork, work, edgeProp);
+      //sparkWork.connect(unionWork, work, edgeProp);
+      sparkWork.connect(work, unionWork, edgeProp);
       unionWork.addUnionOperators(context.currentUnionOperators);
       context.currentUnionOperators.clear();
       context.workWithUnionOperators.add(work);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 3840318..45be58d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -43,6 +45,7 @@
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -148,19 +151,19 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
 //        TableScanOperator.getOperatorName() + "%"),
 //        new ProcessAnalyzeTable(GenSparkUtils.getUtils()));
 
-//    opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
-//        new NodeProcessor() {
-//          @Override
-//          public Object process(Node n, Stack<Node> s,
-//              NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-//            GenSparkProcContext context = (GenSparkProcContext) procCtx;
-//            UnionOperator union = (UnionOperator) n;
-//
-//            // simply need to remember that we've seen a union.
-//            context.currentUnionOperators.add(union);
-//            return null;
-//          }
-//        });
+    opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
+        new NodeProcessor() {
+          @Override
+          public Object process(Node n, Stack<Node> s,
+              NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+            GenSparkProcContext context = (GenSparkProcContext) procCtx;
+            UnionOperator union = (UnionOperator) n;
+
+            // simply need to remember that we've seen a union.
+            context.currentUnionOperators.add(union);
+            return null;
+          }
+        });
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
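
Note on the one-line GenSparkWork fix: in the SparkWork graph, connect(a, b, edgeProp) makes a the parent of b, so the edge follows the direction of data flow (producer to consumer). The original call made the UnionWork the parent of the work feeding it; the fix reverses that so each branch of the UNION feeds into the union work. A minimal sketch of the corrected wiring, using only the calls that appear in the diff above (the surrounding variables are assumed to be set up as in GenSparkWork.process):

    // Each work that produces input for the union becomes a parent of
    // unionWork, matching the producer -> consumer edge direction.
    SparkEdgeProperty edgeProp = new SparkEdgeProperty(0 /*EdgeType.CONTAINS*/);
    sparkWork.connect(work, unionWork, edgeProp);  // was: connect(unionWork, work, edgeProp)

    // The union operators recorded by the re-enabled "Remember union" rule
    // in SparkCompiler are then attached to the union work.
    unionWork.addUnionOperators(context.currentUnionOperators);
    context.currentUnionOperators.clear();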