diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java index 450d7f3..5e4f8ab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java @@ -20,7 +20,9 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; +import java.util.Set; /** * RowSchema Implementation. @@ -49,6 +51,57 @@ public void setSignature(ArrayList signature) { return signature; } + public ColumnInfo getColumnInfo(String internalName) { + for (ColumnInfo columnInfo: this.signature) { + if (columnInfo.getInternalName().equals(internalName)) { + return columnInfo; + } + } + return null; + } + + public ColumnInfo getColumnInfo(String tabAlias, String alias) { + for (ColumnInfo columnInfo: this.signature) { + if (columnInfo.getTabAlias() == null) { + if (tabAlias == null) { + if(columnInfo.getAlias() != null && alias != null && + columnInfo.getAlias().equals(alias)) { + return columnInfo; + } + } + } + else { + if (tabAlias != null) { + if (columnInfo.getTabAlias().equals(tabAlias) && + columnInfo.getAlias() != null && alias != null && + columnInfo.getAlias().equals(alias)) { + return columnInfo; + } + } + } + } + return null; + } + + public int getPosition(String internalName) { + int pos = -1; + for (ColumnInfo var : this.signature) { + ++pos; + if (var.getInternalName().equals(internalName)) { + return pos; + } + } + return -1; + } + + public Set getTableNames() { + Set tableNames = new HashSet(); + for (ColumnInfo var : this.signature) { + tableNames.add(var.getTabAlias()); + } + return tableNames; + } + @Override public boolean equals(Object obj) { if (!(obj instanceof RowSchema) || (obj == null)) { @@ -103,6 +156,13 @@ public String toString() { sb.append(','); } sb.append(col.toString()); + sb.append('|'); + sb.append('{'); + sb.append(col.getTabAlias()); + sb.append('}'); + if (col.getAlias() != null) { + sb.append(col.getAlias()); + } } sb.append(')'); return sb.toString(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 9ed2c61..471db52 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -31,7 +31,6 @@ import java.io.DataInput; import java.io.EOFException; import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -1066,7 +1065,6 @@ protected synchronized Kryo initialValue() { kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); removeField(kryo, Operator.class, "colExprMap"); removeField(kryo, ColumnInfo.class, "objectInspector"); - removeField(kryo, MapWork.class, "opParseCtxMap"); return kryo; }; }; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java index a948b19..ff34682 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java @@ -26,8 +26,6 @@ import java.util.Set; import java.util.Stack; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -59,9 +57,6 @@ //try to replace a 
bucket map join with a sorted merge map join abstract public class AbstractSMBJoinProc extends AbstractBucketJoinProc implements NodeProcessor { - private static final Log LOG = LogFactory - .getLog(SortedMergeBucketMapJoinOptimizer.class.getName()); - public AbstractSMBJoinProc(ParseContext pctx) { super(pctx); } @@ -222,7 +217,6 @@ protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJo // pGraphContext.getSmbMapJoinOps().add(smbJop); pGraphContext.getMapJoinOps().remove(mapJoinOp); - pGraphContext.getOpParseCtx().put(smbJop, pGraphContext.getOpParseCtx().get(mapJoinOp)); return smbJop; } @@ -430,6 +424,7 @@ protected boolean canConvertJoinToSMBJoin( } // Can the join operator be converted to a bucket map-merge join operator ? + @SuppressWarnings("unchecked") protected boolean canConvertJoinToBucketMapJoin( JoinOperator joinOp, SortBucketJoinProcCtx context) throws SemanticException { @@ -512,7 +507,6 @@ protected MapJoinOperator convertJoinToBucketMapJoin( SortBucketJoinProcCtx joinContext) throws SemanticException { MapJoinOperator mapJoinOp = new MapJoinProcessor().convertMapJoin( pGraphContext.getConf(), - pGraphContext.getOpParseCtx(), joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java index 046a52f..b8f5c71 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.optimizer; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -31,7 +30,6 @@ import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator; import org.apache.hadoop.hive.ql.exec.LimitOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.ScriptOperator; @@ -45,10 +43,8 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** * Implementation of one of the rule-based optimization steps. ColumnPruner gets @@ -60,7 +56,6 @@ */ public class ColumnPruner implements Transform { protected ParseContext pGraphContext; - private HashMap, OpParseContext> opToParseCtxMap; /** * empty constructor. 
@@ -80,7 +75,6 @@ public ColumnPruner() { @Override public ParseContext transform(ParseContext pactx) throws SemanticException { pGraphContext = pactx; - opToParseCtxMap = pGraphContext.getOpParseCtx(); // generate pruned column list for all relevant operators ColumnPrunerProcCtx cppCtx = new ColumnPrunerProcCtx(pactx); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java index 5d848a1..e5240a0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java @@ -26,12 +26,11 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -46,13 +45,10 @@ private final Map, List> prunedColLists; - private final HashMap, OpParseContext> opToParseCtxMap; - private final Map>> joinPrunedColLists; public ColumnPrunerProcCtx(ParseContext pctx) { this.pctx = pctx; - this.opToParseCtxMap = pctx.getOpParseCtx(); prunedColLists = new HashMap, List>(); joinPrunedColLists = new HashMap>>(); } @@ -72,10 +68,6 @@ public ParseContext getParseContext() { return prunedColLists.get(op); } - public HashMap, OpParseContext> getOpToParseCtxMap() { - return opToParseCtxMap; - } - public Map, List> getPrunedColLists() { return prunedColLists; } @@ -183,11 +175,11 @@ public ParseContext getParseContext() { /** * Create the list of internal columns for select tag of LV */ - public List getSelectColsFromLVJoin(RowResolver rr, + public List getSelectColsFromLVJoin(RowSchema rs, List colList) throws SemanticException { List columns = new ArrayList(); for (String col : colList) { - if (rr.reverseLookup(col) != null) { + if (rs.getColumnInfo(col) != null) { columns.add(col); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java index abf32f1..855ae9b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java @@ -18,6 +18,16 @@ package org.apache.hadoop.hive.ql.optimizer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; @@ -42,8 +52,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.ql.parse.OpParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import 
org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -65,16 +73,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; - /** * Factory for generating the different node processors used by ColumnPruner. */ @@ -175,35 +173,31 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; Operator op = (Operator) nd; - RowResolver inputRR = cppCtx.getParseContext().getOpParseCtx().get(op) - .getRowResolver(); + RowSchema inputRS = op.getSchema(); List prunedCols = cppCtx.getPrunedColList(op.getChildOperators() .get(0)); Operator parent = op.getParentOperators().get(0); - RowResolver parentRR = cppCtx.getParseContext().getOpParseCtx() - .get(parent).getRowResolver(); - List sig = parentRR.getRowSchema().getSignature(); + RowSchema parentRS = parent.getSchema(); + List sig = parentRS.getSignature(); List colList = new ArrayList(); for (ColumnInfo cI : sig) { colList.add(cI.getInternalName()); } - if (prunedCols.size() != inputRR.getRowSchema().getSignature().size() + if (prunedCols.size() != inputRS.getSignature().size() && !(op.getChildOperators().get(0) instanceof SelectOperator)) { ArrayList exprs = new ArrayList(); ArrayList outputs = new ArrayList(); Map colExprMap = new HashMap(); - RowResolver outputRS = new RowResolver(); + ArrayList outputRS = new ArrayList(); for (String internalName : prunedCols) { - String[] nm = inputRR.reverseLookup(internalName); - ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]); + ColumnInfo valueInfo = inputRS.getColumnInfo(internalName); ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(), - valueInfo.getInternalName(), nm[0], valueInfo.getIsVirtualCol()); + valueInfo.getInternalName(), valueInfo.getTabAlias(), valueInfo.getIsVirtualCol()); exprs.add(colDesc); outputs.add(internalName); - outputRS.put(nm[0], nm[1], - new ColumnInfo(internalName, valueInfo.getType(), nm[0], + outputRS.add(new ColumnInfo(internalName, valueInfo.getType(), valueInfo.getTabAlias(), valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol())); colExprMap.put(internalName, colDesc); } @@ -212,12 +206,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Operator child = op.getChildOperators().get(0); op.removeChild(child); SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild( - select, new RowSchema(outputRS.getColumnInfos()), op); + select, new RowSchema(outputRS), op); OperatorFactory.makeChild(sel, child); - OpParseContext parseCtx = new OpParseContext(outputRS); - cppCtx.getParseContext().getOpParseCtx().put(sel, parseCtx); - sel.setColumnExprMap(colExprMap); } @@ -271,32 +262,28 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, } WindowTableFunctionDef def = (WindowTableFunctionDef) conf.getFuncDef(); - ArrayList sig = new ArrayList(); List prunedCols = cppCtx.getPrunedColList(op.getChildOperators().get(0)); //we create a copy of prunedCols to create a list of pruned columns for PTFOperator prunedCols = new ArrayList(prunedCols); prunedColumnsList(prunedCols, def); - RowResolver oldRR = 
cppCtx.getOpToParseCtxMap().get(op).getRowResolver(); - RowResolver newRR = buildPrunedRR(prunedCols, oldRR, sig); + RowSchema oldRS = op.getSchema(); + ArrayList sig = buildPrunedRR(prunedCols, oldRS); cppCtx.getPrunedColLists().put(op, prunedInputList(prunedCols, def)); - cppCtx.getOpToParseCtxMap().get(op).setRowResolver(newRR); op.getSchema().setSignature(sig); return null; } - private static RowResolver buildPrunedRR(List prunedCols, - RowResolver oldRR, ArrayList sig) throws SemanticException{ - RowResolver newRR = new RowResolver(); + private static ArrayList buildPrunedRR(List prunedCols, + RowSchema oldRS) throws SemanticException{ + ArrayList sig = new ArrayList(); HashSet prunedColsSet = new HashSet(prunedCols); - for(ColumnInfo cInfo : oldRR.getRowSchema().getSignature()) { + for(ColumnInfo cInfo : oldRS.getSignature()) { if ( prunedColsSet.contains(cInfo.getInternalName())) { - String[] nm = oldRR.reverseLookup(cInfo.getInternalName()); - newRR.put(nm[0], nm[1], cInfo); sig.add(cInfo); } } - return newRR; + return sig; } /* @@ -402,14 +389,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, cppCtx.getPrunedColLists().put((Operator) nd, cols); - RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver(); - setupNeededColumns(scanOp, inputRR, cols); + RowSchema inputRS = scanOp.getSchema(); + setupNeededColumns(scanOp, inputRS, cols); return null; } } - public static void setupNeededColumns(TableScanOperator scanOp, RowResolver inputRR, + public static void setupNeededColumns(TableScanOperator scanOp, RowSchema inputRS, List cols) throws SemanticException { List neededColumnIds = new ArrayList(); List neededColumnNames = new ArrayList(); @@ -424,12 +411,11 @@ public static void setupNeededColumns(TableScanOperator scanOp, RowResolver inpu } for (String column : cols) { - String[] tabCol = inputRR.reverseLookup(column); - if (tabCol == null) { + ColumnInfo colInfo = inputRS.getColumnInfo(column); + if (colInfo == null) { continue; } referencedColumnNames.add(column); - ColumnInfo colInfo = inputRR.get(tabCol[0], tabCol[1]); if (colInfo.getIsVirtualCol()) { // part is also a virtual column, but part col should not in this // list. @@ -442,7 +428,7 @@ public static void setupNeededColumns(TableScanOperator scanOp, RowResolver inpu //no need to pass virtual columns to reader. continue; } - int position = inputRR.getPosition(column); + int position = inputRS.getPosition(column); if (position >= 0) { // get the needed columns by id and name neededColumnIds.add(position); @@ -474,7 +460,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... 
nodeOutputs) throws SemanticException { ReduceSinkOperator op = (ReduceSinkOperator) nd; ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx; - RowResolver resolver = cppCtx.getOpToParseCtxMap().get(op).getRowResolver(); ReduceSinkDesc conf = op.getConf(); List colLists = new ArrayList(); @@ -604,14 +589,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, // these are from ColumnPrunerSelectProc List cols = cppCtx.getPrunedColList(select); - RowResolver rr = cppCtx.getOpToParseCtxMap().get(op).getRowResolver(); - if (rr.getColumnInfos().size() != cols.size()) { + RowSchema rs = op.getSchema(); + if (rs.getSignature().size() != cols.size()) { ArrayList colList = new ArrayList(); ArrayList outputColNames = new ArrayList(); for (String col : cols) { // revert output cols of SEL(*) to ExprNodeColumnDesc - String[] tabcol = rr.reverseLookup(col); - ColumnInfo colInfo = rr.get(tabcol[0], tabcol[1]); + ColumnInfo colInfo = rs.getColumnInfo(col); ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo); colList.add(colExpr); outputColNames.add(col); @@ -671,9 +655,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, if (lvJoin != null) { // get columns for SEL(*) from LVJ if (cols != null) { - RowResolver rr = cppCtx.getOpToParseCtxMap().get(op).getRowResolver(); + RowSchema rs = op.getSchema(); cppCtx.getPrunedColLists().put(op, - cppCtx.getSelectColsFromLVJoin(rr, cols)); + cppCtx.getSelectColsFromLVJoin(rs, cols)); } return null; } @@ -696,19 +680,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, ArrayList newOutputColumnNames = new ArrayList(); ArrayList rs_oldsignature = op.getSchema().getSignature(); ArrayList rs_newsignature = new ArrayList(); - RowResolver old_rr = cppCtx.getOpToParseCtxMap().get(op) - .getRowResolver(); - RowResolver new_rr = new RowResolver(); for (String col : cols) { int index = originalOutputColumnNames.indexOf(col); newOutputColumnNames.add(col); newColList.add(originalColList.get(index)); rs_newsignature.add(rs_oldsignature.get(index)); - String[] tabcol = old_rr.reverseLookup(col); - ColumnInfo columnInfo = old_rr.get(tabcol[0], tabcol[1]); - new_rr.put(tabcol[0], tabcol[1], columnInfo); } - cppCtx.getOpToParseCtxMap().get(op).setRowResolver(new_rr); op.getSchema().setSignature(rs_newsignature); conf.setColList(newColList); conf.setOutputColumnNames(newOutputColumnNames); @@ -778,8 +755,8 @@ private static void pruneReduceSinkOperator(boolean[] retainFlags, ReduceSinkDesc reduceConf = reduce.getConf(); Map oldMap = reduce.getColumnExprMap(); LOG.info("RS " + reduce.getIdentifier() + " oldColExprMap: " + oldMap); - RowResolver oldRR = cppCtx.getOpToParseCtxMap().get(reduce).getRowResolver(); - ArrayList old_signature = oldRR.getRowSchema().getSignature(); + RowSchema oldRS = reduce.getSchema(); + ArrayList old_signature = oldRS.getSignature(); ArrayList signature = new ArrayList(old_signature); List valueColNames = reduceConf.getOutputValueColumnNames(); @@ -793,23 +770,21 @@ private static void pruneReduceSinkOperator(boolean[] retainFlags, String outputCol = valueColNames.get(i); ExprNodeDesc outputColExpr = valueExprs.get(i); if (!retainFlags[i]) { - String[] nm = oldRR.reverseLookup(outputCol); - if (nm == null) { + ColumnInfo colInfo = oldRS.getColumnInfo(outputCol); + if (colInfo == null) { outputCol = Utilities.ReduceField.VALUE.toString() + "." 
+ outputCol; - nm = oldRR.reverseLookup(outputCol); + colInfo = oldRS.getColumnInfo(outputCol); } // In case there are multiple columns referenced to the same column name, we won't // do row resolve once more because the ColumnInfo in row resolver is already removed - if (nm == null) { + if (colInfo == null) { continue; } // Only remove information of a column if it is not a key, // i.e. this column is not appearing in keyExprs of the RS if (ExprNodeDescUtils.indexOf(outputColExpr, keyExprs) == -1) { - ColumnInfo colInfo = oldRR.getFieldMap(nm[0]).remove(nm[1]); - oldRR.getInvRslvMap().remove(colInfo.getInternalName()); oldMap.remove(outputCol); signature.remove(colInfo); } @@ -820,7 +795,7 @@ private static void pruneReduceSinkOperator(boolean[] retainFlags, } } - oldRR.getRowSchema().setSignature(signature); + oldRS.setSignature(signature); reduce.getSchema().setSignature(signature); reduceConf.setOutputValueColumnNames(newValueColNames); reduceConf.setValueCols(newValueExprs); @@ -893,16 +868,12 @@ private static void pruneOperator(NodeProcessorCtx ctx, RowSchema inputSchema = op.getSchema(); if (inputSchema != null) { ArrayList rs = new ArrayList(); - RowResolver oldRR = ((ColumnPrunerProcCtx)ctx).getOpToParseCtxMap().get(op).getRowResolver(); - RowResolver newRR = new RowResolver(); - for(ColumnInfo i : oldRR.getRowSchema().getSignature()) { + RowSchema oldRS = op.getSchema(); + for(ColumnInfo i : oldRS.getSignature()) { if ( cols.contains(i.getInternalName())) { - String[] nm = oldRR.reverseLookup(i.getInternalName()); - newRR.put(nm[0], nm[1], i); rs.add(i); } } - ((ColumnPrunerProcCtx)ctx).getOpToParseCtxMap().get(op).setRowResolver(newRR); op.getSchema().setSignature(rs); } } @@ -967,8 +938,7 @@ private static void pruneJoinOperator(NodeProcessorCtx ctx, } } - RowResolver joinRR = cppCtx.getOpToParseCtxMap().get(op).getRowResolver(); - RowResolver newJoinRR = new RowResolver(); + RowSchema joinRS = op.getSchema(); ArrayList outputCols = new ArrayList(); ArrayList rs = new ArrayList(); Map newColExprMap = new HashMap(); @@ -1043,9 +1013,7 @@ private static void pruneJoinOperator(NodeProcessorCtx ctx, for (int i = 0; i < outputCols.size(); i++) { String internalName = outputCols.get(i); - String[] nm = joinRR.reverseLookup(internalName); - ColumnInfo col = joinRR.get(nm[0], nm[1]); - newJoinRR.put(nm[0], nm[1], col); + ColumnInfo col = joinRS.getColumnInfo(internalName); rs.add(col); } @@ -1053,7 +1021,6 @@ private static void pruneJoinOperator(NodeProcessorCtx ctx, op.setColumnExprMap(newColExprMap); conf.setOutputColumnNames(outputCols); op.getSchema().setSignature(rs); - cppCtx.getOpToParseCtxMap().get(op).setRowResolver(newJoinRR); cppCtx.getJoinPrunedColLists().put(op, prunedColLists); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java index 14e20dd..0027960 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagate.java @@ -26,7 +26,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; @@ -44,10 +43,8 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.Rule; import 
org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** * Implementation of one of the rule-based optimization steps. ConstantPropagate traverse the DAG @@ -65,7 +62,6 @@ private static final Log LOG = LogFactory.getLog(ConstantPropagate.class); protected ParseContext pGraphContext; - private Map, OpParseContext> opToParseCtxMap; public ConstantPropagate() {} @@ -78,10 +74,9 @@ public ConstantPropagate() {} @Override public ParseContext transform(ParseContext pactx) throws SemanticException { pGraphContext = pactx; - opToParseCtxMap = pGraphContext.getOpParseCtx(); // generate pruned column list for all relevant operators - ConstantPropagateProcCtx cppCtx = new ConstantPropagateProcCtx(opToParseCtxMap); + ConstantPropagateProcCtx cppCtx = new ConstantPropagateProcCtx(); // create a walker which walks the tree in a DFS manner while maintaining // the operator stack. The dispatcher diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java index 91af3aa..0613fda 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcCtx.java @@ -30,13 +30,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.parse.OpParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; -import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** * This class implements the processor context for Constant Propagate. @@ -50,25 +47,18 @@ .getLog(ConstantPropagateProcCtx.class); private final Map, Map> opToConstantExprs; - private final Map, OpParseContext> opToParseCtx; private final List> opToDelete; - public ConstantPropagateProcCtx(Map, OpParseContext> opToParseCtx) { + public ConstantPropagateProcCtx() { opToConstantExprs = new HashMap, Map>(); opToDelete = new ArrayList>(); - this.opToParseCtx = opToParseCtx; } public Map, Map> getOpToConstantExprs() { return opToConstantExprs; } - - public Map, OpParseContext> getOpToParseCtxMap() { - return opToParseCtx; - } - /** * Resolve a ColumnInfo based on given RowResolver. 
* @@ -78,27 +68,25 @@ public ConstantPropagateProcCtx(Map, OpParseCon * @return * @throws SemanticException */ - private ColumnInfo resolve(ColumnInfo ci, RowResolver rr, RowResolver parentRR) - throws SemanticException { + private ColumnInfo resolve(ColumnInfo ci, RowSchema rs, RowSchema parentRS) { // Resolve new ColumnInfo from String alias = ci.getAlias(); if (alias == null) { alias = ci.getInternalName(); } String tblAlias = ci.getTabAlias(); - ColumnInfo rci = rr.get(tblAlias, alias); - if (rci == null && rr.getRslvMap().size() == 1 && parentRR.getRslvMap().size() == 1) { - rci = rr.get(null, alias); + ColumnInfo rci = rs.getColumnInfo(tblAlias, alias); + if (rci == null && rs.getTableNames().size() == 1 && + parentRS.getTableNames().size() == 1) { + rci = rs.getColumnInfo(rs.getTableNames().iterator().next(), + alias); } if (rci == null) { return null; } - String[] tmp = rr.reverseLookup(rci.getInternalName()); - rci.setTabAlias(tmp[0]); - rci.setAlias(tmp[1]); LOG.debug("Resolved " + ci.getTabAlias() + "." + ci.getAlias() + " as " - + rci.getTabAlias() + "." + rci.getAlias() + " with rr: " + rr); + + rci.getTabAlias() + "." + rci.getAlias() + " with rs: " + rs); return rci; } @@ -117,90 +105,76 @@ private ColumnInfo resolve(ColumnInfo ci, RowResolver rr, RowResolver parentRR) public Map getPropagatedConstants( Operator op) { Map constants = new HashMap(); - OpParseContext parseCtx = opToParseCtx.get(op); - if (parseCtx == null) { + if (op.getSchema() == null) { return constants; } - RowResolver rr = parseCtx.getRowResolver(); - LOG.debug("Getting constants of op:" + op + " with rr:" + rr); + RowSchema rs = op.getSchema(); + LOG.debug("Getting constants of op:" + op + " with rs:" + rs); - try { - if (op.getParentOperators() == null) { - return constants; - } + if (op.getParentOperators() == null) { + return constants; + } - if (op instanceof UnionOperator) { - String alias = (String) rr.getRslvMap().keySet().toArray()[0]; - // find intersection - Map intersection = null; - for (Operator parent : op.getParentOperators()) { - Map unionConst = opToConstantExprs.get(parent); - LOG.debug("Constant of op " + parent.getOperatorId() + " " + unionConst); - if (intersection == null) { - intersection = new HashMap(); - for (Entry e : unionConst.entrySet()) { - ColumnInfo ci = new ColumnInfo(e.getKey()); - ci.setTabAlias(alias); - intersection.put(ci, e.getValue()); - } - } else { - Iterator> itr = intersection.entrySet().iterator(); - while (itr.hasNext()) { - Entry e = itr.next(); - boolean found = false; - for (Entry f : opToConstantExprs.get(parent).entrySet()) { - if (e.getKey().getInternalName().equals(f.getKey().getInternalName())) { - if (e.getValue().isSame(f.getValue())) { - found = true; - } - break; + if (op instanceof UnionOperator) { + String alias = rs.getSignature().get(0).getTabAlias(); + // find intersection + Map intersection = null; + for (Operator parent : op.getParentOperators()) { + Map unionConst = opToConstantExprs.get(parent); + LOG.debug("Constant of op " + parent.getOperatorId() + " " + unionConst); + if (intersection == null) { + intersection = new HashMap(); + for (Entry e : unionConst.entrySet()) { + ColumnInfo ci = new ColumnInfo(e.getKey()); + ci.setTabAlias(alias); + intersection.put(ci, e.getValue()); + } + } else { + Iterator> itr = intersection.entrySet().iterator(); + while (itr.hasNext()) { + Entry e = itr.next(); + boolean found = false; + for (Entry f : opToConstantExprs.get(parent).entrySet()) { + if 
(e.getKey().getInternalName().equals(f.getKey().getInternalName())) { + if (e.getValue().isSame(f.getValue())) { + found = true; } - } - if (!found) { - itr.remove(); + break; } } - } - if (intersection.isEmpty()) { - return intersection; + if (!found) { + itr.remove(); + } } } - LOG.debug("Propagated union constants:" + intersection); - return intersection; + if (intersection.isEmpty()) { + return intersection; + } } + LOG.debug("Propagated union constants:" + intersection); + return intersection; + } - for (Operator parent : op.getParentOperators()) { - Map c = opToConstantExprs.get(parent); - for (Entry e : c.entrySet()) { - ColumnInfo ci = e.getKey(); - ColumnInfo rci = null; - ExprNodeDesc constant = e.getValue(); - rci = resolve(ci, rr, opToParseCtx.get(parent).getRowResolver()); - if (rci != null) { - constants.put(rci, constant); - } else { - LOG.debug("Can't resolve " + ci.getTabAlias() + "." + ci.getAlias() + " from rr:" - + rr); - } - + for (Operator parent : op.getParentOperators()) { + Map c = opToConstantExprs.get(parent); + for (Entry e : c.entrySet()) { + ColumnInfo ci = e.getKey(); + ColumnInfo rci = null; + ExprNodeDesc constant = e.getValue(); + rci = resolve(ci, rs, parent.getSchema()); + if (rci != null) { + constants.put(rci, constant); + } else { + LOG.debug("Can't resolve " + ci.getTabAlias() + "." + ci.getAlias() + + "(" + ci.getInternalName() + ") from rs:" + rs); } - } - LOG.debug("Offerring constants " + constants.keySet() - + " to operator " + op.toString()); - return constants; - } catch (SemanticException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e); } - } - public RowResolver getRowResolver(Operator op) { - OpParseContext parseCtx = opToParseCtx.get(op); - if (parseCtx == null) { - return null; - } - return parseCtx.getRowResolver(); + LOG.debug("Offerring constants " + constants.keySet() + + " to operator " + op.toString()); + + return constants; } public void addOpToDelete(Operator op) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java index d692e8e..f36f843 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -108,30 +107,16 @@ private ConstantPropagateProcFactory() { * @param desc * @return */ - public static ColumnInfo resolveColumn(RowResolver rr, + public static ColumnInfo resolveColumn(RowSchema rs, ExprNodeColumnDesc desc) { - try { - ColumnInfo ci = rr.get(desc.getTabAlias(), desc.getColumn()); - if (ci == null) { - String[] tmp = rr.reverseLookup(desc.getColumn()); - if (tmp == null) { - return null; - } - ci = rr.get(tmp[0], tmp[1]); - ci.setTabAlias(tmp[0]); - ci.setAlias(tmp[1]); - } else { - String[] tmp = rr.reverseLookup(ci.getInternalName()); - if (tmp == null) { - return null; - } - ci.setTabAlias(tmp[0]); - ci.setAlias(tmp[1]); - } - return ci; - } catch (SemanticException e) { - throw new RuntimeException(e); + ColumnInfo ci = 
rs.getColumnInfo(desc.getTabAlias(), desc.getColumn()); + if (ci == null) { + ci = rs.getColumnInfo(desc.getColumn()); } + if (ci == null) { + return null; + } + return ci; } private static final Set unSupportedTypes = ImmutableSet @@ -254,7 +239,7 @@ private static ExprNodeDesc foldExpr(ExprNodeDesc desc, Map newExprs, RowResolver rr, + private static void propagate(GenericUDF udf, List newExprs, RowSchema rs, Map constants) { if (udf instanceof GenericUDFOPEqual) { ExprNodeDesc lOperand = newExprs.get(0); @@ -341,7 +326,7 @@ private static void propagate(GenericUDF udf, List newExprs, RowRe // we need a column expression on other side. return; } - ColumnInfo ci = resolveColumn(rr, c); + ColumnInfo ci = resolveColumn(rs, c); if (ci != null) { LOG.debug("Filter " + udf + " is identified as a value assignment, propagate it."); if (!v.getTypeInfo().equals(ci.getType())) { @@ -356,7 +341,7 @@ private static void propagate(GenericUDF udf, List newExprs, RowRe if (operand instanceof ExprNodeColumnDesc) { LOG.debug("Filter " + udf + " is identified as a value assignment, propagate it."); ExprNodeColumnDesc c = (ExprNodeColumnDesc) operand; - ColumnInfo ci = resolveColumn(rr, c); + ColumnInfo ci = resolveColumn(rs, c); if (ci != null) { constants.put(ci, new ExprNodeNullDesc()); } @@ -435,45 +420,38 @@ private static ExprNodeDesc shortcutFunction(GenericUDF udf, List * @return */ private static ExprNodeDesc evaluateColumn(ExprNodeColumnDesc desc, - ConstantPropagateProcCtx cppCtx, Operator parent) { - try { - ColumnInfo ci = null; - RowResolver rr = cppCtx.getOpToParseCtxMap().get(parent).getRowResolver(); - String[] tmp = rr.reverseLookup(desc.getColumn()); - if (tmp == null) { - LOG.error("Reverse look up of column " + desc + " error!"); - return null; - } - ci = rr.get(tmp[0], tmp[1]); - if (ci != null) { - ExprNodeDesc constant = null; - // Additional work for union operator, see union27.q - if (ci.getAlias() == null) { - for (Entry e : cppCtx.getOpToConstantExprs().get(parent).entrySet()) { - if (e.getKey().getInternalName().equals(ci.getInternalName())) { - constant = e.getValue(); - break; - } - } - } else { - constant = cppCtx.getOpToConstantExprs().get(parent).get(ci); - } - if (constant != null) { - if (constant instanceof ExprNodeConstantDesc - && !constant.getTypeInfo().equals(desc.getTypeInfo())) { - return typeCast(constant, desc.getTypeInfo()); - } - return constant; - } else { - return null; - } - } + ConstantPropagateProcCtx cppCtx, Operator parent) { + RowSchema rs = parent.getSchema(); + ColumnInfo ci = rs.getColumnInfo(desc.getColumn()); + if (ci == null) { + LOG.error("Reverse look up of column " + desc + " error!"); + ci = rs.getColumnInfo(desc.getTabAlias(), desc.getColumn()); + } + if (ci == null) { LOG.error("Can't resolve " + desc.getTabAlias() + "." + desc.getColumn()); throw new RuntimeException("Can't resolve " + desc.getTabAlias() + "." 
+ desc.getColumn()); - } catch (SemanticException e) { - throw new RuntimeException(e); } - + ExprNodeDesc constant = null; + // Additional work for union operator, see union27.q + if (ci.getAlias() == null) { + for (Entry e : cppCtx.getOpToConstantExprs().get(parent).entrySet()) { + if (e.getKey().getInternalName().equals(ci.getInternalName())) { + constant = e.getValue(); + break; + } + } + } else { + constant = cppCtx.getOpToConstantExprs().get(parent).get(ci); + } + if (constant != null) { + if (constant instanceof ExprNodeConstantDesc + && !constant.getTypeInfo().equals(desc.getTypeInfo())) { + return typeCast(constant, desc.getTypeInfo()); + } + return constant; + } else { + return null; + } } /** @@ -793,11 +771,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object.. // Assume only 1 parent for FS operator Operator parent = op.getParentOperators().get(0); Map parentConstants = cppCtx.getPropagatedConstants(parent); - RowResolver rr = cppCtx.getOpToParseCtxMap().get(parent).getRowResolver(); + RowSchema rs = parent.getSchema(); boolean allConstant = true; for (String input : inputs) { - String tmp[] = rr.reverseLookup(input); - ColumnInfo ci = rr.get(tmp[0], tmp[1]); + ColumnInfo ci = rs.getColumnInfo(input); if (parentConstants.get(ci) == null) { allConstant = false; break; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 567c42e..7cdaf36 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; -import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc; @@ -70,7 +69,6 @@ static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName()); - @SuppressWarnings("unchecked") @Override /* * (non-Javadoc) we should ideally not modify the tree we traverse. 
However, @@ -172,6 +170,7 @@ return null; } + @SuppressWarnings("unchecked") private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { // we cannot convert to bucket map join, we cannot convert to @@ -228,12 +227,11 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context, int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren) throws SemanticException { - ParseContext parseContext = context.parseContext; MapJoinDesc mapJoinDesc = null; if (adjustParentsChildren) { - mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(), - joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(), - mapJoinConversionPos, true); + mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, + joinOp, joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), + joinOp.getConf().getMapAliases(), mapJoinConversionPos, true); } else { JoinDesc joinDesc = joinOp.getConf(); // retain the original join desc in the map join. @@ -249,7 +247,6 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont mapJoinDesc.resetOrder(); } - @SuppressWarnings("unchecked") CommonMergeJoinOperator mergeJoinOp = (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets, isSubQuery, mapJoinConversionPos, mapJoinDesc), joinOp.getSchema()); @@ -606,11 +603,10 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo } //can safely convert the join to a map join. - ParseContext parseContext = context.parseContext; MapJoinOperator mapJoinOp = - MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp, - joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(), - bigTablePosition, true); + MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, joinOp, + joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), + joinOp.getConf().getMapAliases(), bigTablePosition, true); Operator parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index b00fa52..fad582a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -77,11 +77,9 @@ import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPruner; import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.QBParseInfo; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles; @@ -934,24 +932,6 @@ public static MapredWork getMapRedWorkFromConf(HiveConf conf) { return mrWork; } - /** - * insert in the map for the operator to row resolver. 
- * - * @param op - * operator created - * @param rr - * row resolver - * @param parseCtx - * parse context - */ - @SuppressWarnings("nls") - public static Operator putOpInsertMap( - Operator op, RowResolver rr, ParseContext parseCtx) { - OpParseContext ctx = new OpParseContext(rr); - parseCtx.getOpParseCtx().put(op, ctx); - return op; - } - public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSchema) { TableScanOperator tableScanOp = (TableScanOperator) OperatorFactory.get(new TableScanDesc(null), rowSchema); @@ -996,19 +976,16 @@ public static TableScanOperator createTemporaryFile( desc.setCompressType(parseCtx.getConf().getVar( HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE)); } - Operator fileSinkOp = putOpInsertMap(OperatorFactory - .get(desc, parent.getSchema()), null, parseCtx); + Operator fileSinkOp = OperatorFactory.get( + desc, parent.getSchema()); // Connect parent to fileSinkOp parent.replaceChild(child, fileSinkOp); fileSinkOp.setParentOperators(Utilities.makeList(parent)); // Create a dummy TableScanOperator for the file generated through fileSinkOp - RowResolver parentRowResolver = - parseCtx.getOpParseCtx().get(parent).getRowResolver(); - TableScanOperator tableScanOp = (TableScanOperator) putOpInsertMap( - createTemporaryTableScanOperator(parent.getSchema()), - parentRowResolver, parseCtx); + TableScanOperator tableScanOp = (TableScanOperator) createTemporaryTableScanOperator( + parent.getSchema()); // Connect this TableScanOperator to child. tableScanOp.setChildOperators(Utilities.makeList(child)); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index d849dcf..167ee89 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -29,8 +29,6 @@ import java.util.Set; import java.util.Stack; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; @@ -59,9 +57,7 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.parse.GenMapRedWalker; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -89,7 +85,6 @@ */ public class MapJoinProcessor implements Transform { - private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName()); // mapjoin table descriptor contains a key descriptor which needs the field schema // (column type + column name). The column name is not really used anywhere, but it // needs to be passed. Use the string defined below for that. 
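The GenMapRedUtils.createTemporaryFile() hunk above captures the simplification applied throughout this patch: intermediate operators are wired up from the parent operator's RowSchema alone, and the putOpInsertMap()/OpParseContext bookkeeping is dropped. A minimal sketch of that wiring, restating the hunk; it assumes desc, parent and child are the FileSinkDesc and operators already in scope in that method, and the wrapper class and method names are hypothetical:

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

class TemporaryFileSplitSketch {
  // Insert FS -> (dummy) TS between parent and child, keeping the parent's schema.
  static TableScanOperator splitWithTemporaryFile(FileSinkDesc desc,
      Operator<? extends OperatorDesc> parent, Operator<? extends OperatorDesc> child) {
    // The FileSink simply reuses the parent's RowSchema; no RowResolver is registered.
    Operator<? extends OperatorDesc> fileSinkOp = OperatorFactory.get(desc, parent.getSchema());
    parent.replaceChild(child, fileSinkOp);
    fileSinkOp.setParentOperators(Utilities.makeList(parent));

    // The dummy TableScan also carries the parent's RowSchema and feeds the old child.
    TableScanOperator tableScanOp =
        GenMapRedUtils.createTemporaryTableScanOperator(parent.getSchema());
    tableScanOp.setChildOperators(Utilities.makeList(child));
    return tableScanOp;
  }
}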
@@ -98,15 +93,6 @@ public MapJoinProcessor() { } - @SuppressWarnings("nls") - private static Operator putOpInsertMap ( - ParseContext pGraphContext, Operator op, - RowResolver rr) { - OpParseContext ctx = new OpParseContext(rr); - pGraphContext.getOpParseCtx().put(op, ctx); - return op; - } - /** * Generate the MapRed Local Work for the given map-join operator * @@ -224,12 +210,10 @@ private static void genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJ public static void genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork, JoinOperator op, int mapJoinPos) throws SemanticException { - LinkedHashMap, OpParseContext> opParseCtxMap = - newWork.getMapWork().getOpParseCtxMap(); // generate the map join operator; already checked the map join - MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, opParseCtxMap, op, - newWork.getMapWork().isLeftInputJoin(), newWork.getMapWork().getBaseSrc(), newWork.getMapWork().getMapAliases(), - mapJoinPos, true, false); + MapJoinOperator newMapJoinOp = new MapJoinProcessor().convertMapJoin(conf, op, + newWork.getMapWork().isLeftInputJoin(), newWork.getMapWork().getBaseSrc(), + newWork.getMapWork().getMapAliases(), mapJoinPos, true, false); genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos); } @@ -240,11 +224,9 @@ public static void genLocalWorkForMapJoin(MapredWork newWork, MapJoinOperator ne // generate the local work for the big table alias MapJoinProcessor.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos); // clean up the mapred work - newWork.getMapWork().setOpParseCtxMap(null); newWork.getMapWork().setLeftInputJoin(false); newWork.getMapWork().setBaseSrc(null); newWork.getMapWork().setMapAliases(null); - } catch (Exception e) { e.printStackTrace(); throw new SemanticException("Failed to generate new mapJoin operator " + @@ -302,7 +284,6 @@ private static void validateMapJoinTypes(Operator op) * @param validateMapJoinTree */ public MapJoinOperator convertMapJoin(HiveConf conf, - LinkedHashMap, OpParseContext> opParseCtxMap, JoinOperator op, boolean leftInputJoin, String[] baseSrc, List mapAliases, int mapJoinPos, boolean noCheckOuterJoin, boolean validateMapJoinTree) throws SemanticException { @@ -352,10 +333,9 @@ public MapJoinOperator convertMapJoin(HiveConf conf, } // create the map-join operator - MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap, + MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, op, leftInputJoin, baseSrc, mapAliases, mapJoinPos, noCheckOuterJoin); - // remove old parents for (pos = 0; pos < newParentOps.size(); pos++) { newParentOps.get(pos).replaceChild(oldReduceSinkParentOps.get(pos), mapJoinOp); @@ -376,22 +356,18 @@ public MapJoinOperator convertMapJoin(HiveConf conf, } public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf, - LinkedHashMap, OpParseContext> opParseCtxMap, JoinOperator op, boolean leftInputJoin, String[] baseSrc, List mapAliases, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { MapJoinDesc mapJoinDescriptor = - getMapJoinDesc(hconf, opParseCtxMap, op, leftInputJoin, baseSrc, mapAliases, + getMapJoinDesc(hconf, op, leftInputJoin, baseSrc, mapAliases, mapJoinPos, noCheckOuterJoin); // reduce sink row resolver used to generate map join op - RowResolver outputRS = opParseCtxMap.get(op).getRowResolver(); + RowSchema outputRS = op.getSchema(); MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild( - mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), op.getParentOperators()); 
- - OpParseContext ctx = new OpParseContext(outputRS); - opParseCtxMap.put(mapJoinOp, ctx); + mapJoinDescriptor, new RowSchema(outputRS.getSignature()), op.getParentOperators()); mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs()); Map colExprMap = op.getColumnExprMap(); @@ -434,7 +410,6 @@ private static boolean needValueIndex(int[] valueIndex) { * @param noCheckOuterJoin */ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, - Map, OpParseContext> opParseCtxMap, SMBMapJoinOperator smbJoinOp, int bigTablePos, boolean noCheckOuterJoin) throws SemanticException { // Create a new map join operator @@ -451,14 +426,10 @@ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, mapJoinDesc.setStatistics(smbJoinDesc.getStatistics()); - RowResolver joinRS = opParseCtxMap.get(smbJoinOp).getRowResolver(); + RowSchema joinRS = smbJoinOp.getSchema(); // The mapjoin has the same schema as the join operator MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild( - mapJoinDesc, joinRS.getRowSchema(), - new ArrayList>()); - - OpParseContext ctx = new OpParseContext(joinRS); - opParseCtxMap.put(mapJoinOp, ctx); + mapJoinDesc, joinRS, new ArrayList>()); // change the children of the original join operator to point to the map // join operator @@ -488,11 +459,10 @@ public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator o HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN) && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN); - LinkedHashMap, OpParseContext> opParseCtxMap = pctx - .getOpParseCtx(); - MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op, - op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(), op.getConf().getMapAliases(), - mapJoinPos, noCheckOuterJoin, true); + MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), op, + op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(), + op.getConf().getMapAliases(), mapJoinPos, noCheckOuterJoin, true); + // create a dummy select to select all columns genSelectPlan(pctx, mapJoinOp); return mapJoinOp; @@ -597,32 +567,31 @@ protected void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws Se // create a dummy select - This select is needed by the walker to split the // mapJoin later on - RowResolver inputRR = pctx.getOpParseCtx().get(input).getRowResolver(); + RowSchema inputRS = input.getSchema(); ArrayList exprs = new ArrayList(); ArrayList outputs = new ArrayList(); List outputCols = input.getConf().getOutputColumnNames(); - RowResolver outputRS = new RowResolver(); - + ArrayList outputRS = new ArrayList(); + Map colExprMap = new HashMap(); for (int i = 0; i < outputCols.size(); i++) { String internalName = outputCols.get(i); - String[] nm = inputRR.reverseLookup(internalName); - ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]); + ColumnInfo valueInfo = inputRS.getColumnInfo(internalName); ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(), valueInfo - .getInternalName(), nm[0], valueInfo.getIsVirtualCol()); + .getInternalName(), valueInfo.getTabAlias(), valueInfo.getIsVirtualCol()); exprs.add(colDesc); outputs.add(internalName); - outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo.getType(), nm[0], valueInfo - .getIsVirtualCol(), valueInfo.isHiddenVirtualCol())); + outputRS.add(new ColumnInfo(internalName, valueInfo.getType(), + valueInfo.getTabAlias(), valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol())); colExprMap.put(internalName, colDesc); } 
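The loop just above shows the rewrite that recurs throughout this patch: instead of round-tripping through the RowResolver (reverseLookup() for the {tabAlias, alias} pair, then get() for the ColumnInfo), callers ask the operator's RowSchema directly by internal name, and the table alias and virtual-column flags come from the ColumnInfo itself. A compact sketch of the pattern, under the assumption that rs was obtained from some operator via getSchema(); the helper class and method names are hypothetical, while RowSchema.getColumnInfo() is the accessor added at the top of this patch:

import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

class InternalNameLookupSketch {
  // Old: String[] nm = rr.reverseLookup(internalName); ColumnInfo ci = rr.get(nm[0], nm[1]);
  // New: one lookup against the schema the operator already carries.
  static ExprNodeDesc toColumnExpr(RowSchema rs, String internalName) {
    ColumnInfo ci = rs.getColumnInfo(internalName);
    if (ci == null) {
      return null; // column no longer present in this schema (e.g. pruned)
    }
    return new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(),
        ci.getTabAlias(), ci.getIsVirtualCol());
  }
}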
SelectDesc select = new SelectDesc(exprs, outputs, false); - SelectOperator sel = (SelectOperator) putOpInsertMap(pctx, OperatorFactory.getAndMakeChild(select, - new RowSchema(inputRR.getColumnInfos()), input), inputRR); + SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild(select, + new RowSchema(outputRS), input); sel.setColumnExprMap(colExprMap); @@ -1055,7 +1024,6 @@ public void setpGraphContext(ParseContext pGraphContext) { } public static MapJoinDesc getMapJoinDesc(HiveConf hconf, - LinkedHashMap, OpParseContext> opParseCtxMap, JoinOperator op, boolean leftInputJoin, String[] baseSrc, List mapAliases, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException { JoinDesc desc = op.getConf(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java index 175a53c..aa5a5d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java @@ -134,7 +134,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, pSEL.getConf().setSelectStar(cSEL.getConf().isSelectStar()); // We need to use the OpParseContext of the child SelectOperator to replace the // the OpParseContext of the parent SelectOperator. - pctx.updateOpParseCtx(pSEL, pctx.removeOpParseCtx(cSEL)); pSEL.removeChildAndAdoptItsChildren(cSEL); cSEL.setParentOperators(null); cSEL.setChildOperators(null); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index ed6f713..b184cf4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -196,7 +196,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, LOG.debug("Cloning reduce sink for multi-child broadcast edge"); // we've already set this one up. Need to clone for the next work. 
r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( - (ReduceSinkDesc) parentRS.getConf().clone(), parentRS.getParentOperators()); + (ReduceSinkDesc) parentRS.getConf().clone(), + new RowSchema(parentRS.getSchema()), + parentRS.getParentOperators()); context.clonedReduceSinks.add(r); } else { r = parentRS; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java index 28d8201..dc885ab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.QBJoinTree; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -223,9 +222,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, OperatorFactory.getAndMakeChild( new UnionDesc(), new RowSchema(currOp.getSchema().getSignature()), oplist); - RowResolver unionRR = parseContext.getOpParseCtx().get(currOp).getRowResolver(); - GenMapRedUtils.putOpInsertMap(unionOp, unionRR, parseContext); - // Introduce a select after the union List> unionList = new ArrayList>(); @@ -235,7 +231,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, OperatorFactory.getAndMakeChild( new SelectDesc(true), new RowSchema(unionOp.getSchema().getSignature()), unionList); - GenMapRedUtils.putOpInsertMap(selectUnionOp, unionRR, parseContext); // add the finalOp after the union selectUnionOp.setChildOperators(finalOps); @@ -472,12 +467,10 @@ private void insertFilterOnTop( currChild.setParentOperators(null); Operator filter = OperatorFactory.getAndMakeChild( - new FilterDesc(filterExpr, false), tableScanOp); - filter.setSchema(new RowSchema(tableScanOp.getSchema().getSignature())); + new FilterDesc(filterExpr, false), + new RowSchema(tableScanOp.getSchema().getSignature()), + tableScanOp); OperatorFactory.makeChild(filter, currChild); - - RowResolver filterRR = parseContext.getOpParseCtx().get(tableScanOp).getRowResolver(); - GenMapRedUtils.putOpInsertMap(filter, filterRR, parseContext); } /** @@ -604,9 +597,6 @@ private void insertRowResolvers( ctx.getCloneTSOpMap().put((TableScanOperator)opClone, (TableScanOperator)op); } - GenMapRedUtils.putOpInsertMap( - opClone, parseContext.getOpParseCtx().get(op).getRowResolver(), parseContext); - List> parents = op.getParentOperators(); List> parentClones = opClone.getParentOperators(); if ((parents != null) && (!parents.isEmpty()) && diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index e16ba6c..137956c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -18,8 +18,13 @@ package org.apache.hadoop.hive.ql.optimizer; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; @@ -48,10 +53,7 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -66,14 +68,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * When dynamic partitioning (with or without bucketing and sorting) is enabled, this optimization @@ -196,8 +192,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, for (int i : sortPositions) LOG.debug("sort position " + i); for (int i : sortOrder) LOG.debug("sort order " + i); List partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema()); - List colInfos = parseCtx.getOpParseCtx().get(fsParent).getRowResolver() - .getColumnInfos(); + List colInfos = fsParent.getSchema().getSignature(); ArrayList bucketColumns = getPositionsToExprNodes(bucketPositions, colInfos); // update file sink descriptor @@ -206,9 +201,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, fsOp.getConf().setTotalFiles(1); // Create ReduceSinkDesc - RowResolver inputRR = parseCtx.getOpParseCtx().get(fsParent).getRowResolver(); - ObjectPair pair = copyRowResolver(inputRR); - RowResolver outRR = pair.getSecond(); + RowSchema outRS = new RowSchema(fsParent.getSchema()); ArrayList valColInfo = Lists.newArrayList(fsParent.getSchema().getSignature()); ArrayList newValueCols = Lists.newArrayList(); Map colExprMap = Maps.newHashMap(); @@ -220,28 +213,25 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType()); if (!bucketColumns.isEmpty()) { - String tableAlias = outRR.getColumnInfos().get(0).getTabAlias(); + String tableAlias = outRS.getSignature().get(0).getTabAlias(); ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo, tableAlias, true, true); - outRR.put(tableAlias, BUCKET_NUMBER_COL_NAME, ci); + outRS.getSignature().add(ci); } // Create ReduceSink operator - ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent), - outRR, parseCtx); + ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( + rsConf, new RowSchema(outRS.getSignature()), fsParent); rsOp.setColumnExprMap(colExprMap); // Create ExtractDesc - ObjectPair exPair = copyRowResolver(outRR); - RowResolver exRR = exPair.getSecond(); + RowSchema exRR = new RowSchema(outRS); ExtractDesc exConf = new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, Utilities.ReduceField.VALUE.toString(), "", false)); // Create Extract Operator - ExtractOperator exOp = (ExtractOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(exConf, new 
RowSchema(exRR.getColumnInfos()), rsOp), - exRR, parseCtx); + ExtractOperator exOp = (ExtractOperator) OperatorFactory.getAndMakeChild( + exConf, exRR, rsOp); // link EX to FS fsOp.getParentOperators().clear(); @@ -313,8 +303,6 @@ private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) { rsParent.getChildOperators().add(rsGrandChild); rsGrandChild.getParentOperators().clear(); rsGrandChild.getParentOperators().add(rsParent); - parseCtx.removeOpParseCtx(rsToRemove); - parseCtx.removeOpParseCtx(rsChild); LOG.info("Removed " + rsToRemove.getOperatorId() + " and " + rsChild.getOperatorId() + " as it was introduced by enforce bucketing/sorting."); } @@ -496,31 +484,6 @@ public ReduceSinkDesc getReduceSinkDesc(List partitionPositions, return cols; } - private Operator putOpInsertMap(Operator op, RowResolver rr, - ParseContext context) { - OpParseContext ctx = new OpParseContext(rr); - context.getOpParseCtx().put(op, ctx); - return op; - } - - private ObjectPair copyRowResolver(RowResolver inputRR) { - ObjectPair output = new ObjectPair(); - RowResolver outRR = new RowResolver(); - int pos = 0; - String tabAlias = null; - - for (ColumnInfo colInfo : inputRR.getColumnInfos()) { - String[] info = inputRR.reverseLookup(colInfo.getInternalName()); - tabAlias = info[0]; - outRR.put(info[0], info[1], new ColumnInfo(SemanticAnalyzer.getColumnInternalName(pos), - colInfo.getType(), info[0], colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol())); - pos++; - } - output.setFirst(tabAlias); - output.setSecond(outRR); - return output; - } - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java index c69d492..036579a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkMapJoinProcessor.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.optimizer; -import java.util.LinkedHashMap; import java.util.List; import org.apache.hadoop.hive.conf.HiveConf; @@ -26,7 +25,6 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.JoinCondDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -49,10 +47,9 @@ */ @Override public MapJoinOperator convertMapJoin(HiveConf conf, - LinkedHashMap, OpParseContext> opParseCtxMap, - JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases, - int bigTablePos, boolean noCheckOuterJoin, - boolean validateMapJoinTree) throws SemanticException { + JoinOperator op, boolean leftSrc, String[] baseSrc, List mapAliases, + int bigTablePos, boolean noCheckOuterJoin, + boolean validateMapJoinTree) throws SemanticException { // outer join cannot be performed on a table which is being cached JoinCondDesc[] condns = op.getConf().getConds(); @@ -64,7 +61,7 @@ public MapJoinOperator convertMapJoin(HiveConf conf, } // create the map-join operator - MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap, + MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, op, op.getConf().isLeftInputJoin(), op.getConf().getBaseSrc(), op.getConf().getMapAliases(), bigTablePos, noCheckOuterJoin); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java index 20655c1..c1f1519 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; @@ -43,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.PTFOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.UnionOperator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -59,9 +61,7 @@ import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor; import org.apache.hadoop.hive.ql.optimizer.Transform; import org.apache.hadoop.hive.ql.optimizer.physical.CommonJoinTaskDispatcher; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -387,17 +387,16 @@ private boolean sameOrder(String order1, String order2) { List backtrackedPartitionCols = ExprNodeDescUtils.backtrack(childPartitionCols, child, current); - OpParseContext opCtx = pCtx.getOpParseCtx().get(current); - RowResolver rowResolver = opCtx.getRowResolver(); + RowSchema rowSchema = current.getSchema(); Set tableNeedToCheck = new HashSet(); for (ExprNodeDesc expr: childKeyCols) { if (!(expr instanceof ExprNodeColumnDesc)) { return correlatedReduceSinkOperators; } String colName = ((ExprNodeColumnDesc)expr).getColumn(); - String[] nm = rowResolver.reverseLookup(colName); - if (nm != null) { - tableNeedToCheck.add(nm[0]); + ColumnInfo columnInfo = rowSchema.getColumnInfo(colName); + if (columnInfo != null) { + tableNeedToCheck.add(columnInfo.getTabAlias()); } } if (current instanceof JoinOperator) { @@ -405,8 +404,7 @@ private boolean sameOrder(String order1, String order2) { int expectedNumCorrelatedRsops = current.getParentOperators().size(); LinkedHashSet correlatedRsops = null; for (Operator parent : current.getParentOperators()) { - Set tableNames = - pCtx.getOpParseCtx().get(parent).getRowResolver().getTableNames(); + Set tableNames = parent.getSchema().getTableNames(); for (String tbl : tableNames) { if (tableNeedToCheck.contains(tbl)) { correlatedRsops = findCorrelatedReduceSinkOperators(current, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java index dc906e8..630a9eb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java @@ -44,9 +44,7 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.OpParseContext; 
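
In the CorrelationOptimizer hunk above, the two-step RowResolver lookup (reverseLookup to obtain a {table, column} pair, then get) collapses into a single RowSchema.getColumnInfo call, with the table alias read off the returned ColumnInfo. A rough sketch of that lookup under assumed names (SchemaLookupSketch, tablesOf, and keyCols are illustrative):

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

class SchemaLookupSketch {
  // Collects the table aliases referenced by a set of key columns, resolving each
  // column through the operator's RowSchema instead of a RowResolver.reverseLookup().
  static Set<String> tablesOf(Operator<? extends OperatorDesc> op,
      Iterable<ExprNodeDesc> keyCols) {
    Set<String> tables = new HashSet<String>();
    RowSchema schema = op.getSchema();
    for (ExprNodeDesc expr : keyCols) {
      if (!(expr instanceof ExprNodeColumnDesc)) {
        continue;
      }
      ColumnInfo ci = schema.getColumnInfo(((ExprNodeColumnDesc) expr).getColumn());
      if (ci != null) {
        tables.add(ci.getTabAlias()); // previously nm[0] from reverseLookup()
      }
    }
    return tables;
  }
}
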
import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -347,7 +345,7 @@ protected static SelectOperator replaceReduceSinkWithSelectOperator(ReduceSinkOp protected static SelectOperator replaceOperatorWithSelect(Operator operator, ParseContext context, AbstractCorrelationProcCtx procCtx) throws SemanticException { - RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver(); + RowSchema inputRS = operator.getSchema(); SelectDesc select = new SelectDesc(null, null); Operator parent = getSingleParent(operator); @@ -355,9 +353,8 @@ protected static SelectOperator replaceOperatorWithSelect(Operator operator, parent.getChildOperators().clear(); - SelectOperator sel = (SelectOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR - .getColumnInfos()), parent), inputRR, context); + SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild( + select, new RowSchema(inputRS.getSignature()), parent); sel.setColumnExprMap(operator.getColumnExprMap()); @@ -393,8 +390,6 @@ protected static void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupBy } cGBYr.setColumnExprMap(cGBYm.getColumnExprMap()); cGBYr.setSchema(cGBYm.getSchema()); - RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver(); - context.getOpParseCtx().get(cGBYr).setRowResolver(resolver); } else { // pRS-cRS-cGBYr (no map aggregation) --> pRS-cGBYr(COMPLETE) // revert expressions of cGBYr to that of cRS @@ -404,25 +399,23 @@ protected static void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupBy } Map oldMap = cGBYr.getColumnExprMap(); - RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver(); + RowSchema oldRS = cGBYr.getSchema(); Map newMap = new HashMap(); - RowResolver newRR = new RowResolver(); + ArrayList newRS = new ArrayList(); List outputCols = cGBYr.getConf().getOutputColumnNames(); for (int i = 0; i < outputCols.size(); i++) { String colName = outputCols.get(i); - String[] nm = oldRR.reverseLookup(colName); - ColumnInfo colInfo = oldRR.get(nm[0], nm[1]); - newRR.put(nm[0], nm[1], colInfo); + ColumnInfo colInfo = oldRS.getColumnInfo(colName); + newRS.add(colInfo); ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS); if (colExpr != null) { newMap.put(colInfo.getInternalName(), colExpr); } } cGBYr.setColumnExprMap(newMap); - cGBYr.setSchema(new RowSchema(newRR.getColumnInfos())); - context.getOpParseCtx().get(cGBYr).setRowResolver(newRR); + cGBYr.setSchema(new RowSchema(newRS)); } cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE); @@ -494,13 +487,5 @@ protected static void removeOperator(Operator target, Operator child, Oper } target.setChildOperators(null); target.setParentOperators(null); - context.getOpParseCtx().remove(target); - } - - protected static Operator putOpInsertMap(Operator op, RowResolver rr, - ParseContext context) { - OpParseContext ctx = new OpParseContext(rr); - context.getOpParseCtx().put(op, ctx); - return op; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java index 080725b..e8ae2f7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java @@ -254,7 +254,6 @@ protected static void applyCorrelation( for (ReduceSinkOperator rsop: handledRSs) { rsop.setChildOperators(null); rsop.setParentOperators(null); - pCtx.getOpParseCtx().remove(rsop); } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java index 06a9478..5afe21e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.optimizer.Transform; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -332,8 +331,6 @@ private void rewriteOriginalQuery() throws SemanticException { RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb, canApplyCtx); rewriteQueryCtx.invokeRewriteQueryProc(); parseContext = rewriteQueryCtx.getParseContext(); - parseContext.setOpParseCtx((LinkedHashMap, - OpParseContext>) rewriteQueryCtx.getOpc()); } LOG.info("Finished Rewriting query"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java index 72f4588..69a5a44 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,19 +30,17 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.RowSchema; -import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.SelectOperator; -import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -76,7 +73,6 @@ private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveD this.indexTableName = canApplyCtx.getIndexTableName(); this.alias = canApplyCtx.getAlias(); this.aggregateFunction = canApplyCtx.getAggFunction(); - this.opc = parseContext.getOpParseCtx(); this.indexKey = canApplyCtx.getIndexKey(); 
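
The removeReduceSinkForGroupBy path in CorrelationUtilities (above) now rebuilds the group-by operator's schema and column-expression map directly from ColumnInfo objects found in the old RowSchema. A simplified sketch of that rebuild; RebuildSchemaSketch, rebuild, and the backtracked map (standing in for the ExprNodeDescUtils.backtrack results) are assumed names:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

class RebuildSchemaSketch {
  // Rebuilds an operator's schema and column-expression map from its output column
  // names, looking each ColumnInfo up in the old schema rather than in a RowResolver.
  static void rebuild(Operator<? extends OperatorDesc> op, List<String> outputCols,
      Map<String, ExprNodeDesc> backtracked) {
    RowSchema oldRS = op.getSchema();
    ArrayList<ColumnInfo> newSig = new ArrayList<ColumnInfo>();
    Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
    for (String colName : outputCols) {
      ColumnInfo colInfo = oldRS.getColumnInfo(colName); // was oldRR.get(nm[0], nm[1])
      newSig.add(colInfo);
      ExprNodeDesc expr = backtracked.get(colName);
      if (expr != null) {
        newMap.put(colInfo.getInternalName(), expr);
      }
    }
    op.setColumnExprMap(newMap);
    op.setSchema(new RowSchema(newSig));
  }
}
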
} @@ -86,8 +82,6 @@ public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseC parseContext, hiveDb, canApplyCtx); } - private Map, OpParseContext> opc = - new LinkedHashMap, OpParseContext>(); private final Hive hiveDb; private final ParseContext parseContext; private RewriteCanApplyCtx canApplyCtx; @@ -99,10 +93,6 @@ public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseC private ExprNodeColumnDesc aggrExprNode = null; private String indexKey; - public Map, OpParseContext> getOpc() { - return opc; - } - public ParseContext getParseContext() { return parseContext; } @@ -172,15 +162,9 @@ private void replaceTableScanProcess(TableScanOperator scanOperator) throws Sema // and add new ones Map> topOps = rewriteQueryCtx.getParseContext() .getTopOps(); - Map, OpParseContext> opParseContext = rewriteQueryCtx - .getParseContext().getOpParseCtx(); - - // need this to set rowResolver for new scanOperator - OpParseContext operatorContext = opParseContext.get(scanOperator); // remove original TableScanOperator topOps.remove(alias); - opParseContext.remove(scanOperator); String indexTableName = rewriteQueryCtx.getIndexName(); Table indexTableHandle = null; @@ -201,23 +185,21 @@ private void replaceTableScanProcess(TableScanOperator scanOperator) throws Sema scanOperator.setConf(indexTableScanDesc); // Construct the new RowResolver for the new TableScanOperator - RowResolver rr = new RowResolver(); + ArrayList sigRS = new ArrayList(); try { StructObjectInspector rowObjectInspector = (StructObjectInspector) indexTableHandle .getDeserializer().getObjectInspector(); StructField field = rowObjectInspector.getStructFieldRef(rewriteQueryCtx.getIndexKey()); - rr.put(indexTableName, field.getFieldName(), new ColumnInfo(field.getFieldName(), - TypeInfoUtils.getTypeInfoFromObjectInspector(field.getFieldObjectInspector()), - indexTableName, false)); + sigRS.add(new ColumnInfo(field.getFieldName(), TypeInfoUtils.getTypeInfoFromObjectInspector( + field.getFieldObjectInspector()), indexTableName, false)); } catch (SerDeException e) { LOG.error("Error while creating the RowResolver for new TableScanOperator."); LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); throw new SemanticException(e.getMessage(), e); } + RowSchema rs = new RowSchema(sigRS); // Set row resolver for new table - operatorContext.setRowResolver(rr); - String newAlias = indexTableName; int index = alias.lastIndexOf(":"); if (index >= 0) { @@ -228,13 +210,10 @@ private void replaceTableScanProcess(TableScanOperator scanOperator) throws Sema scanOperator.getConf().setAlias(newAlias); scanOperator.setAlias(indexTableName); topOps.put(newAlias, scanOperator); - opParseContext.put(scanOperator, operatorContext); rewriteQueryCtx.getParseContext().setTopOps( (HashMap>) topOps); - rewriteQueryCtx.getParseContext().setOpParseCtx( - (LinkedHashMap, OpParseContext>) opParseContext); - ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rr, + ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rs, Arrays.asList(rewriteQueryCtx.getIndexKey())); } @@ -318,11 +297,6 @@ private void replaceGroupByOperatorProcess(GroupByOperator operator, int index) // Now the GroupByOperator has the new AggregationList; // sum(`_count_of_indexed_key`) // instead of count(indexed_key) - OpParseContext gbyOPC = rewriteQueryCtx.getOpc().get(operator); - RowResolver gbyRR = newDAGContext.getOpParseCtx().get(newGbyOperator).getRowResolver(); - gbyOPC.setRowResolver(gbyRR); - 
rewriteQueryCtx.getOpc().put(operator, gbyOPC); - oldConf.setAggregators((ArrayList) newAggrList); operator.setConf(oldConf); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java index d3caaf0..2187484 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hive.ql.optimizer.lineage; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** @@ -71,7 +71,7 @@ public LineageCtx getLineageCtx() { return inpOp; } - public RowResolver getResolver() { - return lctx.getParseCtx().getOpParseCtx().get(inpOp).getRowResolver(); + public RowSchema getSchema() { + return inpOp.getSchema(); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java index fdbb93e..86d221d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; @@ -42,7 +43,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -76,12 +76,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Operator operator = epc.getInputOperator(); assert (operator != null); - RowResolver resolver = epc.getResolver(); - String[] nm = resolver.reverseLookup(cd.getColumn()); - if (nm == null && operator instanceof ReduceSinkOperator) { - nm = resolver.reverseLookup(Utilities.removeValueTag(cd.getColumn())); + RowSchema schema = epc.getSchema(); + ColumnInfo ci = schema.getColumnInfo(cd.getColumn()); + if (ci == null && operator instanceof ReduceSinkOperator) { + ci = schema.getColumnInfo(Utilities.removeValueTag(cd.getColumn())); } - ColumnInfo ci = nm != null ? 
resolver.get(nm[0], nm[1]): null; // Insert the dependencies of inp_ci to that of the current operator, ci LineageCtx lc = epc.getLineageCtx(); @@ -143,6 +142,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Dependency dep = new Dependency(); dep.setType(LineageInfo.DependencyType.SIMPLE); dep.setBaseCols(new ArrayList()); + return dep; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java index d6a6ed6..adca50d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java @@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.lib.Utils; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -468,17 +467,16 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); } } else { - RowResolver resolver = lCtx.getParseCtx().getOpParseCtx().get(rop).getRowResolver(); + RowSchema schema = rop.getSchema(); ReduceSinkDesc desc = rop.getConf(); List keyCols = desc.getKeyCols(); ArrayList keyColNames = desc.getOutputKeyColumnNames(); for (int i = 0; i < keyCols.size(); i++) { // order-bys, joins - String[] nm = resolver.reverseLookup(Utilities.ReduceField.KEY + "." + keyColNames.get(i)); - if (nm == null) { + ColumnInfo column = schema.getColumnInfo(Utilities.ReduceField.KEY + "." + keyColNames.get(i)); + if (column == null) { continue; // key in values } - ColumnInfo column = resolver.get(nm[0], nm[1]); lCtx.getIndex().putDependency(rop, column, ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i))); } @@ -486,12 +484,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, ArrayList valColNames = desc.getOutputValueColumnNames(); for (int i = 0; i < valCols.size(); i++) { // todo: currently, bucketing,etc. makes RS differently with those for order-bys or joins - String[] nm = resolver.reverseLookup(valColNames.get(i)); - if (nm == null) { + ColumnInfo column = schema.getColumnInfo(valColNames.get(i)); + if (column == null) { // order-bys, joins - nm = resolver.reverseLookup(Utilities.ReduceField.VALUE + "." + valColNames.get(i)); + column = schema.getColumnInfo(Utilities.ReduceField.VALUE + "." 
+ valColNames.get(i)); } - ColumnInfo column = resolver.get(nm[0], nm[1]); lCtx.getIndex().putDependency(rop, column, ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i))); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java index 19503dc..f500a5e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java @@ -455,7 +455,6 @@ public static boolean cannotConvert(long aliasKnownSize, } } - currWork.setOpParseCtxMap(parseCtx.getOpParseCtx()); currWork.setLeftInputJoin(joinOp.getConf().isLeftInputJoin()); currWork.setBaseSrc(joinOp.getConf().getBaseSrc()); currWork.setMapAliases(joinOp.getConf().getMapAliases()); @@ -521,7 +520,6 @@ public static boolean cannotConvert(long aliasKnownSize, listWorks.add(currTask.getWork()); listTasks.add(currTask); // clear JoinTree and OP Parse Context - currWork.setOpParseCtxMap(null); currWork.setLeftInputJoin(false); currWork.setBaseSrc(null); currWork.setMapAliases(null); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java index a135cf5..3b09c2f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin; @@ -151,10 +150,6 @@ private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOpera MapredWork currJoinWork = Utilities.clonePlan(currWork); SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork); - // Add the row resolver for the new operator - Map, OpParseContext> opParseContextMap = - physicalContext.getParseContext().getOpParseCtx(); - opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp)); // change the newly created map-red plan as if it was a join operator genSMBJoinWork(currJoinWork.getMapWork(), newSMBJoinOp); return currJoinWork; @@ -253,11 +248,9 @@ private boolean isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) MapredWork currJoinWork = convertSMBWorkToJoinWork(currWork, originalSMBJoinOp); SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork); - currWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx()); currWork.getMapWork().setLeftInputJoin(originalSMBJoinOp.getConf().isLeftInputJoin()); currWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc()); currWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases()); - currJoinWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx()); currJoinWork.getMapWork().setLeftInputJoin(originalSMBJoinOp.getConf().isLeftInputJoin()); currJoinWork.getMapWork().setBaseSrc(originalSMBJoinOp.getConf().getBaseSrc()); currJoinWork.getMapWork().setMapAliases(originalSMBJoinOp.getConf().getMapAliases()); @@ -334,7 +327,6 @@ private boolean 
isEligibleForOptimization(SMBMapJoinOperator originalSMBJoinOp) listWorks.add(currTask.getWork()); listTasks.add(currTask); // clear JoinTree and OP Parse Context - currWork.getMapWork().setOpParseCtxMap(null); currWork.getMapWork().setLeftInputJoin(false); currWork.getMapWork().setBaseSrc(null); currWork.getMapWork().setMapAliases(null); @@ -432,13 +424,8 @@ private MapJoinOperator getMapJoinOperator(MapRedTask task, int mapJoinPos) throws SemanticException { SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(task.getWork()); - // Add the row resolver for the new operator - Map, OpParseContext> opParseContextMap = - physicalContext.getParseContext().getOpParseCtx(); - opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp)); - // generate the map join operator - return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(), - opParseContextMap, newSMBJoinOp, mapJoinPos, true); + return MapJoinProcessor.convertSMBJoinToMapJoin( + physicalContext.getConf(), newSMBJoinOp, mapJoinPos, true); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index 9ff47c7..39d1f18 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -372,11 +372,10 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeSparkProc } //can safely convert the join to a map join. - ParseContext parseContext = context.getParseContext(); MapJoinOperator mapJoinOp = - MapJoinProcessor.convertJoinOpMapJoinOp(context.getConf(), parseContext.getOpParseCtx(), joinOp, - joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), joinOp.getConf().getMapAliases(), - bigTablePosition, true); + MapJoinProcessor.convertJoinOpMapJoinOp(context.getConf(), joinOp, + joinOp.getConf().isLeftInputJoin(), joinOp.getConf().getBaseSrc(), + joinOp.getConf().getMapAliases(), bigTablePosition, true); Operator parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 6a87929..e67d98b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -18,6 +18,13 @@ package org.apache.hadoop.hive.ql.parse; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Stack; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -28,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; @@ -43,13 +51,6 @@ import org.apache.hadoop.hive.ql.plan.TezWork.VertexType; import org.apache.hadoop.hive.ql.plan.UnionWork; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Stack; - /** * GenTezWork separates the operator tree into tez tasks. 
* It is called once per leaf operator (operator that forces @@ -245,7 +246,9 @@ public Object process(Node nd, Stack stack, LOG.debug("Cloning reduce sink for multi-child broadcast edge"); // we've already set this one up. Need to clone for the next work. r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( - (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators()); + (ReduceSinkDesc)r.getConf().clone(), + new RowSchema(r.getSchema()), + r.getParentOperators()); context.clonedReduceSinks.add(r); } r.getConf().setOutputName(work.getName()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index b838bff..dda4f75 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -70,7 +69,6 @@ private HashMap opToSamplePruner; private Map> opToPartToSkewedPruner; private HashMap> topOps; - private LinkedHashMap, OpParseContext> opParseCtx; private Set joinOps; private Set mapJoinOps; private Set smbMapJoinOps; @@ -152,7 +150,6 @@ public ParseContext( HashMap opToPartPruner, HashMap opToPartList, HashMap> topOps, - LinkedHashMap, OpParseContext> opParseCtx, Set joinOps, Set smbMapJoinOps, List loadTableWork, List loadFileWork, @@ -177,7 +174,6 @@ public ParseContext( this.smbMapJoinOps = smbMapJoinOps; this.loadFileWork = loadFileWork; this.loadTableWork = loadTableWork; - this.opParseCtx = opParseCtx; this.topOps = topOps; this.ctx = ctx; this.idToTableNameMap = idToTableNameMap; @@ -303,43 +299,6 @@ public void setTopOps(HashMap> topOps) this.topOps = topOps; } - /** - * @return the opParseCtx - */ - public LinkedHashMap, OpParseContext> getOpParseCtx() { - return opParseCtx; - } - - /** - * Remove the OpParseContext of a specific operator op - * @param op - * @return - */ - public OpParseContext removeOpParseCtx(Operator op) { - return opParseCtx.remove(op); - } - - /** - * Update the OpParseContext of operator op to newOpParseContext. - * If op is not in opParseCtx, a new entry will be added into opParseCtx. - * The key is op, and the value is newOpParseContext. 
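
The lineage hunks above (ExprProcFactory and OpProcFactory) resolve reduce-sink output columns through the operator's RowSchema, falling back to the KEY./VALUE.-prefixed internal names when the bare name is not found. A minimal sketch of the VALUE-side fallback; ReduceSinkColumnLookupSketch and resolve are assumed names:

import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.Utilities;

class ReduceSinkColumnLookupSketch {
  // Resolves an output column of a ReduceSinkOperator by internal name, first as-is
  // and then under the VALUE. prefix, mirroring the lineage hunks above.
  static ColumnInfo resolve(ReduceSinkOperator rop, String colName) {
    RowSchema schema = rop.getSchema();
    ColumnInfo column = schema.getColumnInfo(colName);
    if (column == null) {
      // reduce-sink value columns are registered as "VALUE.<name>" in the schema
      column = schema.getColumnInfo(Utilities.ReduceField.VALUE + "." + colName);
    }
    return column;
  }
}
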
- * @param op - * @param newOpParseContext - */ - public void updateOpParseCtx(Operator op, - OpParseContext newOpParseContext) { - opParseCtx.put(op, newOpParseContext); - } - - /** - * @param opParseCtx - * the opParseCtx to set - */ - public void setOpParseCtx( - LinkedHashMap, OpParseContext> opParseCtx) { - this.opParseCtx = opParseCtx; - } - public HashMap getNameToSplitSample() { return nameToSplitSample; } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java index 469dc9f..56422a3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java @@ -54,7 +54,6 @@ // TODO: Refactor this and do in a more object oriented manner private boolean isExprResolver; - @SuppressWarnings("unused") private static final Log LOG = LogFactory.getLog(RowResolver.class.getName()); public RowResolver() { @@ -97,6 +96,14 @@ public ASTNode getExpressionSource(ASTNode node) { public void put(String tab_alias, String col_alias, ColumnInfo colInfo) { if (!addMappingOnly(tab_alias, col_alias, colInfo)) { + //Make sure that the table alias and column alias are stored + //in the column info + if (tab_alias != null && colInfo.getTabAlias() == null) { + colInfo.setTabAlias(tab_alias.toLowerCase()); + } + if (col_alias != null && colInfo.getAlias() == null) { + colInfo.setAlias(col_alias.toLowerCase()); + } rowSchema.getSignature().add(colInfo); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 4364f28..651a209 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -381,7 +381,6 @@ public void initParseCtx(ParseContext pctx) { opToPartList = pctx.getOpToPartList(); opToSamplePruner = pctx.getOpToSamplePruner(); topOps = pctx.getTopOps(); - opParseCtx = pctx.getOpParseCtx(); loadTableWork = pctx.getLoadTableWork(); loadFileWork = pctx.getLoadFileWork(); ctx = pctx.getContext(); @@ -397,8 +396,7 @@ public void initParseCtx(ParseContext pctx) { } public ParseContext getParseContext() { - return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, - topOps, opParseCtx, + return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps, new HashSet(joinContext.keySet()), new HashSet(smbMapJoinContext.keySet()), loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, @@ -7141,7 +7139,7 @@ private Operator genReduceSinkPlan(Operator input, ColumnInfo newColInfo = new ColumnInfo(colInfo); newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex); newColInfo.setTabAlias(nm[0]); - rsRR.addMappingOnly(nm[0], nm[1], newColInfo); + rsRR.put(nm[0], nm[1], newColInfo); if (nm2 != null) { rsRR.addMappingOnly(nm2[0], nm2[1], newColInfo); } @@ -7392,7 +7390,7 @@ private Operator genJoinReduceSinkChild(QB qb, ExprNodeDesc[] joinKeys, ColumnInfo newColInfo = new ColumnInfo(colInfo); newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex); newColInfo.setTabAlias(nm[0]); - outputRR.addMappingOnly(nm[0], nm[1], newColInfo); + outputRR.put(nm[0], nm[1], newColInfo); if (nm2 != null) { outputRR.addMappingOnly(nm2[0], nm2[1], newColInfo); } @@ -10164,7 +10162,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce // 4. 
Generate Parse Context for Optimizer & Physical compiler ParseContext pCtx = new ParseContext(conf, qb, plannerCtx.child, - opToPartPruner, opToPartList, topOps, opParseCtx, + opToPartPruner, opToPartList, topOps, new HashSet(joinContext.keySet()), new HashSet(smbMapJoinContext.keySet()), loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index f2eb4d2..0116c85 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -387,7 +387,7 @@ public ParseContext getParseContext(ParseContext pCtx, List, OpParseContext> opParseCtxMap; private boolean leftInputJoin; private String[] baseSrc; private List mapAliases; @@ -457,16 +454,6 @@ public String getIndexIntermediateFile() { return new ArrayList(aliasToPartnInfo.values()); } - public - LinkedHashMap, OpParseContext> getOpParseCtxMap() { - return opParseCtxMap; - } - - public void setOpParseCtxMap( - LinkedHashMap, OpParseContext> opParseCtxMap) { - this.opParseCtxMap = opParseCtxMap; - } - public Path getTmpHDFSPath() { return tmpHDFSPath; } diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java index 9bed527..33ad3e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -62,7 +61,6 @@ public ExprInfo(boolean isCandidate, String alias, ExprNodeDesc replacedNode) { protected static final Log LOG = LogFactory.getLog(OpProcFactory.class .getName());; private Operator op = null; - private RowResolver toRR = null; /** * Values the expression sub-trees (predicates) that can be pushed down for @@ -104,10 +102,8 @@ public ExprWalkerInfo() { newToOldExprMap = new HashMap(); } - public ExprWalkerInfo(Operator op, - final RowResolver toRR) { + public ExprWalkerInfo(Operator op) { this.op = op; - this.toRR = toRR; pushdownPreds = new HashMap>(); exprInfoMap = new HashMap(); @@ -123,13 +119,6 @@ public ExprWalkerInfo(Operator op, } /** - * @return the row resolver of the operator of this expression. 
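
The RowResolver.put change above is what makes the schema-only lookups elsewhere in this patch possible: the table alias and column alias are copied onto the ColumnInfo itself, so they remain available once the RowResolver is discarded. A small sketch of the effect, assuming hypothetical aliases "t1"/"key" and internal name "_col0":

import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

class AliasPropagationSketch {
  // Illustrative only: shows that put() now stamps the alias pair onto the ColumnInfo,
  // so the column can later be found through a RowSchema built from the same signature.
  static ColumnInfo demo() {
    RowResolver rr = new RowResolver();
    ColumnInfo ci = new ColumnInfo("_col0", TypeInfoFactory.stringTypeInfo, null, false);
    rr.put("t1", "key", ci);
    // ci.getTabAlias() is now "t1" and ci.getAlias() is "key"
    RowSchema schema = new RowSchema(rr.getColumnInfos());
    return schema.getColumnInfo("_col0"); // same ColumnInfo, resolved without the resolver
  }
}
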
- */ - public RowResolver getToRR() { - return toRR; - } - - /** * @return the new expression to old expression map */ public Map getNewToOldExprMap() { diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java index 8a8b0d5..3a07b17 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java @@ -26,8 +26,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -37,7 +39,6 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -69,9 +70,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { ExprWalkerInfo ctx = (ExprWalkerInfo) procCtx; ExprNodeColumnDesc colref = (ExprNodeColumnDesc) nd; - RowResolver toRR = ctx.getToRR(); + RowSchema toRS = ctx.getOp().getSchema(); Operator op = ctx.getOp(); - String[] colAlias = toRR.reverseLookup(colref.getColumn()); + ColumnInfo ci = toRS.getColumnInfo(colref.getColumn()); + String tabAlias = null; + if (ci != null) { + tabAlias = ci.getTabAlias(); + } boolean isCandidate = true; if (op.getColumnExprMap() != null) { @@ -87,19 +92,19 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if (exp instanceof ExprNodeGenericFuncDesc) { isCandidate = false; } - if (exp instanceof ExprNodeColumnDesc && colAlias == null) { + if (exp instanceof ExprNodeColumnDesc && ci == null) { ExprNodeColumnDesc column = (ExprNodeColumnDesc)exp; - colAlias = new String[]{column.getTabAlias(), column.getColumn()}; + tabAlias = column.getTabAlias(); } } ctx.addConvertedNode(colref, exp); ctx.setIsCandidate(exp, isCandidate); - ctx.addAlias(exp, colAlias[0]); + ctx.addAlias(exp, tabAlias); } else { - if (colAlias == null) { + if (ci == null) { return false; } - ctx.addAlias(colref, colAlias[0]); + ctx.addAlias(colref, tabAlias); } ctx.setIsCandidate(colref, isCandidate); return isCandidate; @@ -256,8 +261,7 @@ public static ExprWalkerInfo extractPushdownPreds(OpWalkerInfo opContext, Operator op, List preds) throws SemanticException { // Create the walker, the rules dispatcher and the context. - ExprWalkerInfo exprContext = new ExprWalkerInfo(op, opContext - .getRowResolver(op)); + ExprWalkerInfo exprContext = new ExprWalkerInfo(op); // create a walker which walks the tree in a DFS manner while maintaining // the operator stack. 
The dispatcher diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java index 0a7a757..6f9df53 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java @@ -47,8 +47,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.OpParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -459,7 +457,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LOG.info("Processing for " + nd.getName() + "(" + ((Operator) nd).getIdentifier() + ")"); OpWalkerInfo owi = (OpWalkerInfo) procCtx; - Set aliases = getAliases(nd, owi); + Set aliases = getAliases(nd); // we pass null for aliases here because mergeWithChildrenPred filters // aliases in the children node context and we need to filter them in // the current JoinOperator's context @@ -495,8 +493,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return null; } - protected Set getAliases(Node nd, OpWalkerInfo owi) throws SemanticException { - return owi.getRowResolver(nd).getTableNames(); + protected Set getAliases(Node nd) throws SemanticException { + return ((Operator)nd).getSchema().getTableNames(); } protected Object handlePredicates(Node nd, ExprWalkerInfo prunePreds, OpWalkerInfo owi) @@ -512,8 +510,8 @@ protected Object handlePredicates(Node nd, ExprWalkerInfo prunePreds, OpWalkerIn public static class JoinPPD extends JoinerPPD { @Override - protected Set getAliases(Node nd, OpWalkerInfo owi) { - return getQualifiedAliases((JoinOperator) nd, owi.getRowResolver(nd)); + protected Set getAliases(Node nd) { + return getQualifiedAliases((JoinOperator) nd, ((JoinOperator)nd).getSchema()); } /** @@ -540,7 +538,7 @@ protected Object handlePredicates(Node nd, ExprWalkerInfo prunePreds, OpWalkerIn * Row resolver * @return set of qualified aliases */ - private Set getQualifiedAliases(JoinOperator op, RowResolver rr) { + private Set getQualifiedAliases(JoinOperator op, RowSchema rs) { Set aliases = new HashSet(); JoinCondDesc[] conds = op.getConf().getConds(); Map> posToAliasMap = op.getPosToAliasMap(); @@ -560,7 +558,7 @@ protected Object handlePredicates(Node nd, ExprWalkerInfo prunePreds, OpWalkerIn if(i == -1){ aliases.addAll(posToAliasMap.get(0)); } - Set aliases2 = rr.getTableNames(); + Set aliases2 = rs.getTableNames(); aliases.retainAll(aliases2); return aliases; } @@ -691,7 +689,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, String[] aliases = ((ReduceSinkOperator)operator).getInputAliases(); return new HashSet(Arrays.asList(aliases)); } - Set includes = owi.getRowResolver(operator).getTableNames(); + Set includes = operator.getSchema().getTableNames(); if (includes.size() == 1 && includes.contains("")) { // Reduce sink of group by operator return null; @@ -794,7 +792,7 @@ protected static Object createFilter(Operator op, protected static Object createFilter(Operator op, Map> predicates, OpWalkerInfo owi) { - RowResolver inputRR = owi.getRowResolver(op); + RowSchema inputRS = op.getSchema(); // combine all predicates into a single expression List preds = new 
ArrayList(); @@ -834,7 +832,7 @@ protected static Object createFilter(Operator op, .getChildOperators(); op.setChildOperators(null); Operator output = OperatorFactory.getAndMakeChild( - new FilterDesc(condn, false), new RowSchema(inputRR.getColumnInfos()), + new FilterDesc(condn, false), new RowSchema(inputRS.getSignature()), op); output.setChildOperators(originalChilren); for (Operator ch : originalChilren) { @@ -845,8 +843,6 @@ protected static Object createFilter(Operator op, parentOperators.remove(pos); parentOperators.add(pos, output); // add the new op as the old } - OpParseContext ctx = new OpParseContext(inputRR); - owi.put(output, ctx); if (HiveConf.getBoolVar(owi.getParseContext().getConf(), HiveConf.ConfVars.HIVEPPDREMOVEDUPLICATEFILTERS)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java index 2b4e4ac..33f380e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/OpWalkerInfo.java @@ -20,15 +20,11 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** @@ -41,13 +37,11 @@ */ private final HashMap, ExprWalkerInfo> opToPushdownPredMap; - private final Map, OpParseContext> opToParseCtxMap; private final ParseContext pGraphContext; private final List candidateFilterOps; public OpWalkerInfo(ParseContext pGraphContext) { this.pGraphContext = pGraphContext; - opToParseCtxMap = pGraphContext.getOpParseCtx(); opToPushdownPredMap = new HashMap, ExprWalkerInfo>(); candidateFilterOps = new ArrayList(); } @@ -61,15 +55,6 @@ public ExprWalkerInfo putPrunedPreds(Operator op, return opToPushdownPredMap.put(op, value); } - public RowResolver getRowResolver(Node op) { - return opToParseCtxMap.get(op).getRowResolver(); - } - - public OpParseContext put(Operator key, - OpParseContext value) { - return opToParseCtxMap.put(key, value); - } - public ParseContext getParseContext() { return pGraphContext; } diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java index 00952b7..9bfb517 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java @@ -44,9 +44,7 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.optimizer.Transform; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; @@ -97,20 +95,19 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { ((FilterOperator)parent).getConf().setPredicate(merged); } else { ExprNodeDesc merged = ExprNodeDescUtils.mergePredicates(exprs); - RowResolver parentRR = 
pGraphContext.getOpParseCtx().get(parent).getRowResolver(); - Operator newFilter = createFilter(reducer, parent, parentRR, merged); - pGraphContext.getOpParseCtx().put(newFilter, new OpParseContext(parentRR)); + RowSchema parentRS = parent.getSchema(); + Operator newFilter = createFilter(reducer, parent, parentRS, merged); } } return pGraphContext; } - // insert filter operator between target(chilld) and input(parent) + // insert filter operator between target(child) and input(parent) private Operator createFilter(Operator target, Operator parent, - RowResolver parentRR, ExprNodeDesc filterExpr) { + RowSchema parentRS, ExprNodeDesc filterExpr) { Operator filter = OperatorFactory.get(new FilterDesc(filterExpr, false), - new RowSchema(parentRR.getColumnInfos())); + new RowSchema(parentRS.getSignature())); filter.setParentOperators(new ArrayList>()); filter.setChildOperators(new ArrayList>()); filter.getParentOperators().add(parent); diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java index fe59c13..5ccf0a2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java +++ ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java @@ -47,9 +47,7 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.optimizer.Transform; -import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; -import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc; @@ -96,9 +94,9 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { // insert filter operator between target(child) and input(parent) private static Operator createFilter(Operator target, Operator parent, - RowResolver parentRR, ExprNodeDesc filterExpr) { + RowSchema parentRS, ExprNodeDesc filterExpr) { Operator filter = OperatorFactory.get(new FilterDesc(filterExpr, false), - new RowSchema(parentRR.getColumnInfos())); + new RowSchema(parentRS.getSignature())); filter.setParentOperators(new ArrayList>()); filter.setChildOperators(new ArrayList>()); filter.getParentOperators().add(parent); @@ -139,7 +137,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, int[][] targets = getTargets(join); Operator parent = source.getParentOperators().get(0); - RowResolver parentRR = pCtx.getOpParseCtx().get(parent).getRowResolver(); + RowSchema parentRS = parent.getSchema(); // don't generate for null-safes. if (join.getConf().getNullSafes() != null) { @@ -194,8 +192,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } } - Operator newFilter = createFilter(source, parent, parentRR, syntheticExpr); - pCtx.getOpParseCtx().put(newFilter, new OpParseContext(parentRR)); + Operator newFilter = createFilter(source, parent, parentRS, syntheticExpr); parent = newFilter; }
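
Both PredicateTransitivePropagate and SyntheticJoinPredicate now build the injected FilterOperator from the parent's RowSchema and splice it into the operator DAG without touching any OpParseContext map. A rough sketch of that insertion; FilterInsertionSketch and insertFilter are assumed names, and the rewiring via indexOf/set is a simplification of the real createFilter (it assumes the parent-target edge already exists):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

class FilterInsertionSketch {
  // Splices a FilterOperator between parent and target, giving it the parent's
  // schema; no OpParseContext registration is needed any more.
  static Operator<FilterDesc> insertFilter(Operator<? extends OperatorDesc> target,
      Operator<? extends OperatorDesc> parent, ExprNodeDesc filterExpr) {
    Operator<FilterDesc> filter = OperatorFactory.get(new FilterDesc(filterExpr, false),
        new RowSchema(parent.getSchema().getSignature()));
    filter.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
    filter.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
    filter.getParentOperators().add(parent);
    filter.getChildOperators().add(target);
    // rewire the original parent -> target edge to go through the new filter
    List<Operator<? extends OperatorDesc>> children = parent.getChildOperators();
    children.set(children.indexOf(target), filter);
    List<Operator<? extends OperatorDesc>> parents = target.getParentOperators();
    parents.set(parents.indexOf(parent), filter);
    return filter;
  }
}
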