diff --git data/files/flights_tiny.txt data/files/flights_tiny.txt
old mode 100644
new mode 100755
diff --git data/files/part.rc data/files/part.rc
old mode 100644
new mode 100755
diff --git data/files/part.seq data/files/part.seq
old mode 100644
new mode 100755
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
index 582ef14..58a9b59 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
@@ -23,19 +23,19 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -114,6 +114,9 @@ public ParseContext transform(ParseContext pactx) throws SemanticException {
     opRules.put(new RuleRegExp("R9",
         LateralViewForwardOperator.getOperatorName() + "%"),
         ColumnPrunerProcFactory.getLateralViewForwardProc());
+    opRules.put(new RuleRegExp("R10",
+        PTFOperator.getOperatorName() + "%"),
+        ColumnPrunerProcFactory.getPTFProc());
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory
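
The new R10 rule follows the pattern of R1-R9: each RuleRegExp pairs an operator-name pattern (here PTFOperator.getOperatorName() + "%") with the NodeProcessor the dispatcher should fire when the graph walk reaches a matching operator. Below is a minimal, self-contained sketch of this kind of name-pattern dispatch; every type in it is a hypothetical stand-in, not Hive's Dispatcher/RuleRegExp API, and simple first-match lookup stands in for the dispatcher's closest-match rule selection.

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    // Sketch of name-pattern rule dispatch over a walked operator list.
    // Stand-in types only; this is not Hive's actual dispatcher API.
    public class RuleDispatchSketch {
      interface NodeProcessor { void process(String operatorName); }

      public static void main(String[] args) {
        Map<Pattern, NodeProcessor> opRules = new LinkedHashMap<Pattern, NodeProcessor>();
        opRules.put(Pattern.compile("PTF.*"), new NodeProcessor() {
          public void process(String op) { System.out.println("PTF pruner fired on " + op); }
        });
        opRules.put(Pattern.compile("SEL.*"), new NodeProcessor() {
          public void process(String op) { System.out.println("Select pruner fired on " + op); }
        });

        // Walk the (flattened) operator list; fire the first matching rule.
        for (String opName : new String[] { "TS_0", "SEL_1", "PTF_2" }) {
          for (Map.Entry<Pattern, NodeProcessor> e : opRules.entrySet()) {
            if (e.getKey().matcher(opName).matches()) {
              e.getValue().process(opName);
              break;
            }
          }
        }
      }
    }

With R10 registered, a PTF operator in the plan is no longer invisible to the pruner walk; it gets the dedicated processor added below instead of falling through to the default one.
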
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
index a9a93ad..a0ad7be 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -31,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -40,6 +42,7 @@
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.ScriptOperator;
@@ -62,11 +65,21 @@
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
  * Factory for generating the different node processors used by ColumnPruner.
@@ -148,6 +161,150 @@ public static ColumnPrunerGroupByProc getGroupByProc() {
   }
 
   /**
+   * - Pruning can only be done for windowing. PTFs are black boxes; we assume all columns
+   *   are needed.
+   * - Add column names referenced in WindowFn args and in WindowFn expressions to the
+   *   pruned list of the child Select Op.
+   * - Prune the column names & types serde properties in each of the Shapes in the PTF chain:
+   *   - the InputDef's output shape
+   *   - Window Table Functions: window output shape & output shape.
+   * - Why is pruning the column names & types in the serde properties enough?
+   *   - Because during runtime we rebuild the OIs using these properties.
+   * - Finally we set the prunedColList on the ColumnPrunerContext and update the RR &
+   *   signature on the PTFOp.
+   */
+  public static class ColumnPrunerPTFProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+
+      PTFOperator op = (PTFOperator) nd;
+      PTFDesc conf = op.getConf();
+      if (!conf.forWindowing()) {
+        return getDefaultProc().process(nd, stack, ctx, nodeOutputs);
+      }
+
+      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
+      WindowTableFunctionDef def = (WindowTableFunctionDef) conf.getFuncDef();
+      ArrayList<ColumnInfo> sig = new ArrayList<ColumnInfo>();
+
+      List<String> prunedCols = cppCtx.getPrunedColList(op.getChildOperators().get(0));
+      prunedCols = new ArrayList<String>(prunedCols);
+      prunedColumnsList(prunedCols, def);
+      pruneShape(def.getInput().getOutputShape(), prunedCols);
+      pruneShape(def.getOutputFromWdwFnProcessing(), prunedCols);
+      pruneShape(def.getOutputShape(), prunedCols);
+
+      RowResolver newRR = new RowResolver();
+      RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
+      buildPrunedRR(prunedCols, oldRR, newRR, sig);
+      cppCtx.getPrunedColLists().put(op, prunedInputList(prunedCols, def));
+      cppCtx.getOpToParseCtxMap().get(op).setRowResolver(newRR);
+      op.getSchema().setSignature(sig);
+      return null;
+    }
+
+    private static void buildPrunedRR(List<String> prunedCols,
+        RowResolver oldRR, RowResolver newRR, ArrayList<ColumnInfo> sig) throws SemanticException {
+      for (String col : prunedCols) {
+        String[] nm = oldRR.reverseLookup(col);
+        ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
+        if (colInfo != null) {
+          newRR.put(nm[0], nm[1], colInfo);
+          sig.add(colInfo);
+        }
+      }
+    }
+
+    /*
+     * Add any input columns referenced in WindowFn args or expressions.
+     */
+    private void prunedColumnsList(List<String> prunedCols, WindowTableFunctionDef tDef) {
+      if (tDef.getWindowFunctions() != null) {
+        for (WindowFunctionDef wDef : tDef.getWindowFunctions()) {
+          if (wDef.getArgs() == null) {
+            continue;
+          }
+          for (PTFExpressionDef arg : wDef.getArgs()) {
+            ExprNodeDesc exprNode = arg.getExprNode();
+            Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+          }
+        }
+      }
+      if (tDef.getWindowExpressions() != null) {
+        for (WindowExpressionDef expr : tDef.getWindowExpressions()) {
+          ExprNodeDesc exprNode = expr.getExprNode();
+          Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
+        }
+      }
+    }
+
+    private List<String> getLowerCasePrunedCols(List<String> prunedCols) {
+      List<String> lowerCasePrunedCols = new ArrayList<String>();
+      for (String col : prunedCols) {
+        lowerCasePrunedCols.add(col.toLowerCase());
+      }
+      return lowerCasePrunedCols;
+    }
+
+    /*
+     * Reconstruct the column names & types list based on the prunedCols list.
+     */
+    private void pruneShape(ShapeDetails shp, List<String> prunedCols) {
+      List<String> columnNames = Arrays.asList(shp.getSerdeProps().get(
+          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS).split(","));
+      List<TypeInfo> columnTypes = TypeInfoUtils
+          .getTypeInfosFromTypeString(shp.getSerdeProps().get(
+              org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES));
+      /*
+       * fieldNames in OI are lower-cased. So we compare lower-cased names for now.
+       */
+      prunedCols = getLowerCasePrunedCols(prunedCols);
+
+      StringBuilder cNames = new StringBuilder();
+      StringBuilder cTypes = new StringBuilder();
+
+      boolean addComma = false;
+      for (int i = 0; i < columnNames.size(); i++) {
+        if (prunedCols.contains(columnNames.get(i))) {
+          cNames.append(addComma ? "," : "");
+          cTypes.append(addComma ? "," : "");
+          cNames.append(columnNames.get(i));
+          cTypes.append(columnTypes.get(i));
+          addComma = true;
+        }
+      }
+      shp.getSerdeProps().put(
+          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, cNames.toString());
+      shp.getSerdeProps().put(
+          org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, cTypes.toString());
+    }
+
+    /*
+     * From the prunedCols list, filter out columns that refer to WindowFns or WindowExprs;
+     * the returned list is set as the prunedList needed by the PTFOp.
+     */
+    private ArrayList<String> prunedInputList(List<String> prunedCols,
+        WindowTableFunctionDef tDef) {
+      ArrayList<String> prunedInputCols = new ArrayList<String>();
+
+      StructObjectInspector OI = tDef.getInput().getOutputShape().getOI();
+      for (StructField f : OI.getAllStructFieldRefs()) {
+        String fName = f.getFieldName();
+        if (prunedCols.contains(fName)) {
+          prunedInputCols.add(fName);
+        }
+      }
+
+      return prunedInputCols;
+    }
+  }
+
+  /**
+   * Factory method to get the ColumnPrunerPTFProc class.
+   *
+   * @return ColumnPrunerPTFProc
+   */
+  public static ColumnPrunerPTFProc getPTFProc() {
+    return new ColumnPrunerPTFProc();
+  }
+
+  /**
    * The Default Node Processor for Column Pruning.
    */
  public static class ColumnPrunerDefaultProc implements NodeProcessor {
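
pruneShape above rewrites only the LIST_COLUMNS and LIST_COLUMN_TYPES serde properties because, as the class comment notes, each shape's ObjectInspector is rebuilt from those properties at runtime. The following standalone sketch shows the same keep-list filtering over the parallel comma-separated name/type strings; plain type strings stand in for the TypeInfo objects the real code obtains through TypeInfoUtils.

    import java.util.Arrays;
    import java.util.List;

    // Sketch of the pruneShape idea: keep only the referenced columns in the
    // parallel comma-separated column-name / column-type lists.
    public class PruneShapeSketch {
      static String[] prune(String names, String types, List<String> prunedCols) {
        List<String> columnNames = Arrays.asList(names.split(","));
        List<String> columnTypes = Arrays.asList(types.split(","));
        StringBuilder cNames = new StringBuilder();
        StringBuilder cTypes = new StringBuilder();
        boolean addComma = false;
        for (int i = 0; i < columnNames.size(); i++) {
          // OI field names are lower-cased, so compare lower-cased names.
          if (prunedCols.contains(columnNames.get(i).toLowerCase())) {
            cNames.append(addComma ? "," : "").append(columnNames.get(i));
            cTypes.append(addComma ? "," : "").append(columnTypes.get(i));
            addComma = true;
          }
        }
        return new String[] { cNames.toString(), cTypes.toString() };
      }

      public static void main(String[] args) {
        String[] pruned = prune("p_mfgr,p_name,p_size,p_retailprice",
            "string,string,int,double", Arrays.asList("p_mfgr", "p_name", "p_size"));
        // Prints: p_mfgr,p_name,p_size | string,string,int
        System.out.println(pruned[0] + " | " + pruned[1]);
      }
    }

Splitting the type string on commas is safe here only because the sketch sticks to primitive types; the patch parses types with TypeInfoUtils.getTypeInfosFromTypeString precisely because complex types such as map<string,int> embed commas.
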
@@ -285,6 +442,38 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
       }
       Collections.sort(colLists);
       pruneReduceSinkOperator(flags, op, cppCtx);
+    } else if ((childOperators.size() == 1)
+        && (childOperators.get(0) instanceof ExtractOperator)
+        && (childOperators.get(0).getChildOperators().size() == 1)
+        && (childOperators.get(0).getChildOperators().get(0) instanceof PTFOperator)
+        && ((PTFOperator) childOperators.get(0).getChildOperators().get(0))
+            .getConf().forWindowing()) {
+
+      /*
+       * For an RS that is followed by an Extract & PTFOp for windowing,
+       * do the same thing as above: reconstruct the ValueColumn list based
+       * on what is required by the PTFOp.
+       */
+
+      assert parentOperators.size() == 1;
+
+      PTFOperator ptfOp = (PTFOperator) childOperators.get(0).getChildOperators().get(0);
+      List<String> childCols = cppCtx.getPrunedColList(ptfOp);
+      boolean[] flags = new boolean[conf.getValueCols().size()];
+      for (int i = 0; i < flags.length; i++) {
+        flags[i] = false;
+      }
+      if (childCols != null && childCols.size() > 0) {
+        ArrayList<String> outColNames = op.getConf().getOutputValueColumnNames();
+        for (int i = 0; i < outColNames.size(); i++) {
+          if (childCols.contains(outColNames.get(i))) {
+            ExprNodeDesc exprNode = op.getConf().getValueCols().get(i);
+            flags[i] = true;
+            Utilities.mergeUniqElems(colLists, exprNode.getCols());
+          }
+        }
+      }
+      Collections.sort(colLists);
+      pruneReduceSinkOperator(flags, op, cppCtx);
     } else {
       // Reduce Sink contains the columns needed - no need to aggregate from
       // children
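
The new branch computes one boolean flag per ReduceSink value column, keeping a column only when the windowing PTFOp's pruned-column list still references the corresponding output name; the flags then drive pruneReduceSinkOperator. A self-contained sketch of just the flag computation follows (method and variable names are illustrative, not Hive's):

    import java.util.Arrays;
    import java.util.List;

    // Sketch: keep ReduceSink value column i only if the downstream
    // windowing PTF still needs the i-th output value column name.
    public class RsPruneFlagsSketch {
      static boolean[] computeFlags(List<String> outColNames, List<String> childCols) {
        boolean[] flags = new boolean[outColNames.size()]; // all false: prune everything
        if (childCols == null || childCols.isEmpty()) {
          return flags;
        }
        for (int i = 0; i < outColNames.size(); i++) {
          flags[i] = childCols.contains(outColNames.get(i));
        }
        return flags;
      }

      public static void main(String[] args) {
        boolean[] flags = computeFlags(
            Arrays.asList("p_mfgr", "p_name", "p_size", "p_comment"),
            Arrays.asList("p_mfgr", "p_name", "p_size"));
        System.out.println(Arrays.toString(flags)); // [true, true, true, false]
      }
    }
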
"," : ""); + cNames.append(columnNames.get(i)); + cTypes.append(columnTypes.get(i)); + addComma = true; + } + } + shp.getSerdeProps().put( + org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS, cNames.toString()); + shp.getSerdeProps().put( + org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES, cTypes.toString()); + } + + /* + * from the prunedCols list filter out columns that refer to WindowFns or WindowExprs + * the returned list is set as the prunedList needed by the PTFOp. + */ + private ArrayList prunedInputList(List prunedCols, WindowTableFunctionDef tDef) { + ArrayList prunedInputCols = new ArrayList(); + + StructObjectInspector OI = tDef.getInput().getOutputShape().getOI(); + for(StructField f : OI.getAllStructFieldRefs()) { + String fName = f.getFieldName(); + if ( prunedCols.contains(fName)) { + prunedInputCols.add(fName); + } + } + + return prunedInputCols; + } + } + + /** + * Factory method to get the ColumnPrunerGroupByProc class. + * + * @return ColumnPrunerGroupByProc + */ + public static ColumnPrunerPTFProc getPTFProc() { + return new ColumnPrunerPTFProc(); + } + + /** * The Default Node Processor for Column Pruning. */ public static class ColumnPrunerDefaultProc implements NodeProcessor { @@ -285,6 +442,38 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, } Collections.sort(colLists); pruneReduceSinkOperator(flags, op, cppCtx); + } else if ((childOperators.size() == 1) + && (childOperators.get(0) instanceof ExtractOperator ) + && (childOperators.get(0).getChildOperators().size() == 1) + && (childOperators.get(0).getChildOperators().get(0) instanceof PTFOperator ) + && ((PTFOperator)childOperators.get(0).getChildOperators().get(0)).getConf().forWindowing() ) { + + /* + * For RS that are followed by Extract & PTFOp for windowing + * - do the same thing as above. Reconstruct ValueColumn list based on what is required + * by the PTFOp. 
diff --git ql/src/test/queries/clientpositive/ptf_general_queries.q ql/src/test/queries/clientpositive/ptf_general_queries.q
index da01e0a..98eef29 100644
--- ql/src/test/queries/clientpositive/ptf_general_queries.q
+++ ql/src/test/queries/clientpositive/ptf_general_queries.q
@@ -28,6 +28,8 @@ FL_NUM string
 
 LOAD DATA LOCAL INPATH '../data/files/flights_tiny.txt' OVERWRITE INTO TABLE flights_tiny;
 
+set hive.ptf.partition.persistence.memsize=10485760;
+
 --1. test1
 select p_mfgr, p_name, p_size,
 rank() as r,
@@ -74,6 +76,13 @@ from part
 distribute by p_mfgr
 sort by p_name ;
 
+-- 6. testJoinWithLeadLag
+select p1.p_mfgr, p1.p_name,
+p1.p_size, p1.p_size - lag(p1.p_size,1) as deltaSz
+from part p1 join part p2 on p1.p_partkey = p2.p_partkey
+distribute by p1.p_mfgr
+sort by p1.p_name ;
+
 -- 7. testJoinWithNoop
 select p_mfgr, p_name,
 p_size, p_size - lag(p_size,1) as deltaSz
@@ -129,7 +138,7 @@ from noop(part
 partition by p_mfgr
 order by p_name
 )
-where p_size > 0
+having p_size > 0
 distribute by p_mfgr
 sort by p_name;
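
The new testJoinWithLeadLag case exercises lag over the self-joined part rows, partitioned by p_mfgr and ordered by p_name. A small sketch of what deltaSz = p_size - lag(p_size,1) computes within one partition is below; the p_size values are taken from the Manufacturer#1 rows, and showing a delta of 0 for the partition's first row matches this test's expected output rather than a general definition of lag.

    import java.util.Arrays;
    import java.util.List;

    // Sketch of deltaSz = p_size - lag(p_size, 1) within one partition whose
    // rows are already ordered by p_name. The first row's delta is shown as 0,
    // matching the expected output of the test below.
    public class LagDeltaSketch {
      public static void main(String[] args) {
        List<Integer> sizes = Arrays.asList(2, 34, 6, 28, 42); // Manufacturer#1 p_size
        Integer prev = null;
        for (int size : sizes) {
          int delta = (prev == null) ? 0 : size - prev;
          System.out.println(size + "\t" + delta); // 2 0, 34 32, 6 -28, 28 22, 42 14
          prev = size;
        }
      }
    }

The quadrupled "almond antique burnished rose metallic" rows in the expected output are consistent with a part key appearing twice on each side of the self-join, so the join fans each copy out against the other.
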
diff --git ql/src/test/results/clientpositive/ptf_general_queries.q.out ql/src/test/results/clientpositive/ptf_general_queries.q.out
index 7021faa..f7cd48e 100644
--- ql/src/test/results/clientpositive/ptf_general_queries.q.out
+++ ql/src/test/results/clientpositive/ptf_general_queries.q.out
@@ -308,6 +308,52 @@ Manufacturer#2 almond aquamarine sandy cyan gainsboro 18
 Manufacturer#4 almond aquamarine yellow dodger mint 7
 Manufacturer#4 almond azure aquamarine papaya violet 12
 Manufacturer#5 almond azure blanched chiffon midnight 23
+PREHOOK: query: -- 6. testJoinWithLeadLag
+select p1.p_mfgr, p1.p_name,
+p1.p_size, p1.p_size - lag(p1.p_size,1) as deltaSz
+from part p1 join part p2 on p1.p_partkey = p2.p_partkey
+distribute by p1.p_mfgr
+sort by p1.p_name
+PREHOOK: type: QUERY
+PREHOOK: Input: default@part
+#### A masked pattern was here ####
+POSTHOOK: query: -- 6. testJoinWithLeadLag
+select p1.p_mfgr, p1.p_name,
+p1.p_size, p1.p_size - lag(p1.p_size,1) as deltaSz
+from part p1 join part p2 on p1.p_partkey = p2.p_partkey
+distribute by p1.p_mfgr
+sort by p1.p_name
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@part
+#### A masked pattern was here ####
+Manufacturer#1 almond antique burnished rose metallic 2 0
+Manufacturer#1 almond antique burnished rose metallic 2 0
+Manufacturer#1 almond antique burnished rose metallic 2 0
+Manufacturer#1 almond antique burnished rose metallic 2 0
+Manufacturer#1 almond antique chartreuse lavender yellow 34 32
+Manufacturer#1 almond antique salmon chartreuse burlywood 6 -28
+Manufacturer#1 almond aquamarine burnished black steel 28 22
+Manufacturer#1 almond aquamarine pink moccasin thistle 42 14
+Manufacturer#2 almond antique violet chocolate turquoise 14 0
+Manufacturer#2 almond antique violet turquoise frosted 40 26
+Manufacturer#2 almond aquamarine midnight light salmon 2 -38
+Manufacturer#2 almond aquamarine rose maroon antique 25 23
+Manufacturer#2 almond aquamarine sandy cyan gainsboro 18 -7
+Manufacturer#3 almond antique chartreuse khaki white 17 0
+Manufacturer#3 almond antique forest lavender goldenrod 14 -3
+Manufacturer#3 almond antique metallic orange dim 19 5
+Manufacturer#3 almond antique misty red olive 1 -18
+Manufacturer#3 almond antique olive coral navajo 45 44
+Manufacturer#4 almond antique gainsboro frosted violet 10 0
+Manufacturer#4 almond antique violet mint lemon 39 29
+Manufacturer#4 almond aquamarine floral ivory bisque 27 -12
+Manufacturer#4 almond aquamarine yellow dodger mint 7 -20
+Manufacturer#4 almond azure aquamarine papaya violet 12 5
+Manufacturer#5 almond antique blue firebrick mint 31 0
+Manufacturer#5 almond antique medium spring khaki 6 -25
+Manufacturer#5 almond antique sky peru orange 2 -4
+Manufacturer#5 almond aquamarine dodger light gainsboro 46 44
+Manufacturer#5 almond azure blanched chiffon midnight 23 -23
 PREHOOK: query: -- 7. testJoinWithNoop
 select p_mfgr, p_name,
 p_size, p_size - lag(p_size,1) as deltaSz
@@ -559,7 +605,7 @@ from noop(part
 partition by p_mfgr
 order by p_name
 )
-where p_size > 0
+having p_size > 0
 distribute by p_mfgr
 sort by p_name
 PREHOOK: type: QUERY
@@ -574,7 +620,7 @@ from noop(part
 partition by p_mfgr
 order by p_name
 )
-where p_size > 0
+having p_size > 0
 distribute by p_mfgr
 sort by p_name
 POSTHOOK: type: QUERY