diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
index 0690fb7..884999a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
@@ -106,9 +107,57 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx,
           filterOpPrunedColListsOrderPreserved);
 
       pruneOperator(cppCtx, op, cppCtx.getPrunedColLists().get(op));
-
+      insertSelectOpIfNeeded(op, stack, ctx, nodeOutputs);
       return null;
     }
+
+    /*
+     * If the FilterOperator is on the reduce side of an OrderBy, or of a
+     * Limit followed by an OrderBy, we need to put a SelectOperator before
+     * it so that only the columns it expects are passed to it at runtime.
+     */
+    @SuppressWarnings("unchecked")
+    private void insertSelectOpIfNeeded(FilterOperator op,
+        Stack stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      Operator parentOp = op.getParentOperators().get(0);
+      if (parentOp instanceof LimitOperator || parentOp instanceof ExtractOperator) {
+        ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
+        OpParseContext pParseCtx = cppCtx.getOpToParseCtxMap().get(parentOp);
+
+        // Splice a pass-through select between the parent and the filter:
+        // parent -> select -> filter.
+        op.removeParent(parentOp);
+        SelectDesc selDesc = new SelectDesc(new ArrayList(), new ArrayList(), true);
+        Operator selectOp =
+            OperatorFactory.getAndMakeChild(
+                selDesc,
+                new RowSchema(pParseCtx.getRowResolver().getRowSchema()), parentOp);
+        OperatorFactory.makeChild(selectOp, op);
+
+        // Project exactly the columns the filter has been pruned down to.
+        List colNames = cppCtx.getPrunedColLists().get(op);
+        RowResolver parentRR = cppCtx.getOpToParseCtxMap().get(parentOp).getRowResolver();
+        RowResolver selectRR = new RowResolver();
+        ArrayList selectSignature = new ArrayList();
+        List selectExprList = new ArrayList();
+        List outputColumnNames = new ArrayList();
+
+        for (String col : colNames) {
+          String[] pCol = parentRR.reverseLookup(col);
+          ColumnInfo pColInfo = parentRR.get(pCol[0], pCol[1]);
+          ColumnInfo colInfo = new ColumnInfo(pColInfo);
+          selectExprList.add(new ExprNodeColumnDesc(colInfo.getType(),
+              colInfo.getInternalName(), colInfo.getTabAlias(),
+              colInfo.getIsVirtualCol()));
+          selectRR.put(pCol[0], pCol[1], colInfo);
+          selectSignature.add(colInfo);
+          outputColumnNames.add(col);
+        }
+
+        OpParseContext selectPCtx = new OpParseContext(selectRR);
+        cppCtx.getOpToParseCtxMap().put(selectOp, selectPCtx);
+        selectOp.setSchema(new RowSchema(selectSignature));
+        selDesc.setColList(selectExprList);
+        selDesc.setOutputColumnNames(outputColumnNames);
+      }
+    }
   }
 
 /**
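
For readers skimming the patch: the core of insertSelectOpIfNeeded is a graph splice. The FilterOperator is detached from its Limit/Extract parent (op.removeParent), a pass-through SelectOperator is created as the parent's new child (OperatorFactory.getAndMakeChild), and the filter is reattached under the select (OperatorFactory.makeChild). The standalone sketch below illustrates just that rewiring; the Op class and insertBetween method are hypothetical stand-ins for Hive's Operator machinery, not the Hive API.

import java.util.ArrayList;
import java.util.List;

class Op {
  final String name;
  final List<Op> parents = new ArrayList<>();
  final List<Op> children = new ArrayList<>();

  Op(String name) { this.name = name; }

  // Insert 'mid' between 'parent' and this node: parent -> mid -> this.
  // This mirrors removeParent + getAndMakeChild + makeChild in the patch.
  void insertBetween(Op parent, Op mid) {
    parent.children.remove(this);
    this.parents.remove(parent);
    parent.children.add(mid);
    mid.parents.add(parent);
    mid.children.add(this);
    this.parents.add(mid);
  }

  public static void main(String[] args) {
    Op limit = new Op("LIM");   // plays the Limit/Extract parent
    Op filter = new Op("FIL");  // plays the reduce-side FilterOperator
    limit.children.add(filter);
    filter.parents.add(limit);

    Op select = new Op("SEL");  // the pass-through select the patch builds
    filter.insertBetween(limit, select);

    // The plan now reads LIM -> SEL -> FIL, so the select can restrict
    // the columns before they reach the filter at runtime.
    System.out.println(limit.children.get(0).name);                  // SEL
    System.out.println(limit.children.get(0).children.get(0).name);  // FIL
  }
}

Note the design choice in the patch itself: the SelectDesc is first constructed as a no-compute SELECT * (the boolean true argument) and then given an explicit colList/outputColumnNames built from the filter's pruned column list, so the new operator forwards exactly the columns the filter expects and nothing more.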