diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index 768e512..6477c69 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -32,6 +32,7 @@
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
@@ -47,14 +48,19 @@
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
 import org.apache.hadoop.hive.ql.parse.JoinCond;
 import org.apache.hadoop.hive.ql.parse.JoinType;
@@ -69,6 +75,7 @@
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
 
 import com.google.common.collect.ImmutableList;
@@ -136,7 +143,11 @@ public Operator convert(RelNode root) throws SemanticException {
   }
 
   OpAttr dispatch(RelNode rn) throws SemanticException {
-    if (rn instanceof HiveJoin) {
+    if (rn instanceof HiveTableScan) {
+      return visit((HiveTableScan) rn);
+    } else if (rn instanceof HiveProject) {
+      return visit((HiveProject) rn);
+    } else if (rn instanceof HiveJoin) {
       return visit((HiveJoin) rn);
     } else if (rn instanceof SemiJoin) {
       SemiJoin sj = (SemiJoin) rn;
@@ -153,6 +164,97 @@ OpAttr dispatch(RelNode rn) throws SemanticException {
     return null;
   }
 
+  /**
+   * TODO: 1. Predicate pushdown (PPD) still needs to be pushed into the TS.
+   *
+   * @param scanRel the Calcite table scan to translate
+   * @return the generated TableScanOperator, wrapped in an OpAttr
+   */
+  OpAttr visit(HiveTableScan scanRel) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + scanRel.getId() + ":" + scanRel.getRelTypeName()
+          + " with row type: [" + scanRel.getRowType() + "]");
+    }
+
+    RelOptHiveTable ht = (RelOptHiveTable) scanRel.getTable();
+    Map<Integer, VirtualColumn> newVColMap = new HashMap<Integer, VirtualColumn>();
+
+    // 1. Setup TableScan Desc
+    // 1.1 Create TableScanDesc
+    String tableAlias = ht.getTableAlias();
+    List<VirtualColumn> virtualCols = new ArrayList<VirtualColumn>(ht.getVirtualCols());
+    TableScanDesc tsd = new TableScanDesc(tableAlias, virtualCols, ht.getHiveTableMD());
+
+    // 1.2. Set Partition cols in TSDesc
+    List<ColumnInfo> partColInfos = ht.getPartColumns();
+    List<String> partColNames = new ArrayList<String>();
+    for (ColumnInfo ci : partColInfos) {
+      partColNames.add(ci.getInternalName());
+    }
+    tsd.setPartColumns(partColNames);
+
+    // 1.3. Set needed cols in TSDesc
+    List<Integer> neededColumnIDs = new ArrayList<Integer>();
+    List<String> neededColumns = new ArrayList<String>();
+    Map<String, Integer> colNameToIndxMap = HiveCalciteUtil.getColNameIndxMap(ht.getHiveTableMD()
+        .getCols());
+    for (RelDataTypeField rdtf : scanRel.getRowType().getFieldList()) {
+      neededColumnIDs.add(colNameToIndxMap.get(rdtf.getName()));
+      neededColumns.add(rdtf.getName());
+    }
+    tsd.setNeededColumnIDs(neededColumnIDs);
+    tsd.setNeededColumns(neededColumns);
+
+    // 2. Setup TableScan
+    TableScanOperator ts = null;
+    // 2.1 Construct ordered colInfo list for TS RowSchema & update vcolMap
+    ArrayList<ColumnInfo> colInfos = new ArrayList<ColumnInfo>(ht.getNonPartColumns());
+    colInfos.addAll(ht.getPartColumns());
+    ColumnInfo ci;
+    for (VirtualColumn vc : virtualCols) {
+      ci = new ColumnInfo(vc.getName(), vc.getTypeInfo(), tableAlias, true, vc.getIsHidden());
+      colInfos.add(ci);
+      newVColMap.put(colInfos.size(), vc);
+    }
+
+    // 2.2. Create TS OP
+    ts = (TableScanOperator) OperatorFactory.get(tsd, new RowSchema(colInfos));
+
+    topOps.put(ht.getQBID(), ts);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + ts + " with row schema: [" + ts.getSchema() + "]");
+    }
+
+    return new OpAttr(tableAlias, newVColMap, ts);
+  }
+
+  OpAttr visit(HiveProject projectRel) throws SemanticException {
+    OpAttr inputOpAf = dispatch(projectRel.getInput());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + projectRel.getId() + ":"
+          + projectRel.getRelTypeName() + " with row type: [" + projectRel.getRowType() + "]");
+    }
+
+    List<ExprNodeDesc> exprCols = convertToExprNodes(projectRel.getChildExps(),
+        projectRel.getInput(), inputOpAf.tabAlias);
+    // TODO: is this a safe assumption (name collisions, external names, ...)?
+    List<String> exprNames = new ArrayList<String>(projectRel.getRowType().getFieldNames());
+    SelectDesc sd = new SelectDesc(exprCols, exprNames);
+    Pair<ArrayList<ColumnInfo>, Map<Integer, VirtualColumn>> colInfoVColPair = createColInfos(
+        projectRel.getChildExps(), exprCols, exprNames, inputOpAf);
+    SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(sd, new RowSchema(
+        colInfoVColPair.getKey()), inputOpAf.inputs.get(0));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generated " + selOp + " with row schema: [" + selOp.getSchema() + "]");
+    }
+
+    return new OpAttr(inputOpAf.tabAlias, colInfoVColPair.getValue(), selOp);
+  }
+
   OpAttr visit(HiveJoin joinRel) throws SemanticException {
     // 1. Convert inputs
     OpAttr[] inputs = new OpAttr[joinRel.getInputs().size()];
@@ -590,6 +692,15 @@ private static JoinType extractJoinType(HiveJoin join) {
     return columnDescriptors;
   }
 
+  private static List<ExprNodeDesc> convertToExprNodes(List<RexNode> rNodeLst,
+      RelNode inputRel, String tabAlias) {
+    List<ExprNodeDesc> exprNodeLst = new ArrayList<ExprNodeDesc>();
+    for (RexNode rn : rNodeLst) {
+      exprNodeLst.add(convertToExprNode(rn, inputRel, tabAlias));
+    }
+    return exprNodeLst;
+  }
+
   private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, String tabAlias) {
     return (ExprNodeDesc) rn.accept(new ExprNodeConverter(tabAlias,
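Note for reviewers: the patch extends the existing instanceof chain in dispatch() rather than adopting a visitor interface, so each new Calcite operator costs one branch plus one visit() overload that recurses into its own inputs. Below is a minimal, self-contained sketch of that shape; Node, Scan, Project, and Dispatcher are hypothetical stand-ins, not the Hive or Calcite types touched by this patch.

    // Sketch only: hypothetical classes illustrating the dispatch pattern.
    interface Node { }

    class Scan implements Node {
      final String table;
      Scan(String table) { this.table = table; }
    }

    class Project implements Node {
      final Node input;
      Project(Node input) { this.input = input; }
    }

    class Dispatcher {
      // Mirrors HiveOpConverter.dispatch(RelNode): route on the concrete
      // type, then let each visit() overload recurse into its own inputs.
      String dispatch(Node rn) {
        if (rn instanceof Scan) {
          return visit((Scan) rn);
        } else if (rn instanceof Project) {
          return visit((Project) rn);
        }
        throw new IllegalStateException("unhandled node type: " + rn.getClass());
      }

      // Leaf node: translated directly, no recursion (cf. visit(HiveTableScan)).
      String visit(Scan scan) {
        return "TS[" + scan.table + "]";
      }

      // Unary node: translate the child first, then wrap it
      // (cf. visit(HiveProject) calling dispatch(projectRel.getInput())).
      String visit(Project project) {
        return "SEL(" + dispatch(project.input) + ")";
      }

      public static void main(String[] args) {
        // Prints SEL(TS[t1]) for a Project-over-Scan plan.
        System.out.println(new Dispatcher().dispatch(new Project(new Scan("t1"))));
      }
    }

The recursion direction matches the patch: visit(HiveProject) dispatches on projectRel.getInput() first, so the child operator tree already exists when the SelectOperator is attached to it via OperatorFactory.getAndMakeChild.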
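Likewise, the needed-column bookkeeping in visit(HiveTableScan) reduces to projecting the scan's row-type field names through a name-to-position map. A standalone illustration under assumed data follows; the column names are hypothetical, and the inline loop stands in for HiveCalciteUtil.getColNameIndxMap.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class NeededColumns {
      public static void main(String[] args) {
        // All columns of a hypothetical table, in schema order.
        List<String> tableCols = Arrays.asList("id", "name", "ds");
        // Columns the scan's row type actually references.
        List<String> referenced = Arrays.asList("name", "ds");

        // Same shape as HiveCalciteUtil.getColNameIndxMap: name -> position.
        Map<String, Integer> colNameToIndx = new HashMap<>();
        for (int i = 0; i < tableCols.size(); i++) {
          colNameToIndx.put(tableCols.get(i), i);
        }

        // Same shape as the neededColumnIDs/neededColumns loop in the patch.
        List<Integer> neededColumnIDs = new ArrayList<>();
        List<String> neededColumns = new ArrayList<>();
        for (String name : referenced) {
          neededColumnIDs.add(colNameToIndx.get(name));
          neededColumns.add(name);
        }
        System.out.println(neededColumnIDs + " " + neededColumns); // [1, 2] [name, ds]
      }
    }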