diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java index 0ada068..a308298 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java @@ -93,7 +93,7 @@ public static ASTNode convert(final RelNode relNode, List resultSchema) throws CalciteSemanticException { - RelNode root = PlanModifierForASTConv.convertOpTree(relNode, resultSchema); + RelNode root = PlanModifierForConversion.convertOpTree(relNode, resultSchema); ASTConverter c = new ASTConverter(root, 0); return c.convert(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java index 55f1247..d2c9374 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java @@ -252,7 +252,7 @@ private static GBInfo getGBInfo(HiveAggregate aggRel, OpAttr inputOpAf, HiveConf gbInfo.distColIndices.add(distColIndicesOfUDAF); } - // special handling for count, similar to PlanModifierForASTConv::replaceEmptyGroupAggr() + // special handling for count, similar to PlanModifierForConversion::replaceEmptyGroupAggr() udafAttrs.udafEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator(udafAttrs.udafName, new ArrayList(udafAttrs.udafParams), new ASTNode(), udafAttrs.isDistinctUDAF, udafAttrs.udafParams.size() == 0 && diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index d15520c..a25fca8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; -import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelDistribution.Type; @@ -38,6 +37,7 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Pair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -365,6 +365,10 @@ OpAttr visit(HiveSort sortRel) throws SemanticException { Operator inputOp = inputOpAf.inputs.get(0); Operator resultOp = inputOpAf.inputs.get(0); + + ImmutableBitSet.Builder sortColsPosBuilder = new ImmutableBitSet.Builder(); + ImmutableBitSet.Builder sortOutputColsPosBuilder = new ImmutableBitSet.Builder(); + Map obRefToCallMap = sortRel.getInputRefToCallMap(); // 1. 
If we need to sort tuples based on the value of some // of their columns if (sortRel.getCollation() != RelCollations.EMPTY) { @@ -379,18 +383,24 @@ OpAttr visit(HiveSort sortRel) throws SemanticException { // Generate sortCols and order List sortCols = new ArrayList(); StringBuilder order = new StringBuilder(); - for (RelCollation collation : sortRel.getCollationList()) { - for (RelFieldCollation sortInfo : collation.getFieldCollations()) { - int sortColumnPos = sortInfo.getFieldIndex(); - ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature() - .get(sortColumnPos)); - ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(), - columnInfo.getInternalName(), columnInfo.getTabAlias(), columnInfo.getIsVirtualCol()); - sortCols.add(sortColumn); - if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) { - order.append("-"); - } else { - order.append("+"); + for (RelFieldCollation sortInfo : sortRel.getCollation().getFieldCollations()) { + int sortColumnPos = sortInfo.getFieldIndex(); + ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature() + .get(sortColumnPos)); + ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(), + columnInfo.getInternalName(), columnInfo.getTabAlias(), columnInfo.getIsVirtualCol()); + sortCols.add(sortColumn); + if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) { + order.append("-"); + } else { + order.append("+"); + } + + if (obRefToCallMap != null) { + RexNode obExpr = obRefToCallMap.get(sortColumnPos); + sortColsPosBuilder.set(sortColumnPos); + if (obExpr == null) { + sortOutputColsPosBuilder.set(sortColumnPos); } } } @@ -402,6 +412,8 @@ OpAttr visit(HiveSort sortRel) throws SemanticException { sortCols.toArray(new ExprNodeDesc[sortCols.size()]), 0, new ArrayList(), order.toString(), numReducers, Operation.NOT_ACID, strictMode); } + final ImmutableBitSet sortColsPos = sortColsPosBuilder.build(); + final ImmutableBitSet 
sortOutputColsPos = sortOutputColsPosBuilder.build(); // 2. If we need to generate limit if (sortRel.fetch != null) { @@ -417,7 +429,46 @@ OpAttr visit(HiveSort sortRel) throws SemanticException { } } - // 3. Return result + // 3. Select operator (if needed i.e. if this is a sort operator on top of the plan) + if (obRefToCallMap != null) { + // 3.1. We create the data structures for the additional Select operator + List exprRexNodes = new ArrayList(); + List exprNames = new ArrayList(); + List exprCols = new ArrayList(); + Map colExprMap = new HashMap(); + + // 3.2. We keep the columns only the columns that are part of the final output + final ArrayList inputSchema = inputOp.getSchema().getSignature(); + for (int pos=0; pos, Map> colInfoVColPair = createColInfos( + exprRexNodes, exprCols, exprNames, inputOpAf); + SelectOperator selOp = (SelectOperator) OperatorFactory.getAndMakeChild(sd, new RowSchema( + colInfoVColPair.getKey()), resultOp); + selOp.setColumnExprMap(colExprMap); + + if (LOG.isDebugEnabled()) { + LOG.debug("Generated " + selOp + " with row schema: [" + selOp.getSchema() + "]"); + } + + // 3.4. Return result + return new OpAttr(inputOpAf.tabAlias, colInfoVColPair.getValue(), selOp); + } + + // 4. Return result return inputOpAf.clone(resultOp); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java deleted file mode 100644 index cba37bc..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java +++ /dev/null @@ -1,440 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.translator; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.calcite.plan.RelOptUtil; -import org.apache.calcite.plan.hep.HepRelVertex; -import org.apache.calcite.plan.volcano.RelSubset; -import org.apache.calcite.rel.RelCollationImpl; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.SingleRel; -import org.apache.calcite.rel.core.Aggregate; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.core.Filter; -import org.apache.calcite.rel.core.Join; -import org.apache.calcite.rel.core.Project; -import org.apache.calcite.rel.core.SetOp; -import org.apache.calcite.rel.core.Sort; -import org.apache.calcite.rel.rules.MultiJoin; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.util.Pair; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import 
org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -public class PlanModifierForASTConv { - private static final Log LOG = LogFactory.getLog(PlanModifierForASTConv.class); - - public static RelNode convertOpTree(RelNode rel, List resultSchema) - throws CalciteSemanticException { - RelNode newTopNode = rel; - if (LOG.isDebugEnabled()) { - LOG.debug("Original plan for PlanModifier\n " + RelOptUtil.toString(newTopNode)); - } - - if (!(newTopNode instanceof Project) && !(newTopNode instanceof Sort)) { - newTopNode = introduceDerivedTable(newTopNode); - if (LOG.isDebugEnabled()) { - LOG.debug("Plan after top-level introduceDerivedTable\n " - + RelOptUtil.toString(newTopNode)); - } - } - - convertOpTree(newTopNode, (RelNode) null); - if (LOG.isDebugEnabled()) { - LOG.debug("Plan after nested convertOpTree\n " + RelOptUtil.toString(newTopNode)); - } - - Pair topSelparentPair = HiveCalciteUtil.getTopLevelSelect(newTopNode); - fixTopOBSchema(newTopNode, topSelparentPair, resultSchema); - if (LOG.isDebugEnabled()) { - LOG.debug("Plan after fixTopOBSchema\n " + RelOptUtil.toString(newTopNode)); - } - - topSelparentPair = HiveCalciteUtil.getTopLevelSelect(newTopNode); - newTopNode = renameTopLevelSelectInResultSchema(newTopNode, topSelparentPair, resultSchema); - if (LOG.isDebugEnabled()) { - LOG.debug("Final plan after modifier\n " + 
RelOptUtil.toString(newTopNode)); - } - return newTopNode; - } - - private static String getTblAlias(RelNode rel) { - - if (null == rel) { - return null; - } - if (rel instanceof HiveTableScan) { - return ((HiveTableScan)rel).getTableAlias(); - } - if (rel instanceof Project) { - return null; - } - if (rel.getInputs().size() == 1) { - return getTblAlias(rel.getInput(0)); - } - return null; - } - - private static void convertOpTree(RelNode rel, RelNode parent) { - - if (rel instanceof HepRelVertex) { - throw new RuntimeException("Found HepRelVertex"); - } else if (rel instanceof Join) { - if (!validJoinParent(rel, parent)) { - introduceDerivedTable(rel, parent); - } - String leftChild = getTblAlias(((Join)rel).getLeft()); - if (null != leftChild && leftChild.equalsIgnoreCase(getTblAlias(((Join)rel).getRight()))) { - // introduce derived table above one child, if this is self-join - // since user provided aliases are lost at this point. - introduceDerivedTable(((Join)rel).getLeft(), rel); - } - } else if (rel instanceof MultiJoin) { - throw new RuntimeException("Found MultiJoin"); - } else if (rel instanceof RelSubset) { - throw new RuntimeException("Found RelSubset"); - } else if (rel instanceof SetOp) { - // TODO: Handle more than 2 inputs for setop - if (!validSetopParent(rel, parent)) - introduceDerivedTable(rel, parent); - - SetOp setop = (SetOp) rel; - for (RelNode inputRel : setop.getInputs()) { - if (!validSetopChild(inputRel)) { - introduceDerivedTable(inputRel, setop); - } - } - } else if (rel instanceof SingleRel) { - if (rel instanceof Filter) { - if (!validFilterParent(rel, parent)) { - introduceDerivedTable(rel, parent); - } - } else if (rel instanceof HiveSort) { - if (!validSortParent(rel, parent)) { - introduceDerivedTable(rel, parent); - } - if (!validSortChild((HiveSort) rel)) { - introduceDerivedTable(((HiveSort) rel).getInput(), rel); - } - } else if (rel instanceof HiveAggregate) { - RelNode newParent = parent; - if (!validGBParent(rel, parent)) 
{ - newParent = introduceDerivedTable(rel, parent); - } - // check if groupby is empty and there is no other cols in aggr - // this should only happen when newParent is constant. - if (isEmptyGrpAggr(rel)) { - replaceEmptyGroupAggr(rel, newParent); - } - } - } - - List childNodes = rel.getInputs(); - if (childNodes != null) { - for (RelNode r : childNodes) { - convertOpTree(r, rel); - } - } - } - - private static void fixTopOBSchema(final RelNode rootRel, - Pair topSelparentPair, List resultSchema) - throws CalciteSemanticException { - if (!(topSelparentPair.getKey() instanceof Sort) - || !HiveCalciteUtil.orderRelNode(topSelparentPair.getKey())) { - return; - } - HiveSort obRel = (HiveSort) topSelparentPair.getKey(); - Project obChild = (Project) topSelparentPair.getValue(); - if (obChild.getRowType().getFieldCount() <= resultSchema.size()) { - return; - } - - RelDataType rt = obChild.getRowType(); - @SuppressWarnings({ "unchecked", "rawtypes" }) - Set collationInputRefs = new HashSet( - RelCollationImpl.ordinals(obRel.getCollation())); - ImmutableMap.Builder inputRefToCallMapBldr = ImmutableMap.builder(); - for (int i = resultSchema.size(); i < rt.getFieldCount(); i++) { - if (collationInputRefs.contains(i)) { - RexNode obyExpr = obChild.getChildExps().get(i); - if (obyExpr instanceof RexCall) { - int a = -1; - List operands = new ArrayList<>(); - for (int k = 0; k< ((RexCall) obyExpr).operands.size(); k++) { - RexNode rn = ((RexCall) obyExpr).operands.get(k); - for (int j = 0; j < resultSchema.size(); j++) { - if( obChild.getChildExps().get(j).toString().equals(rn.toString())) { - a = j; - break; - } - } if (a != -1) { - operands.add(new RexInputRef(a, rn.getType())); - } else { - operands.add(rn); - } - a = -1; - } - obyExpr = obChild.getCluster().getRexBuilder().makeCall(((RexCall)obyExpr).getOperator(), operands); - } - inputRefToCallMapBldr.put(i, obyExpr); - } - } - ImmutableMap inputRefToCallMap = inputRefToCallMapBldr.build(); - - if 
((obChild.getRowType().getFieldCount() - inputRefToCallMap.size()) != resultSchema.size()) { - LOG.error(generateInvalidSchemaMessage(obChild, resultSchema, inputRefToCallMap.size())); - throw new CalciteSemanticException("Result Schema didn't match Optimized Op Tree Schema"); - } - // This removes order-by only expressions from the projections. - HiveProject replacementProjectRel = HiveProject.create(obChild.getInput(), obChild - .getChildExps().subList(0, resultSchema.size()), obChild.getRowType().getFieldNames() - .subList(0, resultSchema.size())); - obRel.replaceInput(0, replacementProjectRel); - obRel.setInputRefToCallMap(inputRefToCallMap); - } - - private static String generateInvalidSchemaMessage(Project topLevelProj, - List resultSchema, int fieldsForOB) { - String errorDesc = "Result Schema didn't match Calcite Optimized Op Tree; schema: "; - for (FieldSchema fs : resultSchema) { - errorDesc += "[" + fs.getName() + ":" + fs.getType() + "], "; - } - errorDesc += " projection fields: "; - for (RexNode exp : topLevelProj.getChildExps()) { - errorDesc += "[" + exp.toString() + ":" + exp.getType() + "], "; - } - if (fieldsForOB != 0) { - errorDesc += fieldsForOB + " fields removed due to ORDER BY "; - } - return errorDesc.substring(0, errorDesc.length() - 2); - } - - private static RelNode renameTopLevelSelectInResultSchema(final RelNode rootRel, - Pair topSelparentPair, List resultSchema) - throws CalciteSemanticException { - RelNode parentOforiginalProjRel = topSelparentPair.getKey(); - HiveProject originalProjRel = (HiveProject) topSelparentPair.getValue(); - - // Assumption: top portion of tree could only be - // (limit)?(OB)?(Project).... - List rootChildExps = originalProjRel.getChildExps(); - if (resultSchema.size() != rootChildExps.size()) { - // Safeguard against potential issues in CBO RowResolver construction. Disable CBO for now. 
- LOG.error(generateInvalidSchemaMessage(originalProjRel, resultSchema, 0)); - throw new CalciteSemanticException("Result Schema didn't match Optimized Op Tree Schema"); - } - - List newSelAliases = new ArrayList(); - String colAlias; - for (int i = 0; i < rootChildExps.size(); i++) { - colAlias = resultSchema.get(i).getName(); - if (colAlias.startsWith("_")) { - colAlias = colAlias.substring(1); - } - newSelAliases.add(colAlias); - } - - HiveProject replacementProjectRel = HiveProject.create(originalProjRel.getInput(), - originalProjRel.getChildExps(), newSelAliases); - - if (rootRel == originalProjRel) { - return replacementProjectRel; - } else { - parentOforiginalProjRel.replaceInput(0, replacementProjectRel); - return rootRel; - } - } - - private static RelNode introduceDerivedTable(final RelNode rel) { - List projectList = HiveCalciteUtil.getProjsFromBelowAsInputRef(rel); - - HiveProject select = HiveProject.create(rel.getCluster(), rel, projectList, - rel.getRowType(), rel.getCollationList()); - - return select; - } - - private static RelNode introduceDerivedTable(final RelNode rel, RelNode parent) { - int i = 0; - int pos = -1; - List childList = parent.getInputs(); - - for (RelNode child : childList) { - if (child == rel) { - pos = i; - break; - } - i++; - } - - if (pos == -1) { - throw new RuntimeException("Couldn't find child node in parent's inputs"); - } - - RelNode select = introduceDerivedTable(rel); - - parent.replaceInput(pos, select); - - return select; - } - - private static boolean validJoinParent(RelNode joinNode, RelNode parent) { - boolean validParent = true; - - if (parent instanceof Join) { - if (((Join) parent).getRight() == joinNode) { - validParent = false; - } - } else if (parent instanceof SetOp) { - validParent = false; - } - - return validParent; - } - - private static boolean validFilterParent(RelNode filterNode, RelNode parent) { - boolean validParent = true; - - // TOODO: Verify GB having is not a seperate filter (if so we 
shouldn't - // introduce derived table) - if (parent instanceof Filter || parent instanceof Join - || parent instanceof SetOp) { - validParent = false; - } - - return validParent; - } - - private static boolean validGBParent(RelNode gbNode, RelNode parent) { - boolean validParent = true; - - // TOODO: Verify GB having is not a seperate filter (if so we shouldn't - // introduce derived table) - if (parent instanceof Join || parent instanceof SetOp - || parent instanceof Aggregate - || (parent instanceof Filter && ((Aggregate) gbNode).getGroupSet().isEmpty())) { - validParent = false; - } - - return validParent; - } - - private static boolean validSortParent(RelNode sortNode, RelNode parent) { - boolean validParent = true; - - if (parent != null && !(parent instanceof Project) - && !((parent instanceof Sort) || HiveCalciteUtil.orderRelNode(parent))) - validParent = false; - - return validParent; - } - - private static boolean validSortChild(HiveSort sortNode) { - boolean validChild = true; - RelNode child = sortNode.getInput(); - - if (!(HiveCalciteUtil.limitRelNode(sortNode) && HiveCalciteUtil.orderRelNode(child)) - && !(child instanceof Project)) { - validChild = false; - } - - return validChild; - } - - private static boolean validSetopParent(RelNode setop, RelNode parent) { - boolean validChild = true; - - if (parent != null && !(parent instanceof Project)) { - validChild = false; - } - - return validChild; - } - - private static boolean validSetopChild(RelNode setopChild) { - boolean validChild = true; - - if (!(setopChild instanceof Project)) { - validChild = false; - } - - return validChild; - } - - private static boolean isEmptyGrpAggr(RelNode gbNode) { - // Verify if both groupset and aggrfunction are empty) - Aggregate aggrnode = (Aggregate) gbNode; - if (aggrnode.getGroupSet().isEmpty() && aggrnode.getAggCallList().isEmpty()) { - return true; - } - return false; - } - - private static void replaceEmptyGroupAggr(final RelNode rel, RelNode parent) { - // If 
this function is called, the parent should only include constant - List exps = parent.getChildExps(); - for (RexNode rexNode : exps) { - if (!rexNode.accept(new HiveCalciteUtil.ConstantFinder())) { - throw new RuntimeException("We expect " + parent.toString() - + " to contain only constants. However, " + rexNode.toString() + " is " - + rexNode.getKind()); - } - } - HiveAggregate oldAggRel = (HiveAggregate) rel; - RelDataTypeFactory typeFactory = oldAggRel.getCluster().getTypeFactory(); - RelDataType longType = TypeConverter.convert(TypeInfoFactory.longTypeInfo, typeFactory); - RelDataType intType = TypeConverter.convert(TypeInfoFactory.intTypeInfo, typeFactory); - // Create the dummy aggregation. - SqlAggFunction countFn = SqlFunctionConverter.getCalciteAggFn("count", - ImmutableList.of(intType), longType); - // TODO: Using 0 might be wrong; might need to walk down to find the - // proper index of a dummy. - List argList = ImmutableList.of(0); - AggregateCall dummyCall = new AggregateCall(countFn, false, argList, longType, null); - Aggregate newAggRel = oldAggRel.copy(oldAggRel.getTraitSet(), oldAggRel.getInput(), - oldAggRel.indicator, oldAggRel.getGroupSet(), oldAggRel.getGroupSets(), - ImmutableList.of(dummyCall)); - RelNode select = introduceDerivedTable(newAggRel); - parent.replaceInput(0, select); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForConversion.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForConversion.java new file mode 100644 index 0000000..dda3b5a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForConversion.java @@ -0,0 +1,466 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.translator; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepRelVertex; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelCollationImpl; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.SetOp; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.rules.MultiJoin; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.util.Pair; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; +import 
org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class PlanModifierForConversion { + private static final Log LOG = LogFactory.getLog(PlanModifierForConversion.class); + + public static RelNode convertOpTree(RelNode rel, List resultSchema) + throws CalciteSemanticException { + RelNode newTopNode = rel; + if (LOG.isDebugEnabled()) { + LOG.debug("Original plan for PlanModifier\n " + RelOptUtil.toString(newTopNode)); + } + + if (!(newTopNode instanceof Project) && !(newTopNode instanceof Sort)) { + newTopNode = introduceDerivedTable(newTopNode); + if (LOG.isDebugEnabled()) { + LOG.debug("Plan after top-level introduceDerivedTable\n " + + RelOptUtil.toString(newTopNode)); + } + } + + convertOpTree(newTopNode, (RelNode) null); + if (LOG.isDebugEnabled()) { + LOG.debug("Plan after nested convertOpTree\n " + RelOptUtil.toString(newTopNode)); + } + + Pair topSelparentPair = HiveCalciteUtil.getTopLevelSelect(newTopNode); + fixTopOBSchema(newTopNode, topSelparentPair, resultSchema, true); + if (LOG.isDebugEnabled()) { + LOG.debug("Plan after fixTopOBSchema\n " + RelOptUtil.toString(newTopNode)); + } + + topSelparentPair = HiveCalciteUtil.getTopLevelSelect(newTopNode); + newTopNode = renameTopLevelSelectInResultSchema(newTopNode, topSelparentPair, resultSchema); + if (LOG.isDebugEnabled()) { + LOG.debug("Final plan after modifier\n " + RelOptUtil.toString(newTopNode)); + } + return newTopNode; + } + + public static RelNode convertOpTreeForReturnPath(RelNode rel, List resultSchema) + 
throws CalciteSemanticException { + RelNode newTopNode = rel; + if (LOG.isDebugEnabled()) { + LOG.debug("Original plan for PlanModifier\n " + RelOptUtil.toString(newTopNode)); + } + + if (!(newTopNode instanceof Project) && !(newTopNode instanceof Sort)) { + newTopNode = introduceDerivedTable(newTopNode); + if (LOG.isDebugEnabled()) { + LOG.debug("Plan after top-level introduceDerivedTable\n " + + RelOptUtil.toString(newTopNode)); + } + } + + Pair topSelparentPair = HiveCalciteUtil.getTopLevelSelect(newTopNode); + fixTopOBSchema(newTopNode, topSelparentPair, resultSchema, false); + if (LOG.isDebugEnabled()) { + LOG.debug("Plan after fixTopOBSchema\n " + RelOptUtil.toString(newTopNode)); + } + + return newTopNode; + } + + private static String getTblAlias(RelNode rel) { + + if (null == rel) { + return null; + } + if (rel instanceof HiveTableScan) { + return ((HiveTableScan)rel).getTableAlias(); + } + if (rel instanceof Project) { + return null; + } + if (rel.getInputs().size() == 1) { + return getTblAlias(rel.getInput(0)); + } + return null; + } + + private static void convertOpTree(RelNode rel, RelNode parent) { + + if (rel instanceof HepRelVertex) { + throw new RuntimeException("Found HepRelVertex"); + } else if (rel instanceof Join) { + if (!validJoinParent(rel, parent)) { + introduceDerivedTable(rel, parent); + } + String leftChild = getTblAlias(((Join)rel).getLeft()); + if (null != leftChild && leftChild.equalsIgnoreCase(getTblAlias(((Join)rel).getRight()))) { + // introduce derived table above one child, if this is self-join + // since user provided aliases are lost at this point. 
+ introduceDerivedTable(((Join)rel).getLeft(), rel); + } + } else if (rel instanceof MultiJoin) { + throw new RuntimeException("Found MultiJoin"); + } else if (rel instanceof RelSubset) { + throw new RuntimeException("Found RelSubset"); + } else if (rel instanceof SetOp) { + // TODO: Handle more than 2 inputs for setop + if (!validSetopParent(rel, parent)) + introduceDerivedTable(rel, parent); + + SetOp setop = (SetOp) rel; + for (RelNode inputRel : setop.getInputs()) { + if (!validSetopChild(inputRel)) { + introduceDerivedTable(inputRel, setop); + } + } + } else if (rel instanceof SingleRel) { + if (rel instanceof Filter) { + if (!validFilterParent(rel, parent)) { + introduceDerivedTable(rel, parent); + } + } else if (rel instanceof HiveSort) { + if (!validSortParent(rel, parent)) { + introduceDerivedTable(rel, parent); + } + if (!validSortChild((HiveSort) rel)) { + introduceDerivedTable(((HiveSort) rel).getInput(), rel); + } + } else if (rel instanceof HiveAggregate) { + RelNode newParent = parent; + if (!validGBParent(rel, parent)) { + newParent = introduceDerivedTable(rel, parent); + } + // check if groupby is empty and there is no other cols in aggr + // this should only happen when newParent is constant. 
+ if (isEmptyGrpAggr(rel)) { + replaceEmptyGroupAggr(rel, newParent); + } + } + } + + List childNodes = rel.getInputs(); + if (childNodes != null) { + for (RelNode r : childNodes) { + convertOpTree(r, rel); + } + } + } + + private static void fixTopOBSchema(final RelNode rootRel, + Pair topSelparentPair, List resultSchema, + boolean replaceProject) throws CalciteSemanticException { + if (!(topSelparentPair.getKey() instanceof Sort) + || !HiveCalciteUtil.orderRelNode(topSelparentPair.getKey())) { + return; + } + HiveSort obRel = (HiveSort) topSelparentPair.getKey(); + Project obChild = (Project) topSelparentPair.getValue(); + if (obChild.getRowType().getFieldCount() <= resultSchema.size()) { + return; + } + + RelDataType rt = obChild.getRowType(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + Set collationInputRefs = new HashSet( + RelCollationImpl.ordinals(obRel.getCollation())); + ImmutableMap.Builder inputRefToCallMapBldr = ImmutableMap.builder(); + for (int i = resultSchema.size(); i < rt.getFieldCount(); i++) { + if (collationInputRefs.contains(i)) { + RexNode obyExpr = obChild.getChildExps().get(i); + if (obyExpr instanceof RexCall) { + int a = -1; + List operands = new ArrayList<>(); + for (int k = 0; k< ((RexCall) obyExpr).operands.size(); k++) { + RexNode rn = ((RexCall) obyExpr).operands.get(k); + for (int j = 0; j < resultSchema.size(); j++) { + if( obChild.getChildExps().get(j).toString().equals(rn.toString())) { + a = j; + break; + } + } if (a != -1) { + operands.add(new RexInputRef(a, rn.getType())); + } else { + operands.add(rn); + } + a = -1; + } + obyExpr = obChild.getCluster().getRexBuilder().makeCall(((RexCall)obyExpr).getOperator(), operands); + } + inputRefToCallMapBldr.put(i, obyExpr); + } + } + ImmutableMap inputRefToCallMap = inputRefToCallMapBldr.build(); + + if ((obChild.getRowType().getFieldCount() - inputRefToCallMap.size()) != resultSchema.size()) { + LOG.error(generateInvalidSchemaMessage(obChild, resultSchema, 
inputRefToCallMap.size())); + throw new CalciteSemanticException("Result Schema didn't match Optimized Op Tree Schema"); + } + + if (replaceProject) { + // This removes order-by only expressions from the projections. + HiveProject replacementProjectRel = HiveProject.create(obChild.getInput(), obChild + .getChildExps().subList(0, resultSchema.size()), obChild.getRowType().getFieldNames() + .subList(0, resultSchema.size())); + obRel.replaceInput(0, replacementProjectRel); + } + obRel.setInputRefToCallMap(inputRefToCallMap); + } + + private static String generateInvalidSchemaMessage(Project topLevelProj, + List resultSchema, int fieldsForOB) { + String errorDesc = "Result Schema didn't match Calcite Optimized Op Tree; schema: "; + for (FieldSchema fs : resultSchema) { + errorDesc += "[" + fs.getName() + ":" + fs.getType() + "], "; + } + errorDesc += " projection fields: "; + for (RexNode exp : topLevelProj.getChildExps()) { + errorDesc += "[" + exp.toString() + ":" + exp.getType() + "], "; + } + if (fieldsForOB != 0) { + errorDesc += fieldsForOB + " fields removed due to ORDER BY, "; // ends with ", " so the trim below does not cut into "ORDER BY" + } + return errorDesc.substring(0, errorDesc.length() - 2); // drop the trailing ", " separator + } + + private static RelNode renameTopLevelSelectInResultSchema(final RelNode rootRel, + Pair topSelparentPair, List resultSchema) + throws CalciteSemanticException { + RelNode parentOforiginalProjRel = topSelparentPair.getKey(); + HiveProject originalProjRel = (HiveProject) topSelparentPair.getValue(); + + // Assumption: top portion of tree could only be + // (limit)?(OB)?(Project).... + List rootChildExps = originalProjRel.getChildExps(); + LOG.debug("Is resultSchema null? " + (resultSchema == null)); // parenthesized: '+' binds tighter than '==' + LOG.debug("Is rootChildExps null? " + (rootChildExps == null)); // parenthesized: '+' binds tighter than '==' + if (resultSchema.size() != rootChildExps.size()) { + // Safeguard against potential issues in CBO RowResolver construction. Disable CBO for now.
+ LOG.error(generateInvalidSchemaMessage(originalProjRel, resultSchema, 0)); + throw new CalciteSemanticException("Result Schema didn't match Optimized Op Tree Schema"); + } + + List newSelAliases = new ArrayList(); + String colAlias; + for (int i = 0; i < rootChildExps.size(); i++) { + colAlias = resultSchema.get(i).getName(); + if (colAlias.startsWith("_")) { + colAlias = colAlias.substring(1); + } + newSelAliases.add(colAlias); + } + + HiveProject replacementProjectRel = HiveProject.create(originalProjRel.getInput(), + originalProjRel.getChildExps(), newSelAliases); + + if (rootRel == originalProjRel) { + return replacementProjectRel; + } else { + parentOforiginalProjRel.replaceInput(0, replacementProjectRel); + return rootRel; + } + } + + private static RelNode introduceDerivedTable(final RelNode rel) { + List projectList = HiveCalciteUtil.getProjsFromBelowAsInputRef(rel); + + HiveProject select = HiveProject.create(rel.getCluster(), rel, projectList, + rel.getRowType(), rel.getCollationList()); + + return select; + } + + private static RelNode introduceDerivedTable(final RelNode rel, RelNode parent) { + int i = 0; + int pos = -1; + List childList = parent.getInputs(); + + for (RelNode child : childList) { + if (child == rel) { + pos = i; + break; + } + i++; + } + + if (pos == -1) { + throw new RuntimeException("Couldn't find child node in parent's inputs"); + } + + RelNode select = introduceDerivedTable(rel); + + parent.replaceInput(pos, select); + + return select; + } + + private static boolean validJoinParent(RelNode joinNode, RelNode parent) { + boolean validParent = true; + + if (parent instanceof Join) { + if (((Join) parent).getRight() == joinNode) { + validParent = false; + } + } else if (parent instanceof SetOp) { + validParent = false; + } + + return validParent; + } + + private static boolean validFilterParent(RelNode filterNode, RelNode parent) { + boolean validParent = true; + + // TODO: Verify GB having is not a separate filter (if so we
shouldn't + // introduce derived table) + if (parent instanceof Filter || parent instanceof Join + || parent instanceof SetOp) { + validParent = false; + } + + return validParent; + } + + private static boolean validGBParent(RelNode gbNode, RelNode parent) { + boolean validParent = true; + + // TOODO: Verify GB having is not a seperate filter (if so we shouldn't + // introduce derived table) + if (parent instanceof Join || parent instanceof SetOp + || parent instanceof Aggregate + || (parent instanceof Filter && ((Aggregate) gbNode).getGroupSet().isEmpty())) { + validParent = false; + } + + return validParent; + } + + private static boolean validSortParent(RelNode sortNode, RelNode parent) { + boolean validParent = true; + + if (parent != null && !(parent instanceof Project) + && !((parent instanceof Sort) || HiveCalciteUtil.orderRelNode(parent))) + validParent = false; + + return validParent; + } + + private static boolean validSortChild(HiveSort sortNode) { + boolean validChild = true; + RelNode child = sortNode.getInput(); + + if (!(HiveCalciteUtil.limitRelNode(sortNode) && HiveCalciteUtil.orderRelNode(child)) + && !(child instanceof Project)) { + validChild = false; + } + + return validChild; + } + + private static boolean validSetopParent(RelNode setop, RelNode parent) { + boolean validChild = true; + + if (parent != null && !(parent instanceof Project)) { + validChild = false; + } + + return validChild; + } + + private static boolean validSetopChild(RelNode setopChild) { + boolean validChild = true; + + if (!(setopChild instanceof Project)) { + validChild = false; + } + + return validChild; + } + + private static boolean isEmptyGrpAggr(RelNode gbNode) { + // Verify if both groupset and aggrfunction are empty) + Aggregate aggrnode = (Aggregate) gbNode; + if (aggrnode.getGroupSet().isEmpty() && aggrnode.getAggCallList().isEmpty()) { + return true; + } + return false; + } + + private static void replaceEmptyGroupAggr(final RelNode rel, RelNode parent) { + // If 
this function is called, the parent should only include constant + List exps = parent.getChildExps(); + for (RexNode rexNode : exps) { + if (!rexNode.accept(new HiveCalciteUtil.ConstantFinder())) { + throw new RuntimeException("We expect " + parent.toString() + + " to contain only constants. However, " + rexNode.toString() + " is " + + rexNode.getKind()); + } + } + HiveAggregate oldAggRel = (HiveAggregate) rel; + RelDataTypeFactory typeFactory = oldAggRel.getCluster().getTypeFactory(); + RelDataType longType = TypeConverter.convert(TypeInfoFactory.longTypeInfo, typeFactory); + RelDataType intType = TypeConverter.convert(TypeInfoFactory.intTypeInfo, typeFactory); + // Create the dummy aggregation. + SqlAggFunction countFn = SqlFunctionConverter.getCalciteAggFn("count", + ImmutableList.of(intType), longType); + // TODO: Using 0 might be wrong; might need to walk down to find the + // proper index of a dummy. + List argList = ImmutableList.of(0); + AggregateCall dummyCall = new AggregateCall(countFn, false, argList, longType, null); + Aggregate newAggRel = oldAggRel.copy(oldAggRel.getTraitSet(), oldAggRel.getInput(), + oldAggRel.indicator, oldAggRel.getGroupSet(), oldAggRel.getGroupSets(), + ImmutableList.of(dummyCall)); + RelNode select = introduceDerivedTable(newAggRel); + parent.replaceInput(0, select); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 6a15bf6..0e4015c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -147,6 +147,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinTypeCheckCtx; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.PlanModifierForConversion; 
import org.apache.hadoop.hive.ql.optimizer.calcite.translator.RexNodeConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter; @@ -629,7 +630,8 @@ Operator getOptimizedHiveOPDag() throws SemanticException { throw new AssertionError("rethrowCalciteException didn't throw for " + e.getMessage()); } - RelNode modifiedOptimizedOptiqPlan = introduceProjectIfNeeded(optimizedOptiqPlan); + RelNode modifiedOptimizedOptiqPlan = + PlanModifierForConversion.convertOpTreeForReturnPath(optimizedOptiqPlan, topLevelFieldSchema); LOG.debug("Translating the following plan:\n" + RelOptUtil.toString(modifiedOptimizedOptiqPlan)); Operator hiveRoot = new HiveOpConverter(this, conf, unparseTranslator, topOps, @@ -639,30 +641,6 @@ Operator getOptimizedHiveOPDag() throws SemanticException { return genFileSinkPlan(getQB().getParseInfo().getClauseNames().iterator().next(), getQB(), hiveRoot); } - private RelNode introduceProjectIfNeeded(RelNode optimizedOptiqPlan) - throws CalciteSemanticException { - RelNode parent = null; - RelNode input = optimizedOptiqPlan; - RelNode newRoot = optimizedOptiqPlan; - - while (!(input instanceof Project) && (input instanceof Sort)) { - parent = input; - input = input.getInput(0); - } - - if (!(input instanceof Project)) { - HiveProject hpRel = HiveProject.create(input, - HiveCalciteUtil.getProjsFromBelowAsInputRef(input), input.getRowType().getFieldNames()); - if (input == optimizedOptiqPlan) { - newRoot = hpRel; - } else { - parent.replaceInput(0, hpRel); - } - } - - return newRoot; - } - /*** * Unwraps Calcite Invocation exceptions coming meta data provider chain and * obtains the real cause. @@ -2762,7 +2740,7 @@ private RelNode genLogicalPlan(QB qb, boolean outerMostQB) throws SemanticExcept // 3. The top level OB will not introduce constraining select due to Hive // limitation(#2) stated above. The RR for OB will not include VC. 
Thus // Result Schema will not include exprs used by top OB. During AST Conv, - // in the PlanModifierForASTConv we would modify the top level OB to + // in the PlanModifierForConversion we would modify the top level OB to // migrate exprs from input sel to SortRel (Note that Calcite doesn't // support this; but since we are done with Calcite at this point its OK). if (topConstrainingProjArgsRel != null) {