diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java index 59c07c3..2e31f25 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java @@ -121,7 +121,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { @Override public synchronized void processOp(Object row, int tag) throws HiveException { - + LOG.info("Tag: "+tag+", soi: "+parentObjInspectors); StructObjectInspector soi = parentObjInspectors[tag]; List fields = parentFields[tag]; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index f4b6016..af95aa5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -105,6 +107,10 @@ // used to group dependent tasks for multi table inserts public final DependencyCollectionTask dependencyTask; + // used to hook up unions + public final Map, BaseWork> unionWorkMap; + public final List currentUnionOperators; + @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, List> moveTask, List> rootTasks, @@ -126,6 +132,8 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext, this.linkChildOpWithDummyOp = 
new HashMap, List>>(); this.dependencyTask = (DependencyCollectionTask) TaskFactory.get(new DependencyCollectionWork(), conf); + this.unionWorkMap = new HashMap, BaseWork>(); + this.currentUnionOperators = new LinkedList(); rootTasks.add(currentTask); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 8363bbf..1ccae7a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -37,6 +38,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.ql.plan.UnionWork; import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; /** @@ -139,6 +141,32 @@ public Object process(Node nd, Stack stack, root.removeParent(parent); } + if (!context.currentUnionOperators.isEmpty()) { + // if there are union all operators we need to add the work to the set + // of union operators. + + UnionWork unionWork; + if (context.unionWorkMap.containsKey(operator)) { + // we've seen this terminal before and have created a union work object. + // just need to add this work to it. There will be no children of this one + // since we've passed this operator before. + assert operator.getChildOperators().isEmpty(); + unionWork = (UnionWork) context.unionWorkMap.get(operator); + } else { + // first time through. we need to create a union work object and add this + // work to it. 
Subsequent work should reference the union and not the actual + // work. + unionWork = new UnionWork("Union "+ (++sequenceNumber)); + context.unionWorkMap.put(operator, unionWork); + tezWork.add(unionWork); + } + + tezWork.connect(work, unionWork, EdgeType.CONTAINS); + unionWork.addUnionOperators(context.currentUnionOperators); + context.currentUnionOperators.clear(); + work = unionWork; + } + // No children means we're at the bottom. If there are more operators to scan // the next item will be a new root. if (!operator.getChildOperators().isEmpty()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index dff743f..e866e3c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -131,14 +131,18 @@ protected void generateTaskTree(List> rootTasks, Pa FileSinkOperator.getOperatorName() + "%"), new CompositeProcessor(new FileSinkProcessor(), genTezWork)); - opRules.put(new RuleRegExp("Bail on Union", + opRules.put(new RuleRegExp("Handle union", UnionOperator.getOperatorName() + "%"), new NodeProcessor() { @Override public Object process(Node n, Stack s, NodeProcessorCtx procCtx, Object... os) throws SemanticException { - throw new SemanticException("Unions not yet supported on Tez." - +" Please use MR for this query"); + GenTezProcContext context = (GenTezProcContext) procCtx; + UnionOperator union = (UnionOperator) n; + + // simply need to remember that we've seen a union. 
+ context.currentUnionOperators.add(union); + return null; } }); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index 9112a77..3b31378 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -45,7 +45,8 @@ public enum EdgeType { SIMPLE_EDGE, - BROADCAST_EDGE + BROADCAST_EDGE, + CONTAINS } private static transient final Log LOG = LogFactory.getLog(TezWork.class); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java new file mode 100644 index 0000000..edb9791 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+
+/**
+ * Simple wrapper for union all cases. All contributing work for a union all
+ * is collected here. Downstream work will connect to the union, not to the
+ * individual work.
+ */
+public class UnionWork extends BaseWork {
+  // Union operators handed to this work through addUnionOperators(); a set, so duplicates collapse.
+  private final Set<UnionOperator> unionOperators = new HashSet<UnionOperator>();
+  /** Creates a union work through the no-argument superclass constructor. */
+  public UnionWork() {
+    super();
+  }
+  /** Creates a union work, passing {@code name} on to the superclass constructor. */
+  public UnionWork(String name) {
+    super(name);
+  }
+  /** Returns the name held by the superclass; the @Explain annotation labels it "Vertex". */
+  @Explain(displayName = "Vertex")
+  @Override
+  public String getName() {
+    return super.getName();
+  }
+  /** Returns a fresh empty set, never null, so callers can iterate the result safely. */
+  @Override
+  public Set<Operator<?>> getAllRootOperators() {
+    return new HashSet<Operator<?>>();
+  }
+  /** Adds every operator in {@code unions} to the set kept by this work. */
+  public void addUnionOperators(Collection<UnionOperator> unions) {
+    unionOperators.addAll(unions);
+  }
+  /** Returns the live backing set (not a copy) of the union operators added so far. */
+  public Set<UnionOperator> getUnionOperators() {
+    return unionOperators;
+  }
+}