diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java index 6042470..83a6224 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Stack; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; @@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -456,10 +458,19 @@ protected void convertMapJoinToBucketMapJoin( public static List toColumns(List keys) { List columns = new ArrayList(); for (ExprNodeDesc key : keys) { - if (!(key instanceof ExprNodeColumnDesc)) { + if (key instanceof ExprNodeColumnDesc) { + columns.add(((ExprNodeColumnDesc) key).getColumn()); + } else if ((key instanceof ExprNodeConstantDesc)) { + ExprNodeConstantDesc constant = (ExprNodeConstantDesc) key; + String colName = constant.getFoldedFromCol(); + if (colName == null){ + return null; + } else { + columns.add(colName); + } + } else { return null; } - columns.add(((ExprNodeColumnDesc) key).getColumn()); } return columns; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java index 3c8940f..b12d3a8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java @@ -82,9 +82,7 @@ public ParseContext transform(ParseContext pactx) throws SemanticException { // if the later is enabled. return pactx; } - if (pactx.getConf().getBoolVar(ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) { - return pactx; - } + pGraphContext = pactx; opToParseCtxMap = pGraphContext.getOpParseCtx(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java index c1cc9f4..a060709 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java @@ -552,6 +552,7 @@ private static void foldOperator(Operator op, * conditional expressions and extract assignment expressions and propagate them. */ public static class ConstantPropagateFilterProc implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { FilterOperator op = (FilterOperator) nd; @@ -594,6 +595,7 @@ public static ConstantPropagateFilterProc getFilterProc() { * Node Processor for Constant Propagate for Group By Operators. */ public static class ConstantPropagateGroupByProc implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { GroupByOperator op = (GroupByOperator) nd; @@ -630,6 +632,7 @@ public static ConstantPropagateGroupByProc getGroupByProc() { * The Default Node Processor for Constant Propagation. */ public static class ConstantPropagateDefaultProc implements NodeProcessor { + @Override @SuppressWarnings("unchecked") public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { @@ -658,6 +661,7 @@ public static ConstantPropagateDefaultProc getDefaultProc() { * The Node Processor for Constant Propagation for Select Operators. */ public static class ConstantPropagateSelectProc implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { SelectOperator op = (SelectOperator) nd; @@ -691,6 +695,7 @@ public static ConstantPropagateSelectProc getSelectProc() { * propagation, this processor also prunes dynamic partitions to static partitions if possible. */ public static class ConstantPropagateFileSinkProc implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { FileSinkOperator op = (FileSinkOperator) nd; @@ -743,6 +748,7 @@ public static NodeProcessor getFileSinkProc() { * Currently these kinds of Operators include UnionOperator and ScriptOperator. */ public static class ConstantPropagateStopProc implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { Operator op = (Operator) nd; @@ -763,6 +769,7 @@ public static NodeProcessor getStopProc() { * join (left table for left outer join and vice versa) can be propagated. */ public static class ConstantPropagateReduceSinkProc implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { ReduceSinkOperator op = (ReduceSinkOperator) nd; @@ -795,7 +802,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. // key columns ArrayList newKeyEpxrs = new ArrayList(); for (ExprNodeDesc desc : rsDesc.getKeyCols()) { - newKeyEpxrs.add(foldExpr(desc, constants, cppCtx, op, 0, false)); + ExprNodeDesc newDesc = foldExpr(desc, constants, cppCtx, op, 0, false); + if (newDesc != desc && desc instanceof ExprNodeColumnDesc && newDesc instanceof ExprNodeConstantDesc) { + ((ExprNodeConstantDesc)newDesc).setFoldedFromCol(((ExprNodeColumnDesc)desc).getColumn()); + } + newKeyEpxrs.add(newDesc); } rsDesc.setKeyCols(newKeyEpxrs); @@ -854,6 +865,7 @@ public static NodeProcessor getReduceSinkProc() { * The Node Processor for Constant Propagation for Join Operators. */ public static class ConstantPropagateJoinProc implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { JoinOperator op = (JoinOperator) nd; @@ -916,6 +928,7 @@ public static NodeProcessor getJoinProc() { * The Node Processor for Constant Propagation for Table Scan Operators. */ public static class ConstantPropagateTableScanProc implements NodeProcessor { + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException { TableScanOperator op = (TableScanOperator) nd; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java index 5f7682e..8a0c474 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java @@ -18,17 +18,13 @@ package org.apache.hadoop.hive.ql.optimizer; -import java.util.HashMap; -import java.util.Map; import java.util.Stack; import org.apache.hadoop.hive.ql.exec.JoinOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; public class SortedMergeJoinProc extends AbstractSMBJoinProc implements NodeProcessor { @@ -46,12 +42,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, JoinOperator joinOp = (JoinOperator) nd; SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx; - Map mapJoinMap = pGraphContext.getMapJoinContext(); - if (mapJoinMap == null) { - mapJoinMap = new HashMap(); - pGraphContext.setMapJoinContext(mapJoinMap); - } - boolean convert = canConvertJoinToSMBJoin( joinOp, smbJoinContext, pGraphContext); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java index 2420971..3295aba 100755 --- ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java @@ -36,6 +36,17 @@ private static final long serialVersionUID = 1L; final protected transient static char[] hexArray = "0123456789ABCDEF".toCharArray(); private Object value; + // If this constant was created while doing constant folding, foldedFromCol holds the name of + // original column from which it was folded. + private transient String foldedFromCol; + + public String getFoldedFromCol() { + return foldedFromCol; + } + + public void setFoldedFromCol(String foldedFromCol) { + this.foldedFromCol = foldedFromCol; + } public ExprNodeConstantDesc() { } diff --git ql/src/test/results/clientpositive/join_nullsafe.q.out ql/src/test/results/clientpositive/join_nullsafe.q.out index 37b6978..49c5727 100644 --- ql/src/test/results/clientpositive/join_nullsafe.q.out +++ ql/src/test/results/clientpositive/join_nullsafe.q.out @@ -1519,9 +1519,8 @@ STAGE PLANS: predicate: value is null (type: boolean) Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator - key expressions: value (type: int) + key expressions: null (type: void) sort order: + - Map-reduce partition columns: value (type: int) Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: key (type: int) TableScan @@ -1531,9 +1530,8 @@ STAGE PLANS: predicate: key is null (type: boolean) Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator - key expressions: key (type: int) + key expressions: null (type: void) sort order: + - Map-reduce partition columns: key (type: int) Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: value (type: int) Reduce Operator Tree: @@ -1541,13 +1539,13 @@ STAGE PLANS: condition map: Inner Join 0 to 1 condition expressions: - 0 {KEY.reducesinkkey0} {VALUE._col0} - 1 {VALUE._col0} {KEY.reducesinkkey0} + 0 {VALUE._col0} + 1 {VALUE._col0} nullSafes: [true] - outputColumnNames: _col0, _col1, _col4, _col5 + outputColumnNames: _col1, _col4 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE Select Operator - expressions: _col0 (type: int), _col1 (type: int), _col4 (type: int), _col5 (type: int) + expressions: null (type: void), _col1 (type: int), _col4 (type: int), null (type: void) outputColumnNames: _col0, _col1, _col2, _col3 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE File Output Operator diff --git ql/src/test/results/clientpositive/smb_mapjoin_25.q.out ql/src/test/results/clientpositive/smb_mapjoin_25.q.out index bd289c3..f6a377b 100644 --- ql/src/test/results/clientpositive/smb_mapjoin_25.q.out +++ ql/src/test/results/clientpositive/smb_mapjoin_25.q.out @@ -219,14 +219,13 @@ STAGE PLANS: condition map: Inner Join 0 to 1 condition expressions: - 0 {key} + 0 1 keys: - 0 key (type: int) - 1 key (type: int) - outputColumnNames: _col0 + 0 5 (type: int) + 1 5 (type: int) Select Operator - expressions: _col0 (type: int) + expressions: 5 (type: int) outputColumnNames: _col0 Reduce Output Operator key expressions: _col0 (type: int) @@ -242,14 +241,13 @@ STAGE PLANS: condition map: Inner Join 0 to 1 condition expressions: - 0 {key} + 0 1 keys: - 0 key (type: int) - 1 key (type: int) - outputColumnNames: _col0 + 0 5 (type: int) + 1 5 (type: int) Select Operator - expressions: _col0 (type: int) + expressions: 5 (type: int) outputColumnNames: _col0 Reduce Output Operator key expressions: _col0 (type: int) @@ -266,7 +264,7 @@ STAGE PLANS: Filter Operator predicate: (_col1 = 5) (type: boolean) Select Operator - expressions: _col0 (type: int), _col1 (type: int) + expressions: _col0 (type: int), 5 (type: int) outputColumnNames: _col0, _col1 File Output Operator compressed: false