diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e64e8fc..141e139 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -695,6 +695,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { // CBO related HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."), + HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control calcite plan to hive operator conversion"), // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row, // need to remove by hive .13. Also, do not change default (see SMB operator) diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java index 69a54c8..1acb3b3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Set; /** @@ -102,6 +103,14 @@ public int getPosition(String internalName) { return tableNames; } + public List getColumnNames() { + List columnNames = new ArrayList(); + for (ColumnInfo var : this.signature) { + columnNames.add(var.getInternalName()); + } + return columnNames; + } + @Override public boolean equals(Object obj) { if (!(obj instanceof RowSchema) || (obj == null)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index d0003ed..98f1979 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -528,14 +528,15 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Operator child = op.getChildOperators().get(0); - List childCols; + List childCols = null; if (child instanceof CommonJoinOperator) { - childCols = cppCtx.getJoinPrunedColLists().get(child) - .get((byte) conf.getTag()); + childCols = cppCtx.getJoinPrunedColLists().get(child) == null + ? 
null : cppCtx.getJoinPrunedColLists().get(child) + .get((byte) conf.getTag()); } else { childCols = cppCtx.getPrunedColList(child); - } + List valCols = conf.getValueCols(); List valColNames = conf.getOutputValueColumnNames(); @@ -746,6 +747,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, conf.setOutputColumnNames(newOutputColumnNames); handleChildren(op, cols, cppCtx); } + return null; } @@ -968,16 +970,16 @@ private static void pruneJoinOperator(NodeProcessorCtx ctx, .getChildOperators(); LOG.info("JOIN " + op.getIdentifier() + " oldExprs: " + conf.getExprs()); + List childColLists = cppCtx.genColLists(op); if (childColLists == null) { - return; - } - + return; + } - Map> prunedColLists = new HashMap>(); - for (byte tag : conf.getTagOrder()) { - prunedColLists.put(tag, new ArrayList()); - } + Map> prunedColLists = new HashMap>(); + for (byte tag : conf.getTagOrder()) { + prunedColLists.put(tag, new ArrayList()); + } //add the columns in join filters Set>> filters = @@ -1073,6 +1075,7 @@ private static void pruneJoinOperator(NodeProcessorCtx ctx, } LOG.info("JOIN " + op.getIdentifier() + " newExprs: " + conf.getExprs()); + op.setColumnExprMap(newColExprMap); conf.setOutputColumnNames(outputCols); op.getSchema().setSignature(rs); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java index aa5a5d0..78e8367 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java @@ -57,11 +57,9 @@ @Override public ParseContext transform(ParseContext pctx) throws SemanticException { - String SEL = SelectOperator.getOperatorName(); String FIL = FilterOperator.getOperatorName(); Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup(pctx)); - opRules.put(new RuleRegExp("R2", FIL + "%" + FIL + "%"), new FilterDedup()); + opRules.put(new RuleRegExp("R1", FIL + "%" + FIL + "%"), new FilterDedup()); Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null); GraphWalker ogw = new DefaultGraphWalker(disp); @@ -72,8 +70,12 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } + @Deprecated private class SelectDedup implements NodeProcessor { + // This is taken care of now by + // {@link org.apache.hadoop.hive.ql.optimizer.IdentityProjectRemover} + private ParseContext pctx; public SelectDedup (ParseContext pctx) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java index 298855a..46854e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java @@ -28,6 +28,7 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelOptUtil.InputReferencedVisitor; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.RelFactories.ProjectFactory; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataTypeField; @@ -42,13 +43,16 @@ import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import 
org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.parse.ASTNode; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -311,11 +315,11 @@ public JoinPredicateInfo(List nonEquiJoinPredicateElement return this.mapOfProjIndxInJoinSchemaToLeafPInfo; } - public static JoinPredicateInfo constructJoinPredicateInfo(HiveJoin j) { + public static JoinPredicateInfo constructJoinPredicateInfo(Join j) { return constructJoinPredicateInfo(j, j.getCondition()); } - public static JoinPredicateInfo constructJoinPredicateInfo(HiveJoin j, RexNode predicate) { + public static JoinPredicateInfo constructJoinPredicateInfo(Join j, RexNode predicate) { JoinPredicateInfo jpi = null; JoinLeafPredicateInfo jlpi = null; List equiLPIList = new ArrayList(); @@ -424,6 +428,16 @@ public JoinLeafPredicateInfo(SqlKind comparisonType, List joinKeyExprsF .copyOf(projsFromRightPartOfJoinKeysInJoinSchema); } + public List getJoinKeyExprs(int input) { + if (input == 0) { + return this.joinKeyExprsFromLeft; + } + if (input == 1) { + return this.joinKeyExprsFromRight; + } + return null; + } + public List getJoinKeyExprsFromLeft() { return this.joinKeyExprsFromLeft; } @@ -453,7 +467,7 @@ public JoinLeafPredicateInfo(SqlKind comparisonType, List joinKeyExprsF return this.projsFromRightPartOfJoinKeysInJoinSchema; } - private static JoinLeafPredicateInfo constructJoinLeafPredicateInfo(HiveJoin j, RexNode pe) { + private static JoinLeafPredicateInfo constructJoinLeafPredicateInfo(Join j, RexNode pe) { JoinLeafPredicateInfo jlpi = null; List filterNulls = new ArrayList(); List joinKeyExprsFromLeft = new ArrayList(); @@ -551,4 +565,53 @@ public Void visitCall(org.apache.calcite.rex.RexCall call) { return deterministic; } + + public static ImmutableMap getColInfoMap(List hiveCols, + int startIndx) { + Builder bldr = ImmutableMap. builder(); + + int indx = startIndx; + for (ColumnInfo ci : hiveCols) { + bldr.put(indx, ci); + indx++; + } + + return bldr.build(); + } + + public static ImmutableMap shiftVColsMap(Map hiveVCols, + int shift) { + Builder bldr = ImmutableMap. builder(); + + for (Integer pos : hiveVCols.keySet()) { + bldr.put(shift + pos, hiveVCols.get(pos)); + } + + return bldr.build(); + } + + public static ImmutableMap getVColsMap(List hiveVCols, + int startIndx) { + Builder bldr = ImmutableMap. builder(); + + int indx = startIndx; + for (VirtualColumn vc : hiveVCols) { + bldr.put(indx, vc); + indx++; + } + + return bldr.build(); + } + + public static ImmutableMap getColNameIndxMap(List tableFields) { + Builder bldr = ImmutableMap. 
builder(); + + int indx = 0; + for (FieldSchema fs : tableFields) { + bldr.put(fs.getName(), indx); + indx++; + } + + return bldr.build(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java index 09cece0..9e9cab1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java @@ -31,6 +31,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.logical.LogicalTableScan; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableBitSet; import org.apache.commons.logging.Log; @@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; @@ -51,15 +53,16 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; public class RelOptHiveTable extends RelOptAbstractTable { private final Table hiveTblMetadata; private final String tblAlias; private final ImmutableList hiveNonPartitionCols; + private final ImmutableList hivePartitionCols; private final ImmutableMap hiveNonPartitionColsMap; private final ImmutableMap hivePartitionColsMap; - private final int noOfProjs; + private final ImmutableList hiveVirtualCols; + private final int noOfNonVirtualCols; final HiveConf hiveConf; private double rowCount = -1; @@ -67,37 +70,66 @@ PrunedPartitionList partitionList; Map partitionCache; AtomicInteger noColsMissingStats; + private final String qbID; - protected static final Log LOG = LogFactory - .getLog(RelOptHiveTable.class - .getName()); + protected static final Log LOG = LogFactory + .getLog(RelOptHiveTable.class + .getName()); - public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName, String tblAlias, RelDataType rowType, - Table hiveTblMetadata, List hiveNonPartitionCols, - List hivePartitionCols, HiveConf hconf, Map partitionCache, AtomicInteger noColsMissingStats) { + public RelOptHiveTable(RelOptSchema calciteSchema, String qualifiedTblName, String tblAlias, + RelDataType rowType, Table hiveTblMetadata, List hiveNonPartitionCols, + List hivePartitionCols, List hiveVirtualCols, HiveConf hconf, + Map partitionCache, AtomicInteger noColsMissingStats, + String qbID) { super(calciteSchema, qualifiedTblName, rowType); this.hiveTblMetadata = hiveTblMetadata; this.tblAlias = tblAlias; this.hiveNonPartitionCols = ImmutableList.copyOf(hiveNonPartitionCols); - this.hiveNonPartitionColsMap = getColInfoMap(hiveNonPartitionCols, 0); - this.hivePartitionColsMap = getColInfoMap(hivePartitionCols, hiveNonPartitionColsMap.size()); - this.noOfProjs = hiveNonPartitionCols.size() + hivePartitionCols.size(); + this.hiveNonPartitionColsMap = HiveCalciteUtil.getColInfoMap(hiveNonPartitionCols, 0); + this.hivePartitionCols = ImmutableList.copyOf(hivePartitionCols); + this.hivePartitionColsMap = HiveCalciteUtil.getColInfoMap(hivePartitionCols, hiveNonPartitionColsMap.size()); + 
this.noOfNonVirtualCols = hiveNonPartitionCols.size() + hivePartitionCols.size(); + this.hiveVirtualCols = ImmutableList.copyOf(hiveVirtualCols); this.hiveConf = hconf; this.partitionCache = partitionCache; this.noColsMissingStats = noColsMissingStats; + this.qbID = qbID; } - private static ImmutableMap getColInfoMap(List hiveCols, - int startIndx) { - Builder bldr = ImmutableMap. builder(); + public RelOptHiveTable copy(RelDataType newRowType) { + // 1. Build map of column name to col index of original schema + // Assumption: Hive Table can not contain duplicate column names + Map nameToColIndxMap = new HashMap(); + for (RelDataTypeField f : this.rowType.getFieldList()) { + nameToColIndxMap.put(f.getName(), f.getIndex()); + } - int indx = startIndx; - for (ColumnInfo ci : hiveCols) { - bldr.put(indx, ci); - indx++; + // 2. Build nonPart/Part/Virtual column info for new RowSchema + List newHiveNonPartitionCols = new ArrayList(); + List newHivePartitionCols = new ArrayList(); + List newHiveVirtualCols = new ArrayList(); + Map virtualColInfoMap = HiveCalciteUtil.getVColsMap(this.hiveVirtualCols, + this.noOfNonVirtualCols); + Integer originalColIndx; + ColumnInfo cInfo; + VirtualColumn vc; + for (RelDataTypeField f : newRowType.getFieldList()) { + originalColIndx = nameToColIndxMap.get(f.getName()); + if ((cInfo = hiveNonPartitionColsMap.get(originalColIndx)) != null) { + newHiveNonPartitionCols.add(new ColumnInfo(cInfo)); + } else if ((cInfo = hivePartitionColsMap.get(originalColIndx)) != null) { + newHivePartitionCols.add(new ColumnInfo(cInfo)); + } else if ((vc = virtualColInfoMap.get(originalColIndx)) != null) { + newHiveVirtualCols.add(vc); + } else { + throw new RuntimeException("Copy encountered a column not seen in original TS"); + } } - return bldr.build(); + // 3. Build new Table + return new RelOptHiveTable(this.schema, this.name, this.tblAlias, newRowType, + this.hiveTblMetadata, newHiveNonPartitionCols, newHivePartitionCols, newHiveVirtualCols, + this.hiveConf, this.partitionCache, this.noColsMissingStats, qbID); } @Override @@ -119,13 +151,13 @@ public RelNode toRel(ToRelContext context) { public double getRowCount() { if (rowCount == -1) { if (null == partitionList) { - // we are here either unpartitioned table or partitioned table with no predicates + // we are here either unpartitioned table or partitioned table with no + // predicates computePartitionList(hiveConf, null); } if (hiveTblMetadata.isPartitioned()) { - List rowCounts = StatsUtils.getBasicStatForPartitions( - hiveTblMetadata, partitionList.getNotDeniedPartns(), - StatsSetupConst.ROW_COUNT); + List rowCounts = StatsUtils.getBasicStatForPartitions(hiveTblMetadata, + partitionList.getNotDeniedPartns(), StatsSetupConst.ROW_COUNT); rowCount = StatsUtils.getSumIgnoreNegatives(rowCounts); } else { @@ -144,8 +176,10 @@ public Table getHiveTableMD() { } public String getTableAlias() { - // NOTE: Calcite considers tbls to be equal if their names are the same. Hence - // we need to provide Calcite the fully qualified table name (dbname.tblname) + // NOTE: Calcite considers tbls to be equal if their names are the same. + // Hence + // we need to provide Calcite the fully qualified table name + // (dbname.tblname) // and not the user provided aliases. 
// However in HIVE DB name can not appear in select list; in case of join // where table names differ only in DB name, Hive would require user @@ -173,16 +207,21 @@ private String getColNamesForLogging(Set colLst) { public void computePartitionList(HiveConf conf, RexNode pruneNode) { try { - if (!hiveTblMetadata.isPartitioned() || pruneNode == null || InputFinder.bits(pruneNode).length() == 0 ) { - // there is no predicate on partitioning column, we need all partitions in this case. - partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(), partitionCache); + if (!hiveTblMetadata.isPartitioned() || pruneNode == null + || InputFinder.bits(pruneNode).length() == 0) { + // there is no predicate on partitioning column, we need all partitions + // in this case. + partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(), + partitionCache); return; } // We have valid pruning expressions, only retrieve qualifying partitions - ExprNodeDesc pruneExpr = pruneNode.accept(new ExprNodeConverter(getName(), getRowType(), true, getRelOptSchema().getTypeFactory())); + ExprNodeDesc pruneExpr = pruneNode + .accept(new ExprNodeConverter(getName(), getRowType(), true)); - partitionList = PartitionPruner.prune(hiveTblMetadata, pruneExpr, conf, getName(), partitionCache); + partitionList = PartitionPruner.prune(hiveTblMetadata, pruneExpr, conf, getName(), + partitionCache); } catch (HiveException he) { throw new RuntimeException(he); } @@ -289,10 +328,10 @@ private void updateColStats(Set projIndxLst) { if (colNamesFailedStats.isEmpty() && !partColNamesThatRqrStats.isEmpty()) { ColStatistics cStats = null; for (int i = 0; i < partColNamesThatRqrStats.size(); i++) { - cStats = new ColStatistics(hiveTblMetadata.getTableName(), - partColNamesThatRqrStats.get(i), hivePartitionColsMap.get( - partColIndxsThatRqrStats.get(i)).getTypeName()); - cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(),partColNamesThatRqrStats.get(i))); + cStats = new ColStatistics(hiveTblMetadata.getTableName(), partColNamesThatRqrStats.get(i), + hivePartitionColsMap.get(partColIndxsThatRqrStats.get(i)).getTypeName()); + cStats.setCountDistint(getDistinctCount(partitionList.getPartitions(), + partColNamesThatRqrStats.get(i))); hiveColStatsMap.put(partColIndxsThatRqrStats.get(i), cStats); } } @@ -325,7 +364,7 @@ private int getDistinctCount(Set partitions, String partColName) { } } else { List pILst = new ArrayList(); - for (Integer i = 0; i < noOfProjs; i++) { + for (Integer i = 0; i < noOfNonVirtualCols; i++) { pILst.add(i); } updateColStats(new HashSet(pILst)); @@ -338,10 +377,8 @@ private int getDistinctCount(Set partitions, String partColName) { } /* - * use to check if a set of columns are all partition columns. - * true only if: - * - all columns in BitSet are partition - * columns. + * use to check if a set of columns are all partition columns. true only if: - + * all columns in BitSet are partition columns. 
*/ public boolean containsPartitionColumnsOnly(ImmutableBitSet cols) { @@ -352,4 +389,20 @@ public boolean containsPartitionColumnsOnly(ImmutableBitSet cols) { } return true; } + + public List getVirtualCols() { + return this.hiveVirtualCols; + } + + public List getPartColumns() { + return this.hivePartitionCols; + } + + public List getNonPartColumns() { + return this.hiveNonPartitionCols; + } + + public String getQBID() { + return qbID; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java index 53021ea..9e04284 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java @@ -54,8 +54,7 @@ * @param table * HiveDB table */ - public HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTable table, - RelDataType rowtype) { + public HiveTableScan(RelOptCluster cluster, RelTraitSet traitSet, RelOptHiveTable table) { super(cluster, TraitsUtil.getDefaultTraitSet(cluster), table); assert getConvention() == HiveRelNode.CONVENTION; } @@ -66,6 +65,17 @@ public RelNode copy(RelTraitSet traitSet, List inputs) { return this; } + /** + * Copy TableScan operator with a new Row Schema. The new Row Schema can only + * be a subset of this TS schema. + * + * @param rowtype + * @return + */ + public HiveTableScan copy(RelDataType newRowtype) { + return new HiveTableScan(getCluster(), getTraitSet(), ((RelOptHiveTable) table).copy(newRowtype)); + } + @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { return HiveCost.FACTORY.makeZeroCost(); @@ -85,8 +95,9 @@ public void implement(Implementor implementor) { public double getRows() { return ((RelOptHiveTable) table).getRowCount(); } - + public List getColStat(List projIndxLst) { return ((RelOptHiveTable) table).getColStat(projIndxLst); } + } \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java new file mode 100644 index 0000000..1c35828 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.calcite.rules; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.RelFactories.FilterFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +import com.google.common.collect.ImmutableList; + +public final class HiveJoinAddNotNullRule extends RelOptRule { + + private static final String NOT_NULL_FUNC_NAME = "isnotnull"; + + /** The singleton. */ + public static final HiveJoinAddNotNullRule INSTANCE = + new HiveJoinAddNotNullRule(HiveFilter.DEFAULT_FILTER_FACTORY); + + private final FilterFactory filterFactory; + + //~ Constructors ----------------------------------------------------------- + + /** + * Creates an HiveJoinAddNotNullRule. + */ + public HiveJoinAddNotNullRule(FilterFactory filterFactory) { + super(operand(Join.class, + operand(RelNode.class, any()), + operand(RelNode.class, any()))); + this.filterFactory = filterFactory; + } + + //~ Methods ---------------------------------------------------------------- + + public void onMatch(RelOptRuleCall call) { + final Join join = call.rel(0); + RelNode leftInput = call.rel(1); + RelNode rightInput = call.rel(2); + + if (join.getJoinType() != JoinRelType.INNER) { + return; + } + + if (join.getCondition().isAlwaysTrue()) { + return; + } + + JoinPredicateInfo joinPredInfo = + HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join); + + Set joinLeftKeyPositions = new HashSet(); + Set joinRightKeyPositions = new HashSet(); + for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) { + JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo. 
+ getEquiJoinPredicateElements().get(i); + joinLeftKeyPositions.addAll(joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema()); + joinRightKeyPositions.addAll(joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()); + } + + // Build not null conditions + final RelOptCluster cluster = join.getCluster(); + final RexBuilder rexBuilder = join.getCluster().getRexBuilder(); + + final Map newLeftConditions = getNotNullConditions(cluster, + rexBuilder, leftInput, joinLeftKeyPositions); + final Map newRightConditions = getNotNullConditions(cluster, + rexBuilder, rightInput, joinRightKeyPositions); + + // Nothing will be added to the expression + if (newLeftConditions == null && newRightConditions == null) { + return; + } + + if (newLeftConditions != null) { + if (leftInput instanceof HiveFilter) { + leftInput = leftInput.getInput(0); + } + leftInput = createHiveFilterConjunctiveCondition(filterFactory, rexBuilder, + leftInput, newLeftConditions.values()); + } + if (newRightConditions != null) { + if (rightInput instanceof HiveFilter) { + rightInput = rightInput.getInput(0); + } + rightInput = createHiveFilterConjunctiveCondition(filterFactory, rexBuilder, + rightInput, newRightConditions.values()); + } + + Join newJoin = join.copy(join.getTraitSet(), join.getCondition(), + leftInput, rightInput, join.getJoinType(), join.isSemiJoinDone()); + + call.getPlanner().onCopy(join, newJoin); + + call.transformTo(newJoin); + } + + private static Map getNotNullConditions(RelOptCluster cluster, + RexBuilder rexBuilder, RelNode input, Set inputKeyPositions) { + + boolean added = false; + + final RelDataType returnType = cluster.getTypeFactory(). + createSqlType(SqlTypeName.BOOLEAN); + + final Map newConditions; + if (input instanceof HiveFilter) { + newConditions = splitCondition(((HiveFilter) input).getCondition()); + } + else { + newConditions = new HashMap(); + } + for (int pos : inputKeyPositions) { + try { + RelDataType keyType = input.getRowType().getFieldList().get(pos).getType(); + SqlOperator funcCall = SqlFunctionConverter.getCalciteOperator(NOT_NULL_FUNC_NAME, + FunctionRegistry.getFunctionInfo(NOT_NULL_FUNC_NAME).getGenericUDF(), + ImmutableList.of(keyType), returnType); + RexNode cond = rexBuilder.makeCall(funcCall, rexBuilder.makeInputRef(input, pos)); + String digest = cond.toString(); + if (!newConditions.containsKey(digest)) { + newConditions.put(digest,cond); + added = true; + } + } catch (SemanticException e) { + throw new AssertionError(e.getMessage()); + } + } + // Nothing will be added to the expression + if (!added) { + return null; + } + return newConditions; + } + + private static Map splitCondition(RexNode condition) { + Map newConditions = new HashMap(); + if (condition.getKind() == SqlKind.AND) { + for (RexNode node : ((RexCall) condition).getOperands()) { + newConditions.put(node.toString(), node); + } + } + else { + newConditions.put(condition.toString(), condition); + } + return newConditions; + } + + private static RelNode createHiveFilterConjunctiveCondition(FilterFactory filterFactory, + RexBuilder rexBuilder, RelNode input, Collection conditions) { + final RexNode newCondition = RexUtil.composeConjunction(rexBuilder, conditions, false); + return filterFactory.createFilter(input, newCondition); + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java index 446085e..25dbf52 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java @@ -33,15 +33,12 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexVisitorImpl; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.type.SqlTypeUtil; /* * convert a RexNode to an ExprNodeDesc @@ -51,22 +48,12 @@ RelDataType rType; String tabAlias; boolean partitioningExpr; - private final RelDataTypeFactory dTFactory; - public ExprNodeConverter(String tabAlias, RelDataType rType, boolean partitioningExpr, RelDataTypeFactory dTFactory) { + public ExprNodeConverter(String tabAlias, RelDataType rType, boolean partitioningExpr) { super(true); - /* - * hb: 6/25/14 for now we only support expressions that only contain - * partition cols. there is no use case for supporting generic expressions. - * for supporting generic exprs., we need to give the converter information - * on whether a column is a partition column or not, whether a column is a - * virtual column or not. - */ - assert partitioningExpr == true; this.tabAlias = tabAlias; this.rType = rType; this.partitioningExpr = partitioningExpr; - this.dTFactory = dTFactory; } @Override @@ -76,6 +63,9 @@ public ExprNodeDesc visitInputRef(RexInputRef inputRef) { partitioningExpr); } + /** + * TODO: Handle 1) cast 2) Field Access 3) Windowing Over() 4, Windowing Agg Call + */ @Override public ExprNodeDesc visitCall(RexCall call) { ExprNodeGenericFuncDesc gfDesc = null; @@ -90,15 +80,9 @@ public ExprNodeDesc visitCall(RexCall call) { args.add(operand.accept(this)); } - // If Call is a redundant cast then bail out. Ex: cast(true)BOOLEAN - if (call.isA(SqlKind.CAST) - && (call.operands.size() == 1) - && SqlTypeUtil.equalSansNullability(dTFactory, call.getType(), - call.operands.get(0).getType())) { - return args.get(0); - } else if (ASTConverter.isFlat(call)) { - // If Expr is flat (and[p,q,r,s] or[p,q,r,s]) then recursively build the - // exprnode + // If Expr is flat (and[p,q,r,s] or[p,q,r,s]) then recursively build the + // exprnode + if (ASTConverter.isFlat(call)) { ArrayList tmpExprArgs = new ArrayList(); tmpExprArgs.addAll(args.subList(0, 2)); gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()), @@ -123,6 +107,9 @@ public ExprNodeDesc visitCall(RexCall call) { return gfDesc; } + /** + * TODO: 1. Handle NULL + */ @Override public ExprNodeDesc visitLiteral(RexLiteral literal) { RelDataType lType = literal.getType(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java new file mode 100644 index 0000000..6aa584b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.calcite.translator; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.SemiJoin; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Pair; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils.Operation; +import org.apache.hadoop.hive.ql.metadata.VirtualColumn; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.parse.JoinCond; +import org.apache.hadoop.hive.ql.parse.JoinType; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class HiveOpConverter { + + private static final Log LOG = LogFactory.getLog(HiveOpConverter.class); + + public static enum HIVEAGGOPMODE { + NO_SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan1MR + SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan2MR + NO_SKEW_MAP_SIDE_AGG, // Corresponds to SemAnalyzer + // genGroupByPlanMapAggrNoSkew + SKEW_MAP_SIDE_AGG // Corresponds to SemAnalyzer genGroupByPlanMapAggr2MR + }; + + // TODO: remove this after stashing only rqd pieces from opconverter + private final Map> topOps; + private final HIVEAGGOPMODE aggMode; + private final boolean strictMode; + + public static HIVEAGGOPMODE getAggOPMode(HiveConf hc) { + HIVEAGGOPMODE aggOpMode = HIVEAGGOPMODE.NO_SKEW_NO_MAP_SIDE_AGG; + + if 
(hc.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + if (!hc.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { + aggOpMode = HIVEAGGOPMODE.NO_SKEW_MAP_SIDE_AGG; + } else { + aggOpMode = HIVEAGGOPMODE.SKEW_MAP_SIDE_AGG; + } + } else if (hc.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { + aggOpMode = HIVEAGGOPMODE.SKEW_NO_MAP_SIDE_AGG; + } + + return aggOpMode; + } + + public HiveOpConverter(Map> topOps, + HIVEAGGOPMODE aggMode, boolean strictMode) { + this.topOps = topOps; + this.aggMode = aggMode; + this.strictMode = strictMode; + } + + private class OpAttr { + private final String tabAlias; + ImmutableList inputs; + ImmutableMap vcolMap; + + private OpAttr(String tabAlias, Map vcolMap, Operator... inputs) { + this.tabAlias = tabAlias; + this.vcolMap = ImmutableMap.copyOf(vcolMap); + this.inputs = ImmutableList.copyOf(inputs); + } + + private OpAttr clone(Operator... inputs) { + return new OpAttr(tabAlias, this.vcolMap, inputs); + } + } + + public Operator convert(RelNode root) throws SemanticException { + OpAttr opAf = dispatch(root); + return opAf.inputs.get(0); + } + + OpAttr dispatch(RelNode rn) throws SemanticException { + if (rn instanceof HiveJoin) { + return visit((HiveJoin) rn); + } else if (rn instanceof SemiJoin) { + SemiJoin sj = (SemiJoin) rn; + HiveJoin hj = HiveJoin.getJoin(sj.getCluster(), sj.getLeft(), sj.getRight(), + sj.getCondition(), sj.getJoinType(), true); + return visit(hj); + } + LOG.error(rn.getClass().getCanonicalName() + "operator translation not supported" + + " yet in return path."); + return null; + } + + OpAttr visit(HiveJoin joinRel) throws SemanticException { + // 1. Convert inputs + OpAttr[] inputs = new OpAttr[joinRel.getInputs().size()]; + for (int i=0; i> children = new ArrayList>(); + for (int i = 0; i < inputs.length; i++) { + // Generate a ReduceSink operator for each join child + ReduceSinkOperator child = genReduceSink(inputs[i].inputs.get(0), joinKeys[i], + i, -1, Operation.NOT_ACID, strictMode); + children.add(child); + } + + // 5. Generate Join operator + JoinOperator joinOp = genJoin(joinRel, joinPredInfo, children, joinKeys); + + // 6. TODO: Extract condition for non-equi join elements (if any) and add it + + // 7. 
Virtual columns
+    Map<Integer, VirtualColumn> vcolMap = new HashMap<Integer, VirtualColumn>();
+    vcolMap.putAll(inputs[0].vcolMap);
+    if (extractJoinType(joinRel) != JoinType.LEFTSEMI) {
+      int shift = inputs[0].inputs.get(0).getSchema().getSignature().size();
+      for (int i=1; i inputs) {
+    ExprNodeDesc[][] joinKeys = new ExprNodeDesc[inputs.size()][];
+    for (int i = 0; i < inputs.size(); i++) {
+      joinKeys[i] = new ExprNodeDesc[joinPredInfo.getEquiJoinPredicateElements().size()];
+      for (int j = 0; j < joinPredInfo.getEquiJoinPredicateElements().size(); j++) {
+        JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.getEquiJoinPredicateElements().get(j);
+        RexNode key = joinLeafPredInfo.getJoinKeyExprs(i).get(0);
+        joinKeys[i][j] = convertToExprNode(key, inputs.get(i), null);
+      }
+    }
+    return joinKeys;
+  }
+  private static ReduceSinkOperator genReduceSink(Operator input, ExprNodeDesc[] keys,
+      int tag, int numReducers, Operation acidOperation,
+      boolean strictMode) throws SemanticException {
+    return genReduceSink(input, keys, tag, new ArrayList<ExprNodeDesc>(),
+        "", numReducers, acidOperation, strictMode);
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private static ReduceSinkOperator genReduceSink(Operator input,
+      ExprNodeDesc[] keys, int tag, ArrayList<ExprNodeDesc> partitionCols,
+      String order, int numReducers, Operation acidOperation,
+      boolean strictMode) throws SemanticException {
+    Operator dummy = Operator.createDummy(); // dummy for backtracking
+    dummy.setParentOperators(Arrays.asList(input));
+
+    ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> reduceKeysBack = new ArrayList<ExprNodeDesc>();
+
+    // Compute join keys and store in reduceKeys
+    for (ExprNodeDesc key : keys) {
+      reduceKeys.add(key);
+      reduceKeysBack.add(ExprNodeDescUtils.backtrack(key, dummy, input));
+    }
+
+    // Walk over the input schema and copy in the output
+    ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> reduceValuesBack = new ArrayList<ExprNodeDesc>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+    List<ColumnInfo> inputColumns = input.getSchema().getSignature();
+    ArrayList<ColumnInfo> outputColumns = new ArrayList<ColumnInfo>();
+    List<String> outputColumnNames = new ArrayList<String>();
+    int[] index = new int[inputColumns.size()];
+    for (int i = 0; i < inputColumns.size(); i++) {
+      ColumnInfo colInfo = inputColumns.get(i);
+      String outputColName = colInfo.getInternalName();
+      ExprNodeDesc expr = new ExprNodeColumnDesc(colInfo);
+
+      // backtrack can be null when input is script operator
+      ExprNodeDesc exprBack = ExprNodeDescUtils.backtrack(expr, dummy, input);
+      int kindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceKeysBack);
+      if (kindex >= 0) {
+        ColumnInfo newColInfo = new ColumnInfo(colInfo);
+        newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex);
+        newColInfo.setAlias(outputColName);
+        newColInfo.setTabAlias(colInfo.getTabAlias());
+        outputColumns.add(newColInfo);
+        index[i] = kindex;
+        continue;
+      }
+      int vindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceValuesBack);
+      if (vindex >= 0) {
+        index[i] = -vindex - 1;
+        continue;
+      }
+      index[i] = -reduceValues.size() - 1;
+
+      reduceValues.add(expr);
+      reduceValuesBack.add(exprBack);
+
+      ColumnInfo newColInfo = new ColumnInfo(colInfo);
+      newColInfo.setInternalName(Utilities.ReduceField.VALUE + "."
+ outputColName); + newColInfo.setAlias(outputColName); + newColInfo.setTabAlias(colInfo.getTabAlias()); + + outputColumns.add(newColInfo); + outputColumnNames.add(outputColName); + } + dummy.setParentOperators(null); + + // Use only 1 reducer if no reduce keys + if (reduceKeys.size() == 0) { + numReducers = 1; + + // Cartesian product is not supported in strict mode + if (strictMode) { + throw new SemanticException(ErrorMsg.NO_CARTESIAN_PRODUCT.getMsg()); + } + } + + ReduceSinkDesc rsDesc; + if (order.isEmpty()) { + rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, + outputColumnNames, false, tag, reduceKeys.size(), + numReducers, acidOperation); + } else { + rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, + outputColumnNames, false, tag, partitionCols, + order, numReducers, acidOperation); + } + + ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(rsDesc, + new RowSchema(outputColumns), input); + + List keyColNames = rsDesc.getOutputKeyColumnNames(); + for (int i = 0 ; i < keyColNames.size(); i++) { + colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i)); + } + List valColNames = rsDesc.getOutputValueColumnNames(); + for (int i = 0 ; i < valColNames.size(); i++) { + colExprMap.put(Utilities.ReduceField.VALUE + "." + valColNames.get(i), reduceValues.get(i)); + } + + rsOp.setValueIndex(index); + rsOp.setColumnExprMap(colExprMap); + rsOp.setInputAliases(input.getSchema().getColumnNames().toArray( + new String[input.getSchema().getColumnNames().size()])); + + if (LOG.isDebugEnabled()) { + LOG.debug("Generated " + rsOp + " with row schema: [" + rsOp.getSchema() + "]"); + } + + return rsOp; + } + + private static JoinOperator genJoin(HiveJoin hiveJoin, JoinPredicateInfo joinPredInfo, + List> children, ExprNodeDesc[][] joinKeys) throws SemanticException { + + // Extract join type + JoinType joinType = extractJoinType(hiveJoin); + + // NOTE: Currently binary joins only + JoinCondDesc[] joinCondns = new JoinCondDesc[1]; + joinCondns[0] = new JoinCondDesc(new JoinCond(0, 1, joinType)); + + ArrayList outputColumns = new ArrayList(); + ArrayList outputColumnNames = + new ArrayList(hiveJoin.getRowType().getFieldNames()); + Operator[] childOps = new Operator[children.size()]; + + Map reversedExprs = new HashMap(); + HashMap> exprMap = new HashMap>(); + Map colExprMap = new HashMap(); + HashMap> posToAliasMap = new HashMap>(); + + int outputPos = 0; + for (int pos = 0; pos < children.size(); pos++) { + ReduceSinkOperator inputRS = (ReduceSinkOperator) children.get(pos); + if (inputRS.getNumParent() != 1) { + throw new SemanticException("RS should have single parent"); + } + Operator parent = inputRS.getParentOperators().get(0); + ReduceSinkDesc rsDesc = inputRS.getConf(); + + int[] index = inputRS.getValueIndex(); + + Byte tag = (byte) rsDesc.getTag(); + + // Semijoin + if (joinType == JoinType.LEFTSEMI && pos != 0) { + exprMap.put(tag, new ArrayList()); + childOps[pos] = inputRS; + continue; + } + + List keyColNames = rsDesc.getOutputKeyColumnNames(); + List valColNames = rsDesc.getOutputValueColumnNames(); + + posToAliasMap.put(pos, new HashSet(inputRS.getSchema().getTableNames())); + + Map descriptors = buildBacktrackFromReduceSink(outputPos, + outputColumnNames, keyColNames, valColNames, index, parent); + + List parentColumns = parent.getSchema().getSignature(); + for (int i = 0; i < index.length; i++) { + ColumnInfo info = new ColumnInfo(parentColumns.get(i)); + 
info.setInternalName(outputColumnNames.get(outputPos)); + outputColumns.add(info); + reversedExprs.put(outputColumnNames.get(outputPos), tag); + outputPos++; + } + + exprMap.put(tag, new ArrayList(descriptors.values())); + colExprMap.putAll(descriptors); + childOps[pos] = inputRS; + } + + boolean noOuterJoin = joinType != JoinType.FULLOUTER + && joinType != JoinType.LEFTOUTER + && joinType != JoinType.RIGHTOUTER; + JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, + joinCondns, joinKeys); + desc.setReversedExprs(reversedExprs); + + JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(desc, + new RowSchema(outputColumns), childOps); + joinOp.setColumnExprMap(colExprMap); + joinOp.setPosToAliasMap(posToAliasMap); + + // TODO: null safes? + + if (LOG.isDebugEnabled()) { + LOG.debug("Generated " + joinOp + " with row schema: [" + joinOp.getSchema() + "]"); + } + + return joinOp; + } + + private static JoinType extractJoinType(HiveJoin join) { + // UNIQUE + if (join.isDistinct()) { + return JoinType.UNIQUE; + } + // SEMIJOIN + if (join.isLeftSemiJoin()) { + return JoinType.LEFTSEMI; + } + // OUTER AND INNER JOINS + JoinType resultJoinType; + switch (join.getJoinType()) { + case FULL: + resultJoinType = JoinType.FULLOUTER; + break; + case LEFT: + resultJoinType = JoinType.LEFTOUTER; + break; + case RIGHT: + resultJoinType = JoinType.RIGHTOUTER; + break; + default: + resultJoinType = JoinType.INNER; + break; + } + return resultJoinType; + } + + private static Map buildBacktrackFromReduceSink( + ReduceSinkOperator rsOp, Operator inputOp) { + return buildBacktrackFromReduceSink(0, inputOp.getSchema().getColumnNames(), + rsOp.getConf().getOutputKeyColumnNames(), rsOp.getConf().getOutputValueColumnNames(), + rsOp.getValueIndex(), inputOp); + } + + private static Map buildBacktrackFromReduceSink(int initialPos, + List outputColumnNames, List keyColNames, List valueColNames, + int[] index, Operator inputOp) { + Map columnDescriptors = new LinkedHashMap(); + for (int i = 0; i < index.length; i++) { + ColumnInfo info = new ColumnInfo(inputOp.getSchema().getSignature().get(i)); + String field; + if (index[i] >= 0) { + field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]); + } else { + field = Utilities.ReduceField.VALUE + "." 
+ valueColNames.get(-index[i] - 1); + } + ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), + field, info.getTabAlias(), info.getIsVirtualCol()); + columnDescriptors.put(outputColumnNames.get(initialPos+i), desc); + } + return columnDescriptors; + } + + private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, + String tabAlias) { + return (ExprNodeDesc) rn.accept(new ExprNodeConverter(tabAlias, + inputRel.getRowType(), false)); + } + + private static ArrayList createColInfos(Operator input) { + ArrayList cInfoLst = new ArrayList(); + for (ColumnInfo ci : input.getSchema().getSignature()) { + cInfoLst.add(new ColumnInfo(ci)); + } + return cInfoLst; + } + + private static Pair, Map> createColInfos( + List calciteExprs, List hiveExprs, List projNames, + OpAttr inpOpAf) { + if (hiveExprs.size() != projNames.size()) { + throw new RuntimeException("Column expressions list doesn't match Column Names list"); + } + + RexNode rexN; + ExprNodeDesc pe; + ArrayList colInfos = new ArrayList(); + VirtualColumn vc; + Map newVColMap = new HashMap(); + for (int i = 0; i < hiveExprs.size(); i++) { + pe = hiveExprs.get(i); + rexN = calciteExprs.get(i); + vc = null; + if (rexN instanceof RexInputRef) { + vc = inpOpAf.vcolMap.get(((RexInputRef) rexN).getIndex()); + if (vc != null) { + newVColMap.put(i, vc); + } + } + colInfos + .add(new ColumnInfo(projNames.get(i), pe.getTypeInfo(), inpOpAf.tabAlias, vc != null)); + } + + return new Pair, Map>(colInfos, newVColMap); + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 47a209f..d769c65 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -132,8 +132,10 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinTypeCheckCtx; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.RexNodeConverter; @@ -215,39 +217,43 @@ Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticExcept if (cboCtx.type == PreCboCtx.Type.CTAS) { queryForCbo = cboCtx.nodeOfInterest; // nodeOfInterest is the query } - runCBO = canHandleAstForCbo(queryForCbo, getQB(), cboCtx); + runCBO = canCBOHandleAst(queryForCbo, getQB(), cboCtx); if (runCBO) { disableJoinMerge = true; boolean reAnalyzeAST = false; try { - // 1. Gen Optimized AST - ASTNode newAST = getOptimizedAST(); - - // 1.1. Fix up the query for insert/ctas - newAST = fixUpCtasAndInsertAfterCbo(ast, newAST, cboCtx); - - // 2. Regen OP plan from optimized AST - init(false); - if (cboCtx.type == PreCboCtx.Type.CTAS) { - // Redo create-table analysis, because it's not part of doPhase1. 
- setAST(newAST); - newAST = reAnalyzeCtasAfterCbo(newAST); - } - Phase1Ctx ctx_1 = initPhase1Ctx(); - if (!doPhase1(newAST, getQB(), ctx_1, null)) { - throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan"); + if (this.conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) { + sinkOp = getOptimizedHiveOPDag(); + } else { + // 1. Gen Optimized AST + ASTNode newAST = getOptimizedAST(); + + // 1.1. Fix up the query for insert/ctas + newAST = fixUpCtasAndInsertAfterCbo(ast, newAST, cboCtx); + + // 2. Regen OP plan from optimized AST + init(false); + if (cboCtx.type == PreCboCtx.Type.CTAS) { + // Redo create-table analysis, because it's not part of doPhase1. + setAST(newAST); + newAST = reAnalyzeCtasAfterCbo(newAST); + } + Phase1Ctx ctx_1 = initPhase1Ctx(); + if (!doPhase1(newAST, getQB(), ctx_1, null)) { + throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan"); + } + // unfortunately making prunedPartitions immutable is not possible + // here with SemiJoins not all tables are costed in CBO, so their + // PartitionList is not evaluated until the run phase. + getMetaData(getQB()); + + disableJoinMerge = false; + sinkOp = genPlan(getQB()); + LOG.info("CBO Succeeded; optimized logical plan."); + LOG.debug(newAST.dump()); } - // unfortunately making prunedPartitions immutable is not possible - // here with SemiJoins not all tables are costed in CBO, so their - // PartitionList is not evaluated until the run phase. - getMetaData(getQB()); - - disableJoinMerge = false; - sinkOp = genPlan(getQB()); - LOG.info("CBO Succeeded; optimized logical plan."); - LOG.debug(newAST.dump()); } catch (Exception e) { boolean isMissingStats = noColsMissingStats.get() > 0; if (isMissingStats) { @@ -304,7 +310,7 @@ Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticExcept * If top level QB is query then everything below it must also be * Query. */ - boolean canHandleAstForCbo(ASTNode ast, QB qb, PreCboCtx cboCtx) { + boolean canCBOHandleAst(ASTNode ast, QB qb, PreCboCtx cboCtx) { int root = ast.getToken().getType(); boolean needToLogMessage = STATIC_LOG.isInfoEnabled(); boolean isSupportedRoot = root == HiveParser.TOK_QUERY || root == HiveParser.TOK_EXPLAIN @@ -578,6 +584,31 @@ ASTNode getOptimizedAST() throws SemanticException { return optiqOptimizedAST; } + /** + * Get Optimized Hive Operator DAG for the given QB tree in the semAnalyzer. + * + * @return Optimized Hive operator tree + * @throws SemanticException + */ + Operator getOptimizedHiveOPDag() throws SemanticException { + RelNode optimizedOptiqPlan = null; + CalcitePlannerAction calcitePlannerAction = new CalcitePlannerAction(prunedPartitions); + + try { + optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks + .newConfigBuilder().typeSystem(new HiveTypeSystemImpl()).build()); + } catch (Exception e) { + rethrowCalciteException(e); + throw new AssertionError("rethrowCalciteException didn't throw for " + e.getMessage()); + } + + Operator hiveRoot = new HiveOpConverter(topOps, HiveOpConverter.getAggOPMode(conf), + conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict")).convert(optimizedOptiqPlan); + RowResolver hiveRootRR = genRowResolver(hiveRoot, getQB()); + opParseCtx.put(hiveRoot, new OpParseContext(hiveRootRR)); + return genFileSinkPlan(getQB().getParseInfo().getClauseNames().iterator().next(), getQB(), hiveRoot); + } + /*** * Unwraps Calcite Invocation exceptions coming meta data provider chain and * obtains the real cause. 
@@ -654,6 +685,24 @@ private boolean isUselessCause(Throwable t) { || t instanceof UndeclaredThrowableException; } + private RowResolver genRowResolver(Operator op, QB qb) { + RowResolver rr = new RowResolver(); + String subqAlias = (qb.getAliases().size() == 1 && qb.getSubqAliases().size() == 1) ? qb + .getAliases().get(0) : null; + + for (ColumnInfo ci : op.getSchema().getSignature()) { + try { + rr.putWithCheck((subqAlias != null) ? subqAlias : ci.getTabAlias(), + ci.getAlias() != null ? ci.getAlias() : ci.getInternalName(), ci.getInternalName(), + new ColumnInfo(ci)); + } catch (SemanticException e) { + throw new RuntimeException(e); + } + } + + return rr; + } + /** * Code responsible for Calcite plan generation and optimization. */ @@ -773,6 +822,7 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv ReduceExpressionsRule.PROJECT_INSTANCE, ReduceExpressionsRule.FILTER_INSTANCE, ReduceExpressionsRule.JOIN_INSTANCE, + HiveJoinAddNotNullRule.INSTANCE, new HiveFilterProjectTransposeRule( Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY, HiveProject.class, HiveProject.DEFAULT_PROJECT_FACTORY), new HiveFilterSetOpTransposeRule( @@ -1167,7 +1217,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc } // 2. Get Table Metadata - Table tab = qb.getMetaData().getSrcForAlias(tableAlias); + Table tabMetaData = qb.getMetaData().getSrcForAlias(tableAlias); // 3. Get Table Logical Schema (Row Type) // NOTE: Table logical schema = Non Partition Cols + Partition Cols + @@ -1175,7 +1225,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc // 3.1 Add Column info for non partion cols (Object Inspector fields) @SuppressWarnings("deprecation") - StructObjectInspector rowObjectInspector = (StructObjectInspector) tab.getDeserializer() + StructObjectInspector rowObjectInspector = (StructObjectInspector) tabMetaData.getDeserializer() .getObjectInspector(); List fields = rowObjectInspector.getAllStructFieldRefs(); ColumnInfo colInfo; @@ -1197,7 +1247,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc ArrayList partitionColumns = new ArrayList(); // 3.2 Add column info corresponding to partition columns - for (FieldSchema part_col : tab.getPartCols()) { + for (FieldSchema part_col : tabMetaData.getPartCols()) { colName = part_col.getName(); colInfo = new ColumnInfo(colName, TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), tableAlias, true); @@ -1207,6 +1257,7 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc } // 3.3 Add column info corresponding to virtual columns + List virtualCols = new ArrayList(); Iterator vcs = VirtualColumn.getRegistry(conf).iterator(); while (vcs.hasNext()) { VirtualColumn vc = vcs.next(); @@ -1214,24 +1265,27 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc vc.getIsHidden()); rr.put(tableAlias, vc.getName(), colInfo); cInfoLst.add(colInfo); + virtualCols.add(vc); } // 3.4 Build row type from field RelDataType rowType = TypeConverter.getType(cluster, rr, null); // 4. Build RelOptAbstractTable - String fullyQualifiedTabName = tab.getDbName(); - if (fullyQualifiedTabName != null && !fullyQualifiedTabName.isEmpty()) - fullyQualifiedTabName = fullyQualifiedTabName + "." 
+ tab.getTableName(); - else - fullyQualifiedTabName = tab.getTableName(); + String fullyQualifiedTabName = tabMetaData.getDbName(); + if (fullyQualifiedTabName != null && !fullyQualifiedTabName.isEmpty()) { + fullyQualifiedTabName = fullyQualifiedTabName + "." + tabMetaData.getTableName() + + "." + tableAlias; + } + else { + fullyQualifiedTabName = tabMetaData.getTableName() + "." + tableAlias; + } RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName, - tableAlias, rowType, tab, nonPartitionColumns, partitionColumns, conf, partitionCache, - noColsMissingStats); + tableAlias, rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf, + partitionCache, noColsMissingStats, getAliasId(tableAlias, qb)); // 5. Build Hive Table Scan Rel - tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable, - rowType); + tableRel = new HiveTableScan(cluster, cluster.traitSetOf(HiveRelNode.CONVENTION), optTable); // 6. Add Schema(RR) to RelNode-Schema map ImmutableMap hiveToCalciteColMap = buildHiveToCalciteColumnMap(rr, diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index b515525..33bbe0d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -228,9 +228,9 @@ private HashMap opToPartPruner; private HashMap opToPartList; - private HashMap> topOps; - private final HashMap> topSelOps; - private final LinkedHashMap, OpParseContext> opParseCtx; + protected HashMap> topOps; + private HashMap> topSelOps; + protected LinkedHashMap, OpParseContext> opParseCtx; private List loadTableWork; private List loadFileWork; private final Map joinContext; @@ -6123,7 +6123,7 @@ private boolean checkHoldDDLTime(QB qb) { } @SuppressWarnings("nls") - private Operator genFileSinkPlan(String dest, QB qb, Operator input) + protected Operator genFileSinkPlan(String dest, QB qb, Operator input) throws SemanticException { RowResolver inputRR = opParseCtx.get(input).getRowResolver(); @@ -9226,7 +9226,7 @@ private ExprNodeDesc genSamplePredicate(TableSample ts, return equalsExpr; } - private String getAliasId(String alias, QB qb) { + protected String getAliasId(String alias, QB qb) { return (qb.getId() == null ? alias : qb.getId() + ":" + alias).toLowerCase(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java index 990608a..43e34de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java @@ -107,6 +107,13 @@ public JoinDesc() { } public JoinDesc(final Map> exprs, + List outputColumnNames, final boolean noOuterJoin, + final JoinCondDesc[] conds, ExprNodeDesc[][] joinKeys) { + this (exprs, outputColumnNames, noOuterJoin, conds, + new HashMap>(), joinKeys); + } + + public JoinDesc(final Map> exprs, List outputColumnNames, final boolean noOuterJoin, final JoinCondDesc[] conds, final Map> filters, ExprNodeDesc[][] joinKeys) {
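
Note on enabling the new path: the Calcite-to-Hive-operator conversion added above is gated by the new hive.cbo.returnpath.hiveop flag (default false) and only takes effect when CBO itself runs. A minimal, hypothetical Java sketch (not part of the patch; assumes a HiveConf instance is available before query compilation):

    // Sketch: turn on CBO and the Calcite-plan-to-Hive-operator return path.
    // HIVE_CBO_ENABLED ("hive.cbo.enable") already exists; HIVE_CBO_RETPATH_HIVEOP
    // ("hive.cbo.returnpath.hiveop") is the ConfVars entry introduced by this patch.
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, true);
    conf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP, true);

The same effect can be obtained in a HiveQL session with "set hive.cbo.returnpath.hiveop=true;".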