diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index c9029f2..3fae32d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.calcite.rel.RelNode;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -64,6 +66,9 @@ import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.toHive.RelNodeConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
@@ -623,6 +628,10 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     if (mapWork instanceof MergeFileWork) {
       procClassName = MergeFileTezProcessor.class.getName();
     }
+
+    //transformMapWorkToCalcitePlanAndPersist(mapWork, conf);
+    // TezProcessor may not suffice here. We may need a new processor.
+
     map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
         .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
@@ -645,6 +654,23 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     return map;
   }
 
+  private void transformMapWorkToCalcitePlanAndPersist(MapWork mWork, Configuration conf) throws SemanticException {
+
+    // 1. Determine if this is needed.
+    //    It is needed only if the execution engine supports it; the two candidates are HBase and LLAP.
+    //    Analyze this table to figure out whether it is stored in HBase.
+    //    If it is, and the other criteria are met, convert this MapWork into a Calcite plan.
+    Table tbl = ((TableScanOperator) mWork.getAllRootOperators().iterator().next()).getConf().getTableMetadata();
+
+    // 2. Get hold of the Hive operator tree from the MapWork and translate it into a Calcite plan.
+    RelNode calcitePlan = RelNodeConverter.convert(mWork.getWorks());
+
+    // 3. Since we are striving for the plan to be independent of Hive, inline any info
+    //    needed by the execution engine from the MapWork into the Calcite operator tree.
+
+    // 4. Serialize and persist the Calcite plan.
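+    //    A minimal sketch of what step 4 could look like, assuming Calcite's
+    //    RelJsonWriter is used for serialization and a hypothetical conf key
+    //    "hive.calcite.plan" hands the serialized plan to the execution engine:
+    //
+    //      RelJsonWriter writer = new RelJsonWriter();
+    //      calcitePlan.explain(writer);
+    //      conf.set("hive.calcite.plan", writer.asString());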
+  }
+
   /*
    * Helper function to create JobConf for specific ReduceWork.
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/TypeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/TypeConverter.java
index 88c989f..bae2db9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/TypeConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/TypeConverter.java
@@ -84,11 +84,10 @@ public static RelDataType getType(RelOptCluster cluster,
     return dtFactory.createStructType(fieldTypes, fieldNames);
   }
 
-  public static RelDataType getType(RelOptCluster cluster, RowResolver rr,
+  public static RelDataType getType(RelOptCluster cluster, RowSchema rs,
       List<String> neededCols) throws CalciteSemanticException {
     RexBuilder rexBuilder = cluster.getRexBuilder();
     RelDataTypeFactory dtFactory = rexBuilder.getTypeFactory();
-    RowSchema rs = rr.getRowSchema();
     List<RelDataType> fieldTypes = new LinkedList<RelDataType>();
     List<String> fieldNames = new LinkedList<String>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/toHive/ForwardWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/toHive/ForwardWalker.java
new file mode 100644
index 0000000..e718e9b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/toHive/ForwardWalker.java
@@ -0,0 +1,71 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.toHive;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+public class ForwardWalker extends DefaultGraphWalker {
+
+  /**
+   * Constructor.
+   *
+   * @param disp
+   *          dispatcher to call for each op encountered
+   */
+  public ForwardWalker(Dispatcher disp) {
+    super(disp);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected boolean allParentsDispatched(Node nd) {
+    Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+    if (op.getParentOperators() == null) {
+      return true;
+    }
+    for (Node pNode : op.getParentOperators()) {
+      if (!getDispatchedList().contains(pNode)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void addAllParents(Node nd) {
+    Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+    if (op.getParentOperators() == null) {
+      return;
+    }
+    getToWalk().removeAll(op.getParentOperators());
+    getToWalk().addAll(0, op.getParentOperators());
+  }
+
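+  /*
+   * This walker dispatches an operator only once all of its parents have been
+   * dispatched (the opposite of DefaultGraphWalker's child-first order), so the
+   * operator DAG is visited top-down from the TableScans toward the sinks.
+   */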
+  /**
+   * Walk the current operator and its descendants.
+   *
+   * @param nd
+   *          current operator in the graph
+   * @throws SemanticException
+   */
+  @Override
+  public void walk(Node nd) throws SemanticException {
+    if (opStack.empty() || nd != opStack.peek()) {
+      opStack.push(nd);
+    }
+    if (allParentsDispatched(nd)) {
+      // all parents have been dispatched; dispatch this node and queue its children
+      if (!getDispatchedList().contains(nd)) {
+        dispatch(nd, opStack);
+        getToWalk().addAll(nd.getChildren());
+      }
+      opStack.pop();
+      return;
+    }
+    // re-queue self at the front, then its parents ahead of it, so the parents are walked first
+    getToWalk().add(0, nd);
+    addAllParents(nd);
+  }
+}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/toHive/RelNodeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/toHive/RelNodeConverter.java
new file mode 100644
index 0000000..0ab2940
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/toHive/RelNodeConverter.java
@@ -0,0 +1,537 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator.toHive;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.calcite.linq4j.tree.ConstantExpression;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.CompositeList;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.RexNodeConverter;
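+/**
+ * Converts a Hive operator tree (the operators rooted at a MapWork) into a tree
+ * of Calcite {@link RelNode}s. The operator DAG is walked top-down with
+ * {@link ForwardWalker}; a {@link NodeProcessor} per operator type builds the
+ * corresponding RelNode, and {@link Context} keeps, for every RelNode produced,
+ * a map from Hive column internal names to Calcite field positions so that a
+ * child operator's expressions can be translated against its parents' row types.
+ */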
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+import sun.tools.java.Type;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+public class RelNodeConverter implements Frameworks.PlannerAction<RelNode> {
+  private static final Map<String, SqlAggFunction> AGG_MAP = ImmutableMap.of(
+      "count", SqlStdOperatorTable.COUNT,
+      "sum", SqlStdOperatorTable.SUM,
+      "min", SqlStdOperatorTable.MIN,
+      "max", SqlStdOperatorTable.MAX,
+      "avg", SqlStdOperatorTable.AVG);
+  private final List<Operator<? extends OperatorDesc>> rootOps;
+
+  public RelNodeConverter(List<Operator<? extends OperatorDesc>> rootOps) {
+    this.rootOps = rootOps;
+  }
+
+  public static RelNode convert(List<Operator<? extends OperatorDesc>> rootOps) throws SemanticException {
+
+    RelNodeConverter converter = new RelNodeConverter(rootOps);
+    return Frameworks.withPlanner(converter);
+  }
+
+  static class Context implements NodeProcessorCtx {
+    RelOptCluster cluster;
+    RelOptSchema schema;
+    /*
+     * A map from Hive column internalNames to Calcite positions; a separate map
+     * for each operator.
+     */
+    Map<RelNode, ImmutableMap<String, Integer>> opPositionMap;
+
+    Map<Operator<? extends OperatorDesc>, RelNode> hiveOpToRelNode;
+
+    public Context(RelOptCluster cluster, RelOptSchema schema) {
+      super();
+      this.cluster = cluster;
+      this.schema = schema;
+      opPositionMap = new HashMap<RelNode, ImmutableMap<String, Integer>>();
+      hiveOpToRelNode = new HashMap<Operator<? extends OperatorDesc>, RelNode>();
+    }
+
+    void buildColumnMap(Operator<? extends OperatorDesc> op, RelNode rNode) {
+      RowSchema rr = op.getSchema();
+      ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
+      int i = 0;
+      for (ColumnInfo ci : rr.getSignature()) {
+        b.put(ci.getInternalName(), i);
+        i++;
+      }
+      opPositionMap.put(rNode, b.build());
+    }
+
+    /*
+     * Why special handling for TableScan? The RowResolver coming from Hive for
+     * the TableScan still has all the columns, whereas the Calcite type we build
+     * is based only on the needed columns of the TableScanOperator.
+     */
+    void buildColumnMap(TableScanOperator tsOp, RelNode rNode) {
+      RelDataType oType = rNode.getRowType();
+      int i = 0;
+      ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
+      for (String fN : oType.getFieldNames()) {
+        b.put(fN, i);
+        i++;
+      }
+      opPositionMap.put(rNode, b.build());
+    }
+
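+    /*
+     * A join's immediate parents are ReduceSinkOperators, whose output column
+     * names differ from the column names of the RelNode built for their input.
+     * reducerMap rewrites an input position map so that it is keyed by the
+     * ReduceSink's output names, using the ReduceSink's columnExprMap.
+     */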
+    Map<String, Integer> reducerMap(Map<String, Integer> inpMap, ReduceSinkOperator rsOp) {
+      ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
+      Map<String, ExprNodeDesc> colExprMap = rsOp.getColumnExprMap();
+      for (Map.Entry<String, ExprNodeDesc> e : colExprMap.entrySet()) {
+        String inpCol = ((ExprNodeColumnDesc) e.getValue()).getColumn();
+        b.put(e.getKey(), inpMap.get(inpCol));
+      }
+      return b.build();
+    }
+
+    /*
+     * The Calcite join datatype is formed by combining the columns from its
+     * input RelNodes, whereas the Hive RowResolver of the JoinOperator contains
+     * only the columns needed by its child operators.
+     */
+    void buildColumnMap(JoinOperator jOp, LogicalJoin jRel) throws SemanticException {
+      Map<String, Integer> leftMap = opPositionMap.get(jRel.getLeft());
+      Map<String, Integer> rightMap = opPositionMap.get(jRel.getRight());
+      leftMap = reducerMap(leftMap, (ReduceSinkOperator) jOp.getParentOperators().get(0));
+      rightMap = reducerMap(rightMap, (ReduceSinkOperator) jOp.getParentOperators().get(1));
+      int leftColCount = jRel.getLeft().getRowType().getFieldCount();
+      ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
+      RowSchema joinSchema = jOp.getSchema();
+      Set<String> tblNames = joinSchema.getTableNames();
+      for (String tblName : tblNames) {
+        Map<String, Integer> posMap = leftMap;
+        int offset = 0;
+        if (jOp.getConf().getRightAliases() != null) {
+          for (String rAlias : jOp.getConf().getRightAliases()) {
+            if (tblName.equals(rAlias)) {
+              posMap = rightMap;
+              offset = leftColCount;
+              break;
+            }
+          }
+        }
+
+        for (ColumnInfo colInfo : joinSchema.getSignature()) {
+          if (!colInfo.getTabAlias().equalsIgnoreCase(tblName)) {
+            continue;
+          }
+          ExprNodeDesc e = jOp.getColumnExprMap().get(colInfo.getInternalName());
+          String cName = e.getExprString();
+          int pos = posMap.get(cName);
+          b.put(colInfo.getInternalName(), pos + offset);
+        }
+      }
+
+      opPositionMap.put(jRel, b.build());
+    }
+
+    void propagatePosMap(RelNode node, RelNode parent) {
+      opPositionMap.put(node, opPositionMap.get(parent));
+    }
+
+    RexNode convertToOptiqExpr(final ExprNodeDesc expr, final RelNode optiqOP) throws SemanticException {
+      return convertToOptiqExpr(expr, optiqOP, 0);
+    }
+
+    RexNode convertToOptiqExpr(final ExprNodeDesc expr, final RelNode optiqOP, int offset) throws SemanticException {
+      ImmutableMap<String, Integer> posMap = opPositionMap.get(optiqOP);
+      RexNodeConverter c = new RexNodeConverter(cluster, optiqOP.getRowType(), posMap, offset, false);
+      return c.convert(expr);
+    }
+
+    RelNode getParentNode(Operator<? extends OperatorDesc> hiveOp, int i) {
+      Operator<? extends OperatorDesc> p = hiveOp.getParentOperators().get(i);
+      return p == null ? null : hiveOpToRelNode.get(p);
+    }
+
+  }
+
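+  /*
+   * Joins are translated from the ReduceSinkOperator parents of the
+   * JoinOperator: the ReduceSink key columns of the left and right inputs are
+   * turned into a conjunction of equality predicates, and any residual
+   * non-join-key filters attached to the JoinDesc are ANDed onto that predicate.
+   */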
+  static class JoinProcessor implements NodeProcessor {
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      Context ctx = (Context) procCtx;
+      RelNode left = ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+      RelNode right = ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 1);
+      JoinOperator joinOp = (JoinOperator) nd;
+      JoinCondDesc[] jConds = joinOp.getConf().getConds();
+      assert jConds.length == 1;
+      LogicalJoin joinRel = convertJoinOp(ctx, joinOp, jConds[0], left, right);
+      ctx.buildColumnMap(joinOp, joinRel);
+      ctx.hiveOpToRelNode.put(joinOp, joinRel);
+      return joinRel;
+    }
+
+    /*
+     * @todo: cleanup; for now just copied from HiveToOptiqRelConverter
+     */
+    private LogicalJoin convertJoinOp(Context ctx, JoinOperator op, JoinCondDesc jc,
+        RelNode leftRel, RelNode rightRel) throws SemanticException {
+      LogicalJoin joinRel;
+      Operator<? extends OperatorDesc> leftParent = op.getParentOperators().get(jc.getLeft());
+      Operator<? extends OperatorDesc> rightParent = op.getParentOperators().get(jc.getRight());
+
+      if (leftParent instanceof ReduceSinkOperator && rightParent instanceof ReduceSinkOperator) {
+        List<ExprNodeDesc> leftCols = ((ReduceSinkDesc) (leftParent.getConf())).getKeyCols();
+        List<ExprNodeDesc> rightCols = ((ReduceSinkDesc) (rightParent.getConf())).getKeyCols();
+        RexNode joinPredicate = null;
+        JoinRelType joinType = JoinRelType.INNER;
+        int rightColOffSet = leftRel.getRowType().getFieldCount();
+
+        // TODO: what about semi join
+        switch (jc.getType()) {
+        case JoinDesc.INNER_JOIN:
+          joinType = JoinRelType.INNER;
+          break;
+        case JoinDesc.LEFT_OUTER_JOIN:
+          joinType = JoinRelType.LEFT;
+          break;
+        case JoinDesc.RIGHT_OUTER_JOIN:
+          joinType = JoinRelType.RIGHT;
+          break;
+        case JoinDesc.FULL_OUTER_JOIN:
+          joinType = JoinRelType.FULL;
+          break;
+        }
+
+        int i = 0;
+        for (ExprNodeDesc expr : leftCols) {
+          List<RexNode> eqExpr = new LinkedList<RexNode>();
+          eqExpr.add(ctx.convertToOptiqExpr(expr, leftRel, 0));
+          eqExpr.add(ctx.convertToOptiqExpr(rightCols.get(i), rightRel, rightColOffSet));
+
+          RexNode eqOp = ctx.cluster.getRexBuilder().makeCall(SqlStdOperatorTable.EQUALS, eqExpr);
+          i++;
+
+          if (joinPredicate == null) {
+            joinPredicate = eqOp;
+          } else {
+            List<RexNode> conjElements = new LinkedList<RexNode>();
+            conjElements.add(joinPredicate);
+            conjElements.add(eqOp);
+            joinPredicate = ctx.cluster.getRexBuilder().makeCall(SqlStdOperatorTable.AND,
+                conjElements);
+          }
+        }
+
+        // Translate the non-join-key predicates
+        Set<Entry<Byte, List<ExprNodeDesc>>> filterExprSet = op.getConf().getFilters().entrySet();
+        if (!filterExprSet.isEmpty()) {
+          RexNode eqExpr;
+          int colOffSet;
+          RelNode childRel;
+          int inputId;
+
+          for (Entry<Byte, List<ExprNodeDesc>> entry : filterExprSet) {
+            inputId = entry.getKey().intValue();
+            if (inputId == 0) {
+              colOffSet = 0;
+              childRel = leftRel;
+            } else if (inputId == 1) {
+              colOffSet = rightColOffSet;
+              childRel = rightRel;
+            } else {
+              throw new RuntimeException("Invalid Join Input");
+            }
+
+            for (ExprNodeDesc expr : entry.getValue()) {
+              eqExpr = ctx.convertToOptiqExpr(expr, childRel, colOffSet);
+              List<RexNode> conjElements = new LinkedList<RexNode>();
+              conjElements.add(joinPredicate);
+              conjElements.add(eqExpr);
+              joinPredicate = ctx.cluster.getRexBuilder().makeCall(SqlStdOperatorTable.AND,
+                  conjElements);
+            }
+          }
+        }
+
+        joinRel = new LogicalJoin(ctx.cluster, leftRel, rightRel, joinPredicate, joinType, null);
+      } else {
+        throw new RuntimeException("Right & Left of Join Condition columns are not equal");
+      }
+
+      return joinRel;
+    }
+  }
+
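+  /*
+   * Returns the input-field position for an expression. If the expression is
+   * not a plain column reference, it is appended to extraExprs and the index of
+   * that extra projected expression is returned; GroupByProcessor later wraps
+   * the input in a LogicalProject that materializes those extra expressions.
+   */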
+  private static int convertExpr(Context ctx, RelNode input, ExprNodeDesc expr,
+      List<RexNode> extraExprs) throws SemanticException {
+    final RexNode rex = ctx.convertToOptiqExpr(expr, input);
+    final int index;
+    if (rex instanceof RexInputRef) {
+      index = ((RexInputRef) rex).getIndex();
+    } else {
+      index = input.getRowType().getFieldCount() + extraExprs.size();
+      extraExprs.add(rex);
+    }
+    return index;
+  }
+
+  private static AggregateCall convertAgg(Context ctx, AggregationDesc agg, RelNode input,
+      ColumnInfo cI, List<RexNode> extraExprs) throws SemanticException {
+    final SqlAggFunction aggregation = AGG_MAP.get(agg.getGenericUDAFName());
+    if (aggregation == null) {
+      throw new AssertionError("agg not found: " + agg.getGenericUDAFName());
+    }
+
+    List<Integer> argList = new ArrayList<Integer>();
+    RelDataType type = TypeConverter.convert(cI.getType(), ctx.cluster.getTypeFactory());
+    if (aggregation.equals(SqlStdOperatorTable.AVG)) {
+      type = type.getField("sum", false).getType();
+    }
+    for (ExprNodeDesc expr : agg.getParameters()) {
+      int index = convertExpr(ctx, input, expr, extraExprs);
+      argList.add(index);
+    }
+
+    /*
+     * Set the type to that of the first arg, if there is one, because the type
+     * set on the AggregateCall is assumed to be the output type.
+     */
+    if (argList.size() > 0) {
+      RexNode rex = ctx.convertToOptiqExpr(agg.getParameters().get(0), input);
+      type = rex.getType();
+    }
+    return new AggregateCall(aggregation, agg.getDistinct(), argList, type, null);
+  }
+
+  static class FilterProcessor implements NodeProcessor {
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      Context ctx = (Context) procCtx;
+      RelNode input = ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+      FilterOperator filterOp = (FilterOperator) nd;
+      RexNode convertedFilterExpr = ctx
+          .convertToOptiqExpr(filterOp.getConf().getPredicate(), input);
+
+      LogicalFilter filtRel = new LogicalFilter(ctx.cluster, input, convertedFilterExpr);
+      ctx.propagatePosMap(filtRel, input);
+      ctx.hiveOpToRelNode.put(filterOp, filtRel);
+      return filtRel;
+    }
+  }
+
+  static class SelectProcessor implements NodeProcessor {
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      Context ctx = (Context) procCtx;
+      RelNode inputRelNode = ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+      SelectOperator selectOp = (SelectOperator) nd;
+
+      List<ExprNodeDesc> colLst = selectOp.getConf().getColList();
+      List<RexNode> optiqColLst = new LinkedList<RexNode>();
+
+      for (ExprNodeDesc colExpr : colLst) {
+        optiqColLst.add(ctx.convertToOptiqExpr(colExpr, inputRelNode));
+      }
+
+      /*
+       * Hive treats names that start with '_c' as internalNames; so change the
+       * names so we don't run into this issue when converting back to a Hive AST.
+       */
+      List<String> oFieldNames = Lists.transform(selectOp.getConf().getOutputColumnNames(),
+          new Function<String, String>() {
+            @Override
+            public String apply(String hName) {
+              return "_o_" + hName;
+            }
+          });
+
+      LogicalProject selRel = new LogicalProject(ctx.cluster, inputRelNode, optiqColLst,
+          oFieldNames, Project.Flags.BOXED);
+      ctx.buildColumnMap(selectOp, selRel);
+      ctx.hiveOpToRelNode.put(selectOp, selRel);
+      return selRel;
+    }
+  }
+
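+  /*
+   * Calcite has no dedicated Limit operator: a LIMIT is encoded as a Sort with
+   * an empty collation and the limit value as its fetch literal, which is what
+   * LimitProcessor builds below.
+   */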
+  static class LimitProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      Context ctx = (Context) procCtx;
+      RelNode input = ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+      LimitOperator limitOp = (LimitOperator) nd;
+
+      // a limit is represented as a sort on 0 columns
+      final RexNode fetch;
+      if (limitOp.getConf().getLimit() >= 0) {
+        fetch = ctx.cluster.getRexBuilder().makeExactLiteral(
+            BigDecimal.valueOf(limitOp.getConf().getLimit()));
+      } else {
+        fetch = null;
+      }
+      Sort sortRel = new Sort(ctx.cluster, ctx.cluster.traitSetOf(Convention.NONE),
+          input, RelCollationImpl.EMPTY, null, fetch);
+      ctx.propagatePosMap(sortRel, input);
+      ctx.hiveOpToRelNode.put(limitOp, sortRel);
+      return sortRel;
+    }
+  }
+
+  static class GroupByProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      Context ctx = (Context) procCtx;
+
+      RelNode input = ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+      GroupByOperator groupByOp = (GroupByOperator) nd;
+      ArrayList<ColumnInfo> signature = groupByOp.getSchema().getSignature();
+
+      // GroupBy is represented by two operators, one map side and one reduce
+      // side. We only translate the map-side one.
+      if (groupByOp.getParentOperators().get(0) instanceof ReduceSinkOperator) {
+        ctx.hiveOpToRelNode.put(groupByOp, input);
+        return input;
+      }
+
+      final List<RexNode> extraExprs = Lists.newArrayList();
+      final List<Integer> grpSet = new ArrayList<Integer>();
+      for (ExprNodeDesc key : groupByOp.getConf().getKeys()) {
+        int index = convertExpr(ctx, input, key, extraExprs);
+        grpSet.add(index);
+      }
+      List<AggregateCall> aggregateCalls = Lists.newArrayList();
+      int i = groupByOp.getConf().getKeys().size();
+      for (AggregationDesc agg : groupByOp.getConf().getAggregators()) {
+        aggregateCalls.add(convertAgg(ctx, agg, input, signature.get(i++), extraExprs));
+      }
+
+      if (!extraExprs.isEmpty()) {
+        // noinspection unchecked
+        input = new LogicalProject(ctx.cluster, input, CompositeList.of(
+            Lists.transform(input.getRowType().getFieldList(),
+                new Function<RelDataTypeField, RexNode>() {
+                  @Override
+                  public RexNode apply(RelDataTypeField input) {
+                    return new RexInputRef(input.getIndex(), input.getType());
+                  }
+                }), extraExprs), null, Project.Flags.BOXED);
+      }
+      RelNode aggregateRel = new LogicalAggregate(ctx.cluster, input, false,
+          ImmutableBitSet.of(grpSet), null, aggregateCalls);
+      ctx.buildColumnMap(groupByOp, aggregateRel);
+      ctx.hiveOpToRelNode.put(groupByOp, aggregateRel);
+      return aggregateRel;
+    }
+  }
+
+  static class TableScanProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      Context ctx = (Context) procCtx;
+      TableScanOperator tableScanOp = (TableScanOperator) nd;
+      List<String> neededCols = tableScanOp.getNeededColumns();
+      RelDataType rowType = TypeConverter.getType(ctx.cluster, tableScanOp.getSchema(), neededCols);
+      RelOptTable optTable = RelOptTableImpl.create(ctx.schema, rowType, neededCols,
+          new ConstantExpression((java.lang.reflect.Type) Type.tNull, null));
+      LogicalTableScan tableRel = new LogicalTableScan(ctx.cluster, optTable);
+      ctx.buildColumnMap(tableScanOp, tableRel);
+      ctx.hiveOpToRelNode.put(tableScanOp, tableRel);
+      return tableRel;
+    }
+  }
+
+  static class DefaultProcessor implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      throw new SemanticException("Unexpected Hive Operator encountered. Aborting translation: "
+          + nd.getName());
+    }
+  }
+
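+  /*
+   * Entry point invoked by Frameworks.withPlanner(): walks the Hive operator
+   * DAG top-down, dispatching each operator to its NodeProcessor by operator
+   * name; any operator type without a registered rule reaches DefaultProcessor,
+   * which aborts the translation.
+   */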
+  @Override
+  public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlus rootSchema) {
+    Context ctx = new Context(cluster, relOptSchema);
+
+    Map<Rule, NodeProcessor> rules = ImmutableMap.<Rule, NodeProcessor> builder()
+        .put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+            new TableScanProcessor())
+        .put(new RuleRegExp("R2", FilterOperator.getOperatorName() + "%"), new FilterProcessor())
+        .put(new RuleRegExp("R3", SelectOperator.getOperatorName() + "%"), new SelectProcessor())
+        .put(new RuleRegExp("R4", JoinOperator.getOperatorName() + "%"), new JoinProcessor())
+        .put(new RuleRegExp("R5", LimitOperator.getOperatorName() + "%"), new LimitProcessor())
+        .put(new RuleRegExp("R6", GroupByOperator.getOperatorName() + "%"), new GroupByProcessor())
+        .build();
+
+    Dispatcher disp = new DefaultRuleDispatcher(new DefaultProcessor(), rules, ctx);
+    GraphWalker egw = new ForwardWalker(disp);
+
+    HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(rootOps);
+    try {
+      egw.startWalking(topNodes, outputMap);
+    } catch (SemanticException e) {
+      throw new RuntimeException("Failure while trying to convert Hive Op tree to Calcite Op tree", e);
+    }
+    return (RelNode) outputMap.get(rootOps.get(0));
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 3a613a2..e9aa5ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -1229,7 +1229,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
     }
 
     // 3.4 Build row type from field <type, name>
-    RelDataType rowType = TypeConverter.getType(cluster, rr, null);
+    RelDataType rowType = TypeConverter.getType(cluster, rr.getRowSchema(), null);
 
     // 4. Build RelOptAbstractTable
     String fullyQualifiedTabName = tab.getDbName();