diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index 6aa584b..d2e3812 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -27,9 +27,13 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Pair;
 import org.apache.commons.logging.Log;
@@ -38,6 +42,7 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -49,6 +54,7 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.parse.JoinCond;
 import org.apache.hadoop.hive.ql.parse.JoinType;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -57,9 +63,11 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -133,6 +141,8 @@ OpAttr dispatch(RelNode rn) throws SemanticException {
       HiveJoin hj = HiveJoin.getJoin(sj.getCluster(), sj.getLeft(), sj.getRight(),
           sj.getCondition(), sj.getJoinType(), true);
       return visit(hj);
+    } else if (rn instanceof HiveSort) {
+      return visit((HiveSort) rn);
     }
     LOG.error(rn.getClass().getCanonicalName() + "operator translation not supported"
         + " yet in return path.");
@@ -185,6 +195,99 @@ OpAttr visit(HiveJoin joinRel) throws SemanticException {
     // 8. Return result
     return new OpAttr(null, vcolMap, joinOp);
   }
+
+  OpAttr visit(HiveSort sortRel) throws SemanticException {
+    OpAttr inputOpAf = dispatch(sortRel.getInput());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translating operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+          + " with row type: [" + sortRel.getRowType() + "]");
+      if (sortRel.getCollation() == RelCollations.EMPTY) {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+            + " consists of limit");
+      }
+      else if (sortRel.fetch == null) {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+            + " consists of sort");
+      }
+      else {
+        LOG.debug("Operator rel#" + sortRel.getId() + ":" + sortRel.getRelTypeName()
+            + " consists of sort+limit");
+      }
+    }
+
+    Operator<?> inputOp = inputOpAf.inputs.get(0);
+    Operator<?> resultOp = inputOpAf.inputs.get(0);
+    // 1. If we need to sort tuples based on the value of some
+    //    of their columns
+    if (sortRel.getCollation() != RelCollations.EMPTY) {
+
+      // In strict mode, in the presence of order by, limit must be specified
+      if (strictMode && sortRel.fetch == null) {
+        throw new SemanticException(ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg());
+      }
+
+      // 1.a. Extract order for each column from collation
+      //      Generate sortCols and order
+      List<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
+      StringBuilder order = new StringBuilder();
+      for (RelCollation collation : sortRel.getCollationList()) {
+        for (RelFieldCollation sortInfo : collation.getFieldCollations()) {
+          int sortColumnPos = sortInfo.getFieldIndex();
+          ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature()
+              .get(sortColumnPos));
+          ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(),
+              columnInfo.getInternalName(), columnInfo.getTabAlias(),
+              columnInfo.getIsVirtualCol());
+          sortCols.add(sortColumn);
+          if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) {
+            order.append("-");
+          }
+          else {
+            order.append("+");
+          }
+        }
+      }
+      // Use only 1 reducer for order by
+      int numReducers = 1;
+
+      // 1.b. Generate reduce sink
+      resultOp = genReduceSink(resultOp, sortCols.toArray(new ExprNodeDesc[sortCols.size()]),
+          -1, new ArrayList<ExprNodeDesc>(), order.toString(), numReducers,
+          Operation.NOT_ACID, strictMode);
+
+      // 1.c. Generate project operator
+      Map<String, ExprNodeDesc> descriptors = buildBacktrackFromReduceSink(
+          (ReduceSinkOperator) resultOp, inputOp);
+      SelectDesc selectDesc = new SelectDesc(
+          new ArrayList<ExprNodeDesc>(descriptors.values()),
+          new ArrayList<String>(descriptors.keySet()));
+      ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOp);
+      resultOp = OperatorFactory.getAndMakeChild(selectDesc,
+          new RowSchema(cinfoLst), resultOp);
+      resultOp.setColumnExprMap(descriptors);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
+      }
+    }
+
+    // 2. If we need to generate limit
+    if (sortRel.fetch != null) {
+      int limit = RexLiteral.intValue(sortRel.fetch);
+      LimitDesc limitDesc = new LimitDesc(limit);
+      // TODO: Set 'last limit' global property
+      ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOp);
+      resultOp = (LimitOperator) OperatorFactory.getAndMakeChild(
+          limitDesc, new RowSchema(cinfoLst), resultOp);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
+      }
+    }
+
+    // 3. Return result
+    return inputOpAf.clone(resultOp);
+  }
 
   private static ExprNodeDesc[][] extractJoinKeys(JoinPredicateInfo joinPredInfo,
       List<RelNode> inputs) {
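
Note (not part of the patch): step 1.a above turns the Calcite collation into the one-character-per-key "order" string ("+" for ascending, "-" for descending) that Hive's ReduceSinkDesc expects. The standalone sketch below illustrates that mapping using only Calcite's public API; the OrderStringSketch class and its buildOrderString helper are hypothetical names introduced here, mirroring the loop in the patch.

import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;

public class OrderStringSketch {
  // Mirrors step 1.a of the patch: emit one character per sort key,
  // "+" for ascending, "-" for descending.
  static String buildOrderString(RelCollation collation) {
    StringBuilder order = new StringBuilder();
    for (RelFieldCollation fc : collation.getFieldCollations()) {
      order.append(fc.getDirection() == RelFieldCollation.Direction.DESCENDING ? "-" : "+");
    }
    return order.toString();
  }

  public static void main(String[] args) {
    // Collation equivalent to ORDER BY $0 ASC, $2 DESC
    RelCollation collation = RelCollations.of(
        new RelFieldCollation(0, RelFieldCollation.Direction.ASCENDING),
        new RelFieldCollation(2, RelFieldCollation.Direction.DESCENDING));
    System.out.println(buildOrderString(collation)); // prints "+-"
  }
}

For an ORDER BY ... LIMIT query, the translation above therefore emits ReduceSink (numReducers = 1, since a total order requires a single reducer) -> Select (columns backtracked through the reduce sink) -> Limit, while a bare LIMIT (empty collation) skips step 1 and produces only the LimitOperator.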