diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index d15520c..1f8d3c8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; -import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelDistribution.Type; @@ -38,6 +37,7 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Pair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -365,6 +365,7 @@ OpAttr visit(HiveSort sortRel) throws SemanticException { Operator inputOp = inputOpAf.inputs.get(0); Operator resultOp = inputOpAf.inputs.get(0); + // 1. If we need to sort tuples based on the value of some // of their columns if (sortRel.getCollation() != RelCollations.EMPTY) { @@ -377,30 +378,51 @@ OpAttr visit(HiveSort sortRel) throws SemanticException { // 1.a. Extract order for each column from collation // Generate sortCols and order + ImmutableBitSet.Builder sortColsPosBuilder = new ImmutableBitSet.Builder(); + ImmutableBitSet.Builder sortOutputColsPosBuilder = new ImmutableBitSet.Builder(); + Map obRefToCallMap = sortRel.getInputRefToCallMap(); List sortCols = new ArrayList(); StringBuilder order = new StringBuilder(); - for (RelCollation collation : sortRel.getCollationList()) { - for (RelFieldCollation sortInfo : collation.getFieldCollations()) { - int sortColumnPos = sortInfo.getFieldIndex(); - ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature() - .get(sortColumnPos)); - ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(), - columnInfo.getInternalName(), columnInfo.getTabAlias(), columnInfo.getIsVirtualCol()); - sortCols.add(sortColumn); - if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) { - order.append("-"); - } else { - order.append("+"); + for (RelFieldCollation sortInfo : sortRel.getCollation().getFieldCollations()) { + int sortColumnPos = sortInfo.getFieldIndex(); + ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature() + .get(sortColumnPos)); + ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(), + columnInfo.getInternalName(), columnInfo.getTabAlias(), columnInfo.getIsVirtualCol()); + sortCols.add(sortColumn); + if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) { + order.append("-"); + } else { + order.append("+"); + } + + if (obRefToCallMap != null) { + RexNode obExpr = obRefToCallMap.get(sortColumnPos); + sortColsPosBuilder.set(sortColumnPos); + if (obExpr == null) { + sortOutputColsPosBuilder.set(sortColumnPos); } } } // Use only 1 reducer for order by int numReducers = 1; + + // We keep the columns only the columns that are part of the final output + List keepColumns = new ArrayList(); + final ImmutableBitSet sortColsPos = sortColsPosBuilder.build(); + final ImmutableBitSet sortOutputColsPos = sortOutputColsPosBuilder.build(); + final ArrayList inputSchema = inputOp.getSchema().getSignature(); + for (int pos=0; pos(), - order.toString(), numReducers, Operation.NOT_ACID, strictMode); + order.toString(), numReducers, Operation.NOT_ACID, strictMode, keepColumns); } // 2. If we need to generate limit @@ -584,18 +606,27 @@ private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticExce } private static SelectOperator genReduceSinkAndBacktrackSelect(Operator input, + ExprNodeDesc[] keys, int tag, ArrayList partitionCols, String order, + int numReducers, Operation acidOperation, boolean strictMode) throws SemanticException { + return genReduceSinkAndBacktrackSelect(input, keys, tag, partitionCols, order, + numReducers, acidOperation, strictMode, input.getSchema().getColumnNames()); + } + + private static SelectOperator genReduceSinkAndBacktrackSelect(Operator input, ExprNodeDesc[] keys, int tag, ArrayList partitionCols, String order, - int numReducers, Operation acidOperation, boolean strictMode) throws SemanticException { + int numReducers, Operation acidOperation, boolean strictMode, + List keepColNames) throws SemanticException { // 1. Generate RS operator ReduceSinkOperator rsOp = genReduceSink(input, keys, tag, partitionCols, order, numReducers, acidOperation, strictMode); // 2. Generate backtrack Select operator - Map descriptors = buildBacktrackFromReduceSink(rsOp, - input); + Map descriptors = buildBacktrackFromReduceSink(keepColNames, + rsOp.getConf().getOutputKeyColumnNames(), rsOp.getConf().getOutputValueColumnNames(), + rsOp.getValueIndex(), input); SelectDesc selectDesc = new SelectDesc(new ArrayList(descriptors.values()), new ArrayList(descriptors.keySet())); - ArrayList cinfoLst = createColInfos(input); + ArrayList cinfoLst = createColInfosSubset(input, keepColNames); SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(selectDesc, new RowSchema(cinfoLst), rsOp); selectOp.setColumnExprMap(descriptors); @@ -763,7 +794,7 @@ private static JoinOperator genJoin(HiveJoin hiveJoin, JoinPredicateInfo joinPre posToAliasMap.put(pos, new HashSet(inputRS.getSchema().getTableNames())); - Map descriptors = buildBacktrackFromReduceSink(outputPos, + Map descriptors = buildBacktrackFromReduceSinkForJoin(outputPos, outputColumnNames, keyColNames, valColNames, index, parent); List parentColumns = parent.getSchema().getSignature(); @@ -827,14 +858,7 @@ private static JoinType extractJoinType(HiveJoin join) { return resultJoinType; } - private static Map buildBacktrackFromReduceSink(ReduceSinkOperator rsOp, - Operator inputOp) { - return buildBacktrackFromReduceSink(0, inputOp.getSchema().getColumnNames(), rsOp.getConf() - .getOutputKeyColumnNames(), rsOp.getConf().getOutputValueColumnNames(), - rsOp.getValueIndex(), inputOp); - } - - private static Map buildBacktrackFromReduceSink(int initialPos, + private static Map buildBacktrackFromReduceSinkForJoin(int initialPos, List outputColumnNames, List keyColNames, List valueColNames, int[] index, Operator inputOp) { Map columnDescriptors = new LinkedHashMap(); @@ -853,6 +877,29 @@ private static JoinType extractJoinType(HiveJoin join) { return columnDescriptors; } + private static Map buildBacktrackFromReduceSink(List keepColNames, + List keyColNames, List valueColNames, int[] index, Operator inputOp) { + Map columnDescriptors = new LinkedHashMap(); + int pos = 0; + for (int i = 0; i < index.length; i++) { + ColumnInfo info = inputOp.getSchema().getSignature().get(i); + if (pos < keepColNames.size() && + info.getInternalName().equals(keepColNames.get(pos))) { + String field; + if (index[i] >= 0) { + field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]); + } else { + field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1); + } + ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), field, info.getTabAlias(), + info.getIsVirtualCol()); + columnDescriptors.put(keepColNames.get(pos), desc); + pos++; + } + } + return columnDescriptors; + } + private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, String tabAlias) { return rn.accept(new ExprNodeConverter(tabAlias, inputRel.getRowType(), false, inputRel.getCluster().getTypeFactory())); @@ -866,6 +913,20 @@ private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, Stri return cInfoLst; } + private static ArrayList createColInfosSubset(Operator input, + List keepColNames) { + ArrayList cInfoLst = new ArrayList(); + int pos = 0; + for (ColumnInfo ci : input.getSchema().getSignature()) { + if (pos < keepColNames.size() && + ci.getInternalName().equals(keepColNames.get(pos))) { + cInfoLst.add(new ColumnInfo(ci)); + pos++; + } + } + return cInfoLst; + } + private static Pair, Map> createColInfos( List calciteExprs, List hiveExprs, List projNames, OpAttr inpOpAf) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java index cba37bc..d7d8e75 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java @@ -18,14 +18,11 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.translator; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.hep.HepRelVertex; import org.apache.calcite.plan.volcano.RelSubset; -import org.apache.calcite.rel.RelCollationImpl; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; import org.apache.calcite.rel.core.Aggregate; @@ -38,19 +35,14 @@ import org.apache.calcite.rel.rules.MultiJoin; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlKind; import org.apache.calcite.util.Pair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort; @@ -58,11 +50,12 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; public class PlanModifierForASTConv { + private static final Log LOG = LogFactory.getLog(PlanModifierForASTConv.class); + public static RelNode convertOpTree(RelNode rel, List resultSchema) throws CalciteSemanticException { RelNode newTopNode = rel; @@ -71,7 +64,7 @@ public static RelNode convertOpTree(RelNode rel, List resultSchema) } if (!(newTopNode instanceof Project) && !(newTopNode instanceof Sort)) { - newTopNode = introduceDerivedTable(newTopNode); + newTopNode = PlanModifierUtil.introduceDerivedTable(newTopNode); if (LOG.isDebugEnabled()) { LOG.debug("Plan after top-level introduceDerivedTable\n " + RelOptUtil.toString(newTopNode)); @@ -84,7 +77,7 @@ public static RelNode convertOpTree(RelNode rel, List resultSchema) } Pair topSelparentPair = HiveCalciteUtil.getTopLevelSelect(newTopNode); - fixTopOBSchema(newTopNode, topSelparentPair, resultSchema); + PlanModifierUtil.fixTopOBSchema(newTopNode, topSelparentPair, resultSchema, true); if (LOG.isDebugEnabled()) { LOG.debug("Plan after fixTopOBSchema\n " + RelOptUtil.toString(newTopNode)); } @@ -176,79 +169,6 @@ private static void convertOpTree(RelNode rel, RelNode parent) { } } - private static void fixTopOBSchema(final RelNode rootRel, - Pair topSelparentPair, List resultSchema) - throws CalciteSemanticException { - if (!(topSelparentPair.getKey() instanceof Sort) - || !HiveCalciteUtil.orderRelNode(topSelparentPair.getKey())) { - return; - } - HiveSort obRel = (HiveSort) topSelparentPair.getKey(); - Project obChild = (Project) topSelparentPair.getValue(); - if (obChild.getRowType().getFieldCount() <= resultSchema.size()) { - return; - } - - RelDataType rt = obChild.getRowType(); - @SuppressWarnings({ "unchecked", "rawtypes" }) - Set collationInputRefs = new HashSet( - RelCollationImpl.ordinals(obRel.getCollation())); - ImmutableMap.Builder inputRefToCallMapBldr = ImmutableMap.builder(); - for (int i = resultSchema.size(); i < rt.getFieldCount(); i++) { - if (collationInputRefs.contains(i)) { - RexNode obyExpr = obChild.getChildExps().get(i); - if (obyExpr instanceof RexCall) { - int a = -1; - List operands = new ArrayList<>(); - for (int k = 0; k< ((RexCall) obyExpr).operands.size(); k++) { - RexNode rn = ((RexCall) obyExpr).operands.get(k); - for (int j = 0; j < resultSchema.size(); j++) { - if( obChild.getChildExps().get(j).toString().equals(rn.toString())) { - a = j; - break; - } - } if (a != -1) { - operands.add(new RexInputRef(a, rn.getType())); - } else { - operands.add(rn); - } - a = -1; - } - obyExpr = obChild.getCluster().getRexBuilder().makeCall(((RexCall)obyExpr).getOperator(), operands); - } - inputRefToCallMapBldr.put(i, obyExpr); - } - } - ImmutableMap inputRefToCallMap = inputRefToCallMapBldr.build(); - - if ((obChild.getRowType().getFieldCount() - inputRefToCallMap.size()) != resultSchema.size()) { - LOG.error(generateInvalidSchemaMessage(obChild, resultSchema, inputRefToCallMap.size())); - throw new CalciteSemanticException("Result Schema didn't match Optimized Op Tree Schema"); - } - // This removes order-by only expressions from the projections. - HiveProject replacementProjectRel = HiveProject.create(obChild.getInput(), obChild - .getChildExps().subList(0, resultSchema.size()), obChild.getRowType().getFieldNames() - .subList(0, resultSchema.size())); - obRel.replaceInput(0, replacementProjectRel); - obRel.setInputRefToCallMap(inputRefToCallMap); - } - - private static String generateInvalidSchemaMessage(Project topLevelProj, - List resultSchema, int fieldsForOB) { - String errorDesc = "Result Schema didn't match Calcite Optimized Op Tree; schema: "; - for (FieldSchema fs : resultSchema) { - errorDesc += "[" + fs.getName() + ":" + fs.getType() + "], "; - } - errorDesc += " projection fields: "; - for (RexNode exp : topLevelProj.getChildExps()) { - errorDesc += "[" + exp.toString() + ":" + exp.getType() + "], "; - } - if (fieldsForOB != 0) { - errorDesc += fieldsForOB + " fields removed due to ORDER BY "; - } - return errorDesc.substring(0, errorDesc.length() - 2); - } - private static RelNode renameTopLevelSelectInResultSchema(final RelNode rootRel, Pair topSelparentPair, List resultSchema) throws CalciteSemanticException { @@ -260,7 +180,7 @@ private static RelNode renameTopLevelSelectInResultSchema(final RelNode rootRel, List rootChildExps = originalProjRel.getChildExps(); if (resultSchema.size() != rootChildExps.size()) { // Safeguard against potential issues in CBO RowResolver construction. Disable CBO for now. - LOG.error(generateInvalidSchemaMessage(originalProjRel, resultSchema, 0)); + LOG.error(PlanModifierUtil.generateInvalidSchemaMessage(originalProjRel, resultSchema, 0)); throw new CalciteSemanticException("Result Schema didn't match Optimized Op Tree Schema"); } @@ -285,15 +205,6 @@ private static RelNode renameTopLevelSelectInResultSchema(final RelNode rootRel, } } - private static RelNode introduceDerivedTable(final RelNode rel) { - List projectList = HiveCalciteUtil.getProjsFromBelowAsInputRef(rel); - - HiveProject select = HiveProject.create(rel.getCluster(), rel, projectList, - rel.getRowType(), rel.getCollationList()); - - return select; - } - private static RelNode introduceDerivedTable(final RelNode rel, RelNode parent) { int i = 0; int pos = -1; @@ -311,7 +222,7 @@ private static RelNode introduceDerivedTable(final RelNode rel, RelNode parent) throw new RuntimeException("Couldn't find child node in parent's inputs"); } - RelNode select = introduceDerivedTable(rel); + RelNode select = PlanModifierUtil.introduceDerivedTable(rel); parent.replaceInput(pos, select); @@ -434,7 +345,7 @@ private static void replaceEmptyGroupAggr(final RelNode rel, RelNode parent) { Aggregate newAggRel = oldAggRel.copy(oldAggRel.getTraitSet(), oldAggRel.getInput(), oldAggRel.indicator, oldAggRel.getGroupSet(), oldAggRel.getGroupSets(), ImmutableList.of(dummyCall)); - RelNode select = introduceDerivedTable(newAggRel); + RelNode select = PlanModifierUtil.introduceDerivedTable(newAggRel); parent.replaceInput(0, select); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForReturnPath.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForReturnPath.java new file mode 100644 index 0000000..9c24bad --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForReturnPath.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.translator; + +import java.util.List; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.util.Pair; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; + +public class PlanModifierForReturnPath { + + private static final Log LOG = LogFactory.getLog(PlanModifierForReturnPath.class); + + + public static RelNode convertOpTree(RelNode rel, List resultSchema) + throws CalciteSemanticException { + RelNode newTopNode = rel; + + if (!(newTopNode instanceof Project) && !(newTopNode instanceof Sort)) { + newTopNode = PlanModifierUtil.introduceDerivedTable(newTopNode); + } + + Pair topSelparentPair = HiveCalciteUtil.getTopLevelSelect(newTopNode); + PlanModifierUtil.fixTopOBSchema(newTopNode, topSelparentPair, resultSchema, false); + + return newTopNode; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierUtil.java new file mode 100644 index 0000000..c16f142 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierUtil.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.translator; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Pair; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort; + +import com.google.common.collect.ImmutableMap; + +public class PlanModifierUtil { + + private static final Log LOG = LogFactory.getLog(PlanModifierUtil.class); + + + protected static RelNode introduceDerivedTable(final RelNode rel) { + List projectList = HiveCalciteUtil.getProjsFromBelowAsInputRef(rel); + + HiveProject select = HiveProject.create(rel.getCluster(), rel, projectList, + rel.getRowType(), rel.getCollationList()); + + return select; + } + + protected static void fixTopOBSchema(final RelNode rootRel, + Pair topSelparentPair, List resultSchema, + boolean replaceProject) throws CalciteSemanticException { + if (!(topSelparentPair.getKey() instanceof Sort) + || !HiveCalciteUtil.orderRelNode(topSelparentPair.getKey())) { + return; + } + HiveSort obRel = (HiveSort) topSelparentPair.getKey(); + Project obChild = (Project) topSelparentPair.getValue(); + if (obChild.getRowType().getFieldCount() <= resultSchema.size()) { + return; + } + + RelDataType rt = obChild.getRowType(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + Set collationInputRefs = new HashSet( + RelCollations.ordinals(obRel.getCollation())); + ImmutableMap.Builder inputRefToCallMapBldr = ImmutableMap.builder(); + for (int i = resultSchema.size(); i < rt.getFieldCount(); i++) { + if (collationInputRefs.contains(i)) { + RexNode obyExpr = obChild.getChildExps().get(i); + if (obyExpr instanceof RexCall) { + int a = -1; + List operands = new ArrayList<>(); + for (int k = 0; k< ((RexCall) obyExpr).operands.size(); k++) { + RexNode rn = ((RexCall) obyExpr).operands.get(k); + for (int j = 0; j < resultSchema.size(); j++) { + if( obChild.getChildExps().get(j).toString().equals(rn.toString())) { + a = j; + break; + } + } if (a != -1) { + operands.add(new RexInputRef(a, rn.getType())); + } else { + operands.add(rn); + } + a = -1; + } + obyExpr = obChild.getCluster().getRexBuilder().makeCall(((RexCall)obyExpr).getOperator(), operands); + } + inputRefToCallMapBldr.put(i, obyExpr); + } + } + ImmutableMap inputRefToCallMap = inputRefToCallMapBldr.build(); + + if ((obChild.getRowType().getFieldCount() - inputRefToCallMap.size()) != resultSchema.size()) { + LOG.error(generateInvalidSchemaMessage(obChild, resultSchema, inputRefToCallMap.size())); + throw new CalciteSemanticException("Result Schema didn't match Optimized Op Tree Schema"); + } + + if (replaceProject) { + // This removes order-by only expressions from the projections. + HiveProject replacementProjectRel = HiveProject.create(obChild.getInput(), obChild + .getChildExps().subList(0, resultSchema.size()), obChild.getRowType().getFieldNames() + .subList(0, resultSchema.size())); + obRel.replaceInput(0, replacementProjectRel); + } + obRel.setInputRefToCallMap(inputRefToCallMap); + } + + protected static String generateInvalidSchemaMessage(Project topLevelProj, + List resultSchema, int fieldsForOB) { + String errorDesc = "Result Schema didn't match Calcite Optimized Op Tree; schema: "; + for (FieldSchema fs : resultSchema) { + errorDesc += "[" + fs.getName() + ":" + fs.getType() + "], "; + } + errorDesc += " projection fields: "; + for (RexNode exp : topLevelProj.getChildExps()) { + errorDesc += "[" + exp.toString() + ":" + exp.getType() + "], "; + } + if (fieldsForOB != 0) { + errorDesc += fieldsForOB + " fields removed due to ORDER BY "; + } + return errorDesc.substring(0, errorDesc.length() - 2); + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 6a15bf6..7fa8a77 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -58,10 +58,8 @@ import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; -import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.core.SemiJoin; -import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.metadata.CachingRelMetadataProvider; import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataProvider; @@ -147,6 +145,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinTypeCheckCtx; +import org.apache.hadoop.hive.ql.optimizer.calcite.translator.PlanModifierForReturnPath; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.RexNodeConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter; @@ -629,7 +628,8 @@ Operator getOptimizedHiveOPDag() throws SemanticException { throw new AssertionError("rethrowCalciteException didn't throw for " + e.getMessage()); } - RelNode modifiedOptimizedOptiqPlan = introduceProjectIfNeeded(optimizedOptiqPlan); + RelNode modifiedOptimizedOptiqPlan = + PlanModifierForReturnPath.convertOpTree(optimizedOptiqPlan, topLevelFieldSchema); LOG.debug("Translating the following plan:\n" + RelOptUtil.toString(modifiedOptimizedOptiqPlan)); Operator hiveRoot = new HiveOpConverter(this, conf, unparseTranslator, topOps, @@ -639,30 +639,6 @@ Operator getOptimizedHiveOPDag() throws SemanticException { return genFileSinkPlan(getQB().getParseInfo().getClauseNames().iterator().next(), getQB(), hiveRoot); } - private RelNode introduceProjectIfNeeded(RelNode optimizedOptiqPlan) - throws CalciteSemanticException { - RelNode parent = null; - RelNode input = optimizedOptiqPlan; - RelNode newRoot = optimizedOptiqPlan; - - while (!(input instanceof Project) && (input instanceof Sort)) { - parent = input; - input = input.getInput(0); - } - - if (!(input instanceof Project)) { - HiveProject hpRel = HiveProject.create(input, - HiveCalciteUtil.getProjsFromBelowAsInputRef(input), input.getRowType().getFieldNames()); - if (input == optimizedOptiqPlan) { - newRoot = hpRel; - } else { - parent.replaceInput(0, hpRel); - } - } - - return newRoot; - } - /*** * Unwraps Calcite Invocation exceptions coming meta data provider chain and * obtains the real cause.