diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 9040d9b..912aef1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1435,6 +1435,7 @@ public int execute() throws CommandNeedRetryException {
     if (SessionState.get() != null) {
       try {
+        SessionState.get().getLineageState().clear();
         SessionState.get().getHiveHistory().logPlanProgress(plan);
       } catch (Exception e) {
         // ignore
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
index acaca23..feb8558 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
@@ -198,6 +198,11 @@ private boolean checkEquals(Object obj1, Object obj2) {
   }

   @Override
+  public int hashCode() {
+    return internalName.hashCode() + typeName.hashCode();
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (!(obj instanceof ColumnInfo) || (obj == null)) {
       return false;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index fc5864a..d5de58e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -383,7 +383,7 @@ public void setChildren(Configuration hconf) throws HiveException {
         if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
           children.add(op);
           childrenOpToOpCtxMap.put(op, opCtx);
-          LOG.info("dump " + op.getName() + " "
+          LOG.info("dump " + op + " "
               + opCtxMap.get(inp).rowObjectInspector.getTypeName());
         }
         current = opCtx;  // just need for TestOperators.testMapOperator
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 22374b2..db94271 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -32,7 +32,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -98,6 +97,11 @@
   private boolean useBucketizedHiveInputFormat;

+  // dummy operator (for not increasing seqId)
+  private Operator(String name) {
+    id = name;
+  }
+
   public Operator() {
     id = String.valueOf(seqId.getAndIncrement());
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
@@ -334,7 +338,7 @@ public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
       return;
     }

-    LOG.info("Initializing Self " + id + " " + getName());
+    LOG.info("Initializing Self " + this);

     if (inputOIs != null) {
       inputObjInspectors = inputOIs;
@@ -1288,4 +1292,14 @@ public void setGroupKeyObjectInspector(ObjectInspector keyObjectInspector) {
   public ObjectInspector getGroupKeyObjectInspector() {
     return groupKeyOI;
   }
+
+  public static Operator createDummy() {
+    return new DummyOperator();
+  }
+
+  private static class DummyOperator extends Operator {
+    public DummyOperator() { super("dummy"); }
+    public void processOp(Object row, int tag) { }
+    public OperatorType getType() { return null; }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 6368548..02cdb6e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -57,7 +57,7 @@ implements Serializable, TopNHash.BinaryCollector {

   static {
-    PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases");
+    PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
   }

   private static final long serialVersionUID = 1L;
@@ -96,6 +96,8 @@
   transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
   private boolean skipTag = false;

+  private transient int[] valueIndex; // index for value(+ from keys, - from values)
+
   public void setInputAliases(String[] inputAliases) {
     this.inputAliases = inputAliases;
   }
@@ -470,4 +472,12 @@ public boolean opAllowedBeforeMapJoin() {
   public void setSkipTag(boolean value) {
     this.skipTag = value;
   }
+
+  public void setValueIndex(int[] valueIndex) {
+    this.valueIndex = valueIndex;
+  }
+
+  public int[] getValueIndex() {
+    return valueIndex;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
index 083d574..71cc7eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
@@ -27,7 +27,7 @@ public class RowSchema implements Serializable {

   private static final long serialVersionUID = 1L;
-  private ArrayList<ColumnInfo> signature;
+  private ArrayList<ColumnInfo> signature = new ArrayList<ColumnInfo>();

   public RowSchema() {
   }
@@ -52,13 +52,11 @@ public void setSignature(ArrayList<ColumnInfo> signature) {
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append('(');
-    if (signature != null) {
-      for (ColumnInfo col: signature) {
-        if (sb.length() > 1) {
-          sb.append(',');
-        }
-        sb.append(col.toString());
+    for (ColumnInfo col: signature) {
+      if (sb.length() > 1) {
+        sb.append(',');
       }
+      sb.append(col.toString());
     }
     sb.append(')');
     return sb.toString();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 7250432..e2b0367 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -220,6 +220,13 @@
     }
   }

+  public static String removeValueTag(String column) {
+    if (column.startsWith(ReduceField.VALUE + ".")) {
+      return column.substring(6);
+    }
+    return column;
+  }
+
   private Utilities() {
     // prevent instantiation
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
index 22a8785..c0756a8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
@@ -112,6 +112,12 @@ public Table getTable() {
     public Partition getPartition() {
       return this.part;
     }
+
+    public String toString() {
+      return isPartition() ?
+          part.getDbName() + "." + part.getTableName() + part.getValues() :
+          tab.getDbName() + "." + tab.getTableName();
+    }
   }

   /**
@@ -190,6 +196,10 @@ public boolean equals(Object obj) {
       }
       return true;
     }
+
+    public String toString() {
+      return dc + ":" + fld;
+    }
   }

   /**
@@ -242,6 +252,10 @@ public FieldSchema getColumn() {
     public void setColumn(FieldSchema column) {
       this.column = column;
     }
+
+    public String toString() {
+      return tabAlias + ":" + column;
+    }
   }

   public static class TableAliasInfo implements Serializable {
@@ -288,6 +302,10 @@ public Table getTable() {
     public void setTable(Table table) {
       this.table = table;
     }
+
+    public String toString() {
+      return table.getDbName() + "." + table.getTableName() + "(" + alias + ")";
+    }
   }

   /**
@@ -356,6 +374,10 @@ public void setExpr(String expr) {
     public void setBaseCols(List<BaseColumnInfo> baseCols) {
       this.baseCols = baseCols;
     }
+
+    public String toString() {
+      return "[" + type + "]" + baseCols;
+    }
   }

   /**
@@ -401,4 +423,8 @@ public void putDependency(DataContainer dc, FieldSchema col, Dependency dep) {
   public Set<Map.Entry<DependencyKey, Dependency>> entrySet() {
     return index.entrySet();
   }
+
+  public void clear() {
+    index.clear();
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
index 6a4dc9b..dab0b75 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
@@ -372,6 +372,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       ReduceSinkOperator op = (ReduceSinkOperator) nd;
       ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
+      RowResolver resolver = cppCtx.getOpToParseCtxMap().get(op).getRowResolver();
       ReduceSinkDesc conf = op.getConf();

       List<String> colLists = new ArrayList<String>();
@@ -392,35 +393,20 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
         childCols = cppCtx.getPrunedColList(child);
       }
+      List<ExprNodeDesc> valCols = conf.getValueCols();
+      List<String> valColNames = conf.getOutputValueColumnNames();
+
       if (childCols != null) {
-        /*
-         * in the case of count(or sum) distinct if we are not able to map
-         * a parameter column references back to the ReduceSink value columns
-         * we give up and assume all columns are needed.
- */ - boolean hasUnresolvedReference = false; - boolean[] flags = new boolean[conf.getValueCols().size()]; + boolean[] flags = new boolean[valCols.size()]; Map exprMap = op.getColumnExprMap(); + for (String childCol : childCols) { - ExprNodeDesc desc = exprMap.get(childCol); - int index = conf.getValueCols().indexOf(desc); + int index = valColNames.indexOf(Utilities.removeValueTag(childCol)); if (index < 0) { - hasUnresolvedReference = desc == null || ExprNodeDescUtils.indexOf(desc, conf.getKeyCols()) < 0; - if ( hasUnresolvedReference ) { - break; - } continue; } flags[index] = true; - colLists = Utilities.mergeUniqElems(colLists, desc.getCols()); - } - - if ( hasUnresolvedReference ) { - for (ExprNodeDesc val : conf.getValueCols()) { - colLists = Utilities.mergeUniqElems(colLists, val.getCols()); - } - cppCtx.getPrunedColLists().put(op, colLists); - return null; + colLists = Utilities.mergeUniqElems(colLists, valCols.get(index).getCols()); } Collections.sort(colLists); @@ -431,8 +417,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, // Reduce Sink contains the columns needed - no need to aggregate from // children - ArrayList vals = conf.getValueCols(); - for (ExprNodeDesc val : vals) { + for (ExprNodeDesc val : valCols) { colLists = Utilities.mergeUniqElems(colLists, val.getCols()); } @@ -594,10 +579,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, // do we need to prune the select operator? List originalColList = op.getConf().getColList(); - List columns = new ArrayList(); - for (ExprNodeDesc expr : originalColList) { - Utilities.mergeUniqElems(columns, expr.getCols()); - } // by now, 'prunedCols' are columns used by child operators, and 'columns' // are columns used by this select operator. List originalOutputColumnNames = conf.getOutputColumnNames(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index e3e0acc..0e7d247 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -460,7 +460,8 @@ public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf, ExprNodeDesc expr = colExprMap.get(column.getInternalName()); int index = ExprNodeDescUtils.indexOf(expr, values); if (index >= 0) { - colExprMap.put(column.getInternalName(), newValues.get(index)); + ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(newValues.get(index), op, terminal); + colExprMap.put(column.getInternalName(), backtrack); schema.set(i, null); } } @@ -505,7 +506,7 @@ public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf, byte srcTag = entry.getKey(); List filter = entry.getValue(); - Operator terminal = op.getParentOperators().get(srcTag); + Operator terminal = oldReduceSinkParentOps.get(srcTag); newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal)); } desc.setFilters(filters = newFilters); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java index 86e4834..c52f753 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java @@ -36,7 +36,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import 
org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; @@ -62,6 +61,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.CommonJoinTaskDispatcher; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -341,7 +341,6 @@ private boolean sameOrder(String order1, String order2) { IntraQueryCorrelation correlation) throws SemanticException { LOG.info("now detecting operator " + current.getIdentifier() + " " + current.getName()); - LinkedHashSet correlatedReduceSinkOperators = new LinkedHashSet(); if (skipedJoinOperators.contains(current)) { @@ -387,18 +386,18 @@ private boolean sameOrder(String order1, String order2) { ExprNodeDescUtils.backtrack(childKeyCols, child, current); List backtrackedPartitionCols = ExprNodeDescUtils.backtrack(childPartitionCols, child, current); + + OpParseContext opCtx = pCtx.getOpParseCtx().get(current); + RowResolver rowResolver = opCtx.getRowResolver(); Set tableNeedToCheck = new HashSet(); for (ExprNodeDesc expr: childKeyCols) { if (!(expr instanceof ExprNodeColumnDesc)) { return correlatedReduceSinkOperators; - } else { - String colName = ((ExprNodeColumnDesc)expr).getColumn(); - OpParseContext opCtx = pCtx.getOpParseCtx().get(current); - for (ColumnInfo cinfo : opCtx.getRowResolver().getColumnInfos()) { - if (colName.equals(cinfo.getInternalName())) { - tableNeedToCheck.add(cinfo.getTabAlias()); - } - } + } + String colName = ((ExprNodeColumnDesc)expr).getColumn(); + String[] nm = rowResolver.reverseLookup(colName); + if (nm != null) { + tableNeedToCheck.add(nm[0]); } } if (current instanceof JoinOperator) { @@ -576,7 +575,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx ctx, Object... 
nodeOutputs) throws SemanticException { CorrelationNodeProcCtx corrCtx = (CorrelationNodeProcCtx) ctx; ReduceSinkOperator op = (ReduceSinkOperator) nd; - // Check if we have visited this operator if (corrCtx.isWalked(op)) { return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java index 719fe9f..6a7733c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -163,7 +164,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } return false; } - if (child instanceof ExtractOperator) { + if (child instanceof ExtractOperator || child instanceof SelectOperator) { return process(cRS, dedupCtx); } return false; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java index 7cf48a7..d3caaf0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.plan.OperatorDesc; /** @@ -69,4 +70,8 @@ public LineageCtx getLineageCtx() { public Operator getInputOperator() { return inpOp; } + + public RowResolver getResolver() { + return lctx.getParseCtx().getOpParseCtx().get(inpOp).getRowResolver(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java index b5cdde1..fdbb93e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; @@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -70,20 +73,19 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // assert that the input operator is not null as there are no // exprs associated with table scans. 
- assert (epc.getInputOperator() != null); - - ColumnInfo inp_ci = null; - for (ColumnInfo tmp_ci : epc.getInputOperator().getSchema() - .getSignature()) { - if (tmp_ci.getInternalName().equals(cd.getColumn())) { - inp_ci = tmp_ci; - break; - } + Operator operator = epc.getInputOperator(); + assert (operator != null); + + RowResolver resolver = epc.getResolver(); + String[] nm = resolver.reverseLookup(cd.getColumn()); + if (nm == null && operator instanceof ReduceSinkOperator) { + nm = resolver.reverseLookup(Utilities.removeValueTag(cd.getColumn())); } + ColumnInfo ci = nm != null ? resolver.get(nm[0], nm[1]): null; // Insert the dependencies of inp_ci to that of the current operator, ci LineageCtx lc = epc.getLineageCtx(); - Dependency dep = lc.getIndex().getDependency(epc.getInputOperator(), inp_ci); + Dependency dep = lc.getIndex().getDependency(operator, ci); return dep; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java index 78b7ca8..48ca8f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.ExtractOperator; import org.apache.hadoop.hive.ql.exec.ForwardOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; @@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.SelectOperator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; @@ -52,6 +54,7 @@ import org.apache.hadoop.hive.ql.lib.Utils; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -213,8 +216,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // Otherwise look up the expression corresponding to this ci ExprNodeDesc expr = exprs.get(cnt++); - lCtx.getIndex().mergeDependency(op, ci, - ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + Dependency dependency = ExprProcFactory.getExprDependency(lCtx, inpOp, expr); + lCtx.getIndex().mergeDependency(op, ci, dependency); } return null; @@ -438,7 +441,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LineageCtx lCtx = (LineageCtx) procCtx; ReduceSinkOperator rop = (ReduceSinkOperator)nd; - ArrayList col_infos = rop.getSchema().getSignature(); Operator inpOp = getParent(stack); int cnt = 0; @@ -450,15 +452,49 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } if (op instanceof GroupByOperator) { + ArrayList col_infos = rop.getSchema().getSignature(); for(ExprNodeDesc expr : rop.getConf().getKeyCols()) { lCtx.getIndex().putDependency(rop, col_infos.get(cnt++), ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); } - } - - for(ExprNodeDesc expr : 
rop.getConf().getValueCols()) { - lCtx.getIndex().putDependency(rop, col_infos.get(cnt++), - ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + for(ExprNodeDesc expr : rop.getConf().getValueCols()) { + lCtx.getIndex().putDependency(rop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + } + } else if (op instanceof ExtractOperator) { + ArrayList col_infos = rop.getSchema().getSignature(); + for(ExprNodeDesc expr : rop.getConf().getValueCols()) { + lCtx.getIndex().putDependency(rop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + } + } else { + RowResolver resolver = lCtx.getParseCtx().getOpParseCtx().get(rop).getRowResolver(); + ReduceSinkDesc desc = rop.getConf(); + List keyCols = desc.getKeyCols(); + ArrayList keyColNames = desc.getOutputKeyColumnNames(); + for (int i = 0; i < keyCols.size(); i++) { + // order-bys, joins + String[] nm = resolver.reverseLookup(Utilities.ReduceField.KEY + "." + keyColNames.get(i)); + if (nm == null) { + continue; // key in values + } + ColumnInfo column = resolver.get(nm[0], nm[1]); + lCtx.getIndex().putDependency(rop, column, + ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i))); + } + List valCols = desc.getValueCols(); + ArrayList valColNames = desc.getOutputValueColumnNames(); + for (int i = 0; i < valCols.size(); i++) { + // todo: currently, bucketing,etc. makes RS differently with those for order-bys or joins + String[] nm = resolver.reverseLookup(valColNames.get(i)); + if (nm == null) { + // order-bys, joins + nm = resolver.reverseLookup(Utilities.ReduceField.VALUE + "." + valColNames.get(i)); + } + ColumnInfo column = resolver.get(nm[0], nm[1]); + lCtx.getIndex().putDependency(rop, column, + ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i))); + } } return null; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java index eac0edd..82f4243 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java @@ -20,7 +20,9 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Stack; import org.apache.hadoop.hive.ql.exec.ColumnInfo; @@ -42,8 +44,11 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper; +import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SelectDesc; /** * Operator factory for the rule processors for inferring bucketing/sorting columns. 
@@ -129,8 +134,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, BucketingSortingCtx bctx = (BucketingSortingCtx)procCtx; JoinOperator jop = (JoinOperator)nd; - List colInfos = jop.getSchema().getSignature(); - Byte[] order = jop.getConf().getTagOrder(); + JoinDesc joinDesc = jop.getConf(); + + Byte[] order = joinDesc.getTagOrder(); + Map> expressions = joinDesc.getExprs(); + List outputValNames = joinDesc.getOutputColumnNames(); BucketCol[] newBucketCols = null; SortCol[] newSortCols = null; @@ -143,63 +151,55 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, assert(parent instanceof ReduceSinkOperator); ReduceSinkOperator rop = (ReduceSinkOperator)jop.getParentOperators().get(i); + ReduceSinkDesc rsDesc = rop.getConf(); - String sortOrder = rop.getConf().getOrder(); - List bucketCols = new ArrayList(); - List sortCols = new ArrayList(); - // Go through the Reduce keys and find the matching column(s) in the reduce values - for (int keyIndex = 0; keyIndex < rop.getConf().getKeyCols().size(); keyIndex++) { - for (int valueIndex = 0; valueIndex < rop.getConf().getValueCols().size(); - valueIndex++) { - - if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(valueIndex)). - equals(new ExprNodeDescEqualityWrapper(rop.getConf().getKeyCols().get( - keyIndex)))) { - - String colName = rop.getSchema().getSignature().get(valueIndex).getInternalName(); - bucketCols.add(new BucketCol(colName, keyIndex)); - sortCols.add(new SortCol(colName, keyIndex, sortOrder.charAt(keyIndex))); - break; - } + byte tag = (byte) rsDesc.getTag(); + List joinValues = expressions.get(tag); + + // Columns are output from the join from the different reduce sinks in the order of their + // offsets + int offset = 0; + for (byte orderIndex = 0; orderIndex < order.length; orderIndex++) { + if (order[orderIndex] < order[tag]) { + offset += expressions.get(orderIndex).size(); } } - if (bucketCols.isEmpty()) { - assert(sortCols.isEmpty()); - continue; - } + String sortOrder = rsDesc.getOrder(); + List keyCols = rsDesc.getKeyCols(); + List valCols = ExprNodeDescUtils.backtrack(joinValues, jop, parent); if (newBucketCols == null) { - assert(newSortCols == null); - // The number of join keys is equal to the number of keys in every reducer, although - // not every key may map to a value in the reducer - newBucketCols = new BucketCol[rop.getConf().getKeyCols().size()]; - newSortCols = new SortCol[rop.getConf().getKeyCols().size()]; - } else { - assert(newSortCols != null); + newBucketCols = new BucketCol[keyCols.size()]; + newSortCols = new SortCol[keyCols.size()]; } - byte tag = (byte)rop.getConf().getTag(); - List exprs = jop.getConf().getExprs().get(tag); - - int colInfosOffset = 0; - int orderValue = order[tag]; - // Columns are output from the join from the different reduce sinks in the order of their - // offsets - for (byte orderIndex = 0; orderIndex < order.length; orderIndex++) { - if (order[orderIndex] < orderValue) { - colInfosOffset += jop.getConf().getExprs().get(orderIndex).size(); + // Go through the Reduce keys and find the matching column(s) in the reduce values + for (int keyIndex = 0 ; keyIndex < keyCols.size(); keyIndex++) { + ExprNodeDesc key = keyCols.get(keyIndex); + int index = ExprNodeDescUtils.indexOf(key, valCols); + if (index >= 0) { + int vindex = offset + index; + String vname = outputValNames.get(vindex); + if (newBucketCols[keyIndex] != null) { + newBucketCols[keyIndex].addAlias(vname, vindex); + newSortCols[keyIndex].addAlias(vname, 
vindex); + } else { + newBucketCols[keyIndex] = new BucketCol(vname, vindex); + newSortCols[keyIndex] = new SortCol(vname, vindex, sortOrder.charAt(keyIndex)); + } } } - - findBucketingSortingColumns(exprs, colInfos, bucketCols, sortCols, newBucketCols, - newSortCols, colInfosOffset); - } - setBucketingColsIfComplete(bctx, jop, newBucketCols); - - setSortingColsIfComplete(bctx, jop, newSortCols); + List bucketCols = Arrays.asList(newBucketCols); + if (!bucketCols.contains(null)) { + bctx.setBucketedCols(jop, bucketCols); + } + List sortCols = Arrays.asList(newSortCols); + if (!sortCols.contains(null)) { + bctx.setSortedCols(jop, sortCols); + } return null; } @@ -331,6 +331,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, BucketingSortingCtx bctx = (BucketingSortingCtx)procCtx; SelectOperator sop = (SelectOperator)nd; + if (sop.getNumParent() == 1 && + sop.getParentOperators().get(0) instanceof ReduceSinkOperator) { + ReduceSinkOperator rs = (ReduceSinkOperator) sop.getParentOperators().get(0); + extractTraits(bctx, rs, sop); + return null; + } Operator parent = getParent(stack); // if this is a selStarNoCompute then this select operator @@ -506,71 +512,83 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Operator parent = exop.getParentOperators().get(0); // The caller of this method should guarantee this - assert(parent instanceof ReduceSinkOperator); + if (parent instanceof ReduceSinkOperator) { + extractTraits(bctx, (ReduceSinkOperator)parent, exop); + } - ReduceSinkOperator rop = (ReduceSinkOperator)parent; + return null; + } + } - // Go through the set of partition columns, and find their representatives in the values - // These represent the bucketed columns - List bucketCols = new ArrayList(); - for (int i = 0; i < rop.getConf().getPartitionCols().size(); i++) { - boolean valueColFound = false; - for (int j = 0; j < rop.getConf().getValueCols().size(); j++) { - if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(j)).equals( - new ExprNodeDescEqualityWrapper(rop.getConf().getPartitionCols().get(i)))) { - - bucketCols.add(new BucketCol( - rop.getSchema().getSignature().get(j).getInternalName(), j)); - valueColFound = true; - break; - } - } + static void extractTraits(BucketingSortingCtx bctx, ReduceSinkOperator rop, Operator exop) + throws SemanticException { - // If the partition columns can't all be found in the values then the data is not bucketed - if (!valueColFound) { - bucketCols.clear(); - break; - } - } + List outputValues = Collections.emptyList(); + if (exop instanceof ExtractOperator) { + outputValues = rop.getConf().getValueCols(); + } else if (exop instanceof SelectOperator) { + SelectDesc select = ((SelectOperator)exop).getConf(); + outputValues = ExprNodeDescUtils.backtrack(select.getColList(), exop, rop); + } + if (outputValues.isEmpty()) { + return; + } - // Go through the set of key columns, and find their representatives in the values - // These represent the sorted columns - String sortOrder = rop.getConf().getOrder(); - List sortCols = new ArrayList(); - for (int i = 0; i < rop.getConf().getKeyCols().size(); i++) { - boolean valueColFound = false; - for (int j = 0; j < rop.getConf().getValueCols().size(); j++) { - if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(j)).equals( - new ExprNodeDescEqualityWrapper(rop.getConf().getKeyCols().get(i)))) { - - sortCols.add(new SortCol( - rop.getSchema().getSignature().get(j).getInternalName(), j, sortOrder.charAt(i))); - valueColFound = 
true; - break; - } - } + // Go through the set of partition columns, and find their representatives in the values + // These represent the bucketed columns + List bucketCols = extractBucketCols(rop, outputValues); - // If the sorted columns can't all be found in the values then the data is only sorted on - // the columns seen up until now - if (!valueColFound) { - break; - } - } + // Go through the set of key columns, and find their representatives in the values + // These represent the sorted columns + List sortCols = extractSortCols(rop, outputValues); - List colInfos = exop.getSchema().getSignature(); + List colInfos = exop.getSchema().getSignature(); - if (!bucketCols.isEmpty()) { - List newBucketCols = getNewBucketCols(bucketCols, colInfos); - bctx.setBucketedCols(exop, newBucketCols); - } + if (!bucketCols.isEmpty()) { + List newBucketCols = getNewBucketCols(bucketCols, colInfos); + bctx.setBucketedCols(exop, newBucketCols); + } - if (!sortCols.isEmpty()) { - List newSortCols = getNewSortCols(sortCols, colInfos); - bctx.setSortedCols(exop, newSortCols); + if (!sortCols.isEmpty()) { + List newSortCols = getNewSortCols(sortCols, colInfos); + bctx.setSortedCols(exop, newSortCols); + } + } + + static List extractBucketCols(ReduceSinkOperator rop, List outputValues) { + List bucketCols = new ArrayList(); + for (ExprNodeDesc partitionCol : rop.getConf().getPartitionCols()) { + if (!(partitionCol instanceof ExprNodeColumnDesc)) { + return Collections.emptyList(); } + int index = ExprNodeDescUtils.indexOf(partitionCol, outputValues); + if (index < 0) { + return Collections.emptyList(); + } + bucketCols.add(new BucketCol(((ExprNodeColumnDesc) partitionCol).getColumn(), index)); + } + // If the partition columns can't all be found in the values then the data is not bucketed + return bucketCols; + } - return null; + static List extractSortCols(ReduceSinkOperator rop, List outputValues) { + String sortOrder = rop.getConf().getOrder(); + List sortCols = new ArrayList(); + ArrayList keyCols = rop.getConf().getKeyCols(); + for (int i = 0; i < keyCols.size(); i++) { + ExprNodeDesc keyCol = keyCols.get(i); + if (!(keyCol instanceof ExprNodeColumnDesc)) { + break; + } + int index = ExprNodeDescUtils.indexOf(keyCol, outputValues); + if (index < 0) { + break; + } + sortCols.add(new SortCol(((ExprNodeColumnDesc) keyCol).getColumn(), index, sortOrder.charAt(i))); } + // If the sorted columns can't all be found in the values then the data is only sorted on + // the columns seen up until now + return sortCols; } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java index f142f3e..33b8a21 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java @@ -97,24 +97,23 @@ public ASTNode getExpressionSource(ASTNode node) { } public void put(String tab_alias, String col_alias, ColumnInfo colInfo) { + if (!addMappingOnly(tab_alias, col_alias, colInfo)) { + rowSchema.getSignature().add(colInfo); + } + } + + public boolean addMappingOnly(String tab_alias, String col_alias, ColumnInfo colInfo) { if (tab_alias != null) { tab_alias = tab_alias.toLowerCase(); } col_alias = col_alias.toLowerCase(); - if (rowSchema.getSignature() == null) { - rowSchema.setSignature(new ArrayList()); - } - + /* * allow multiple mappings to the same ColumnInfo. 
- * When a ColumnInfo is mapped multiple times, only the + * When a ColumnInfo is mapped multiple times, only the * first inverse mapping is captured. */ boolean colPresent = invRslvMap.containsKey(colInfo.getInternalName()); - - if ( !colPresent ) { - rowSchema.getSignature().add(colInfo); - } LinkedHashMap f_map = rslvMap.get(tab_alias); if (f_map == null) { @@ -127,10 +126,12 @@ public void put(String tab_alias, String col_alias, ColumnInfo colInfo) { qualifiedAlias[0] = tab_alias; qualifiedAlias[1] = col_alias; if ( !colPresent ) { - invRslvMap.put(colInfo.getInternalName(), qualifiedAlias); + invRslvMap.put(colInfo.getInternalName(), qualifiedAlias); } else { altInvRslvMap.put(colInfo.getInternalName(), qualifiedAlias); } + + return colPresent; } public boolean hasTableAlias(String tab_alias) { @@ -350,18 +351,4 @@ public void setExpressionMap(Map expressionMap) { this.expressionMap = expressionMap; } - public String[] toColumnDesc() { - StringBuilder cols = new StringBuilder(); - StringBuilder colTypes = new StringBuilder(); - - for (ColumnInfo colInfo : getColumnInfos()) { - if (cols.length() > 0) { - cols.append(','); - colTypes.append(':'); - } - cols.append(colInfo.getInternalName()); - colTypes.append(colInfo.getType().getTypeName()); - } - return new String[] {cols.toString(), colTypes.toString()}; - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 49eb83f..519179d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -3120,15 +3120,18 @@ private static boolean isRegex(String pattern, HiveConf conf) { startPosn = 0; } + Set colAliases = new HashSet(); + ASTNode[] exprs = new ASTNode[exprList.getChildCount()]; + String[][] aliases = new String[exprList.getChildCount()][]; + boolean[] hasAsClauses = new boolean[exprList.getChildCount()]; // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM) for (int i = startPosn; i < exprList.getChildCount(); ++i) { // child can be EXPR AS ALIAS, or EXPR. ASTNode child = (ASTNode) exprList.getChild(i); boolean hasAsClause = (!isInTransform) && (child.getChildCount() == 2); - boolean isWindowSpec = child.getChildCount() == 3 ? - (child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC) : - false; + boolean isWindowSpec = child.getChildCount() == 3 && + child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC; // EXPR AS (ALIAS,...) parses, but is only allowed for UDTF's // This check is not needed and invalid when there is a transform b/c the @@ -3159,8 +3162,20 @@ private static boolean isRegex(String pattern, HiveConf conf) { unparseTranslator.addIdentifierTranslation((ASTNode) child .getChild(1)); } - } + exprs[i] = expr; + aliases[i] = new String[] {tabAlias, colAlias}; + hasAsClauses[i] = hasAsClause; + colAliases.add(colAlias); + } + + // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM) + for (int i = startPosn; i < exprList.getChildCount(); ++i) { + // The real expression + ASTNode expr = exprs[i]; + String tabAlias = aliases[i][0]; + String colAlias = aliases[i][1]; + boolean hasAsClause = hasAsClauses[i]; if (expr.getType() == HiveParser.TOK_ALLCOLREF) { pos = genColListRegex(".*", expr.getChildCount() == 0 ? 
null @@ -3196,7 +3211,8 @@ private static boolean isRegex(String pattern, HiveConf conf) { tcCtx.setAllowDistinctFunctions(false); ExprNodeDesc exp = genExprNodeDesc(expr, inputRR, tcCtx); String recommended = recommendName(exp, colAlias); - if (recommended != null && out_rwsch.get(null, recommended) == null) { + if (recommended != null && !colAliases.contains(recommended) && + out_rwsch.get(null, recommended) == null) { colAlias = recommended; } col_list.add(exp); @@ -4729,7 +4745,7 @@ private Operator genGroupByPlan1ReduceMultiGBY(List dests, QB qb, Operat * distinct key in hope of getting a uniform distribution, and * compute partial aggregates by the grouping key. Evaluate partial * aggregates first, and spray by the grouping key to compute actual - * aggregates in the second phase. The agggregation evaluation + * aggregates in the second phase. The aggregation evaluation * functions are as follows: Partitioning Key: distinct key * * Sorting Key: distinct key @@ -4797,7 +4813,7 @@ private Operator genGroupByPlan2MRMultiGroupBy(String dest, QB qb, * compute partial aggregates grouped by the reduction key (grouping * key + distinct key). Evaluate partial aggregates first, and spray * by the grouping key to compute actual aggregates in the second - * phase. The agggregation evaluation functions are as follows: + * phase. The aggregation evaluation functions are as follows: * Partitioning Key: random() if no DISTINCT grouping + distinct key * if DISTINCT * @@ -4971,7 +4987,7 @@ private void checkExpressionsForGroupingSet(List grpByExprs, * spray by the group by key, and sort by the distinct key (if any), and * compute aggregates based on actual aggregates * - * The agggregation evaluation functions are as follows: + * The aggregation evaluation functions are as follows: * * No grouping sets: * Group By Operator: @@ -5138,7 +5154,7 @@ private Operator genGroupByPlanMapAggrNoSkew(String dest, QB qb, * key). Evaluate partial aggregates first, and spray by the grouping key to * compute actual aggregates in the second phase. * - * The agggregation evaluation functions are as follows: + * The aggregation evaluation functions are as follows: * * No grouping sets: * STAGE 1 @@ -5160,7 +5176,7 @@ private Operator genGroupByPlanMapAggrNoSkew(String dest, QB qb, * Sorting Key: grouping key * Reducer: merge/terminate (mode = FINAL) * - * In the presence of grouping sets, the agggregation evaluation functions are as follows: + * In the presence of grouping sets, the aggregation evaluation functions are as follows: * STAGE 1 * Group by Operator: * grouping keys: group by expressions + grouping id. 
if no DISTINCT @@ -5409,8 +5425,8 @@ private Operator genBucketingSortingDest(String dest, Operator input, QB qb, if ((dest_tab.getNumBuckets() > 0) && (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) { enforceBucketing = true; - partnCols = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true); - partnColsNoConvert = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, + partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true); + partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, false); } @@ -6208,7 +6224,7 @@ private Operator genLimitMapRedPlan(String dest, QB qb, Operator input, return genLimitPlan(dest, qb, curr, limit); } - private ArrayList getParitionColsFromBucketCols(String dest, QB qb, Table tab, + private ArrayList getPartitionColsFromBucketCols(String dest, QB qb, Table tab, TableDesc table_desc, Operator input, boolean convert) throws SemanticException { List tabBucketCols = tab.getBucketCols(); @@ -6387,7 +6403,7 @@ private Operator genReduceSinkPlanForSortingBucketing(Table tab, Operator input, } @SuppressWarnings("nls") - private Operator genReduceSinkPlan(String dest, QB qb, Operator input, + private Operator genReduceSinkPlan(String dest, QB qb, Operator input, int numReducers) throws SemanticException { RowResolver inputRR = opParseCtx.get(input).getRowResolver(); @@ -6427,8 +6443,11 @@ private Operator genReduceSinkPlan(String dest, QB qb, Operator input, } } } + Operator dummy = Operator.createDummy(); + dummy.setParentOperators(Arrays.asList(input)); ArrayList sortCols = new ArrayList(); + ArrayList sortColsBack = new ArrayList(); StringBuilder order = new StringBuilder(); if (sortExprs != null) { int ccount = sortExprs.getChildCount(); @@ -6449,51 +6468,114 @@ private Operator genReduceSinkPlan(String dest, QB qb, Operator input, } ExprNodeDesc exprNode = genExprNodeDesc(cl, inputRR); sortCols.add(exprNode); + sortColsBack.add(ExprNodeDescUtils.backtrack(exprNode, dummy, input)); } } - // For the generation of the values expression just get the inputs // signature and generate field expressions for those + RowResolver rsRR = new RowResolver(); ArrayList outputColumns = new ArrayList(); - Map colExprMap = new HashMap(); ArrayList valueCols = new ArrayList(); - int i = 0; - for (ColumnInfo colInfo : inputRR.getColumnInfos()) { - String internalName = getColumnInternalName(i++); - outputColumns.add(internalName); - valueCols.add(new ExprNodeColumnDesc(colInfo.getType(), colInfo - .getInternalName(), colInfo.getTabAlias(), colInfo - .getIsVirtualCol())); - colExprMap.put(internalName, valueCols.get(valueCols.size() - 1)); + ArrayList valueColsBack = new ArrayList(); + Map colExprMap = new HashMap(); + + ArrayList columnInfos = inputRR.getColumnInfos(); + + int[] index = new int[columnInfos.size()]; + for (int i = 0; i < index.length; i++) { + ColumnInfo colInfo = columnInfos.get(i); + String[] nm = inputRR.reverseLookup(colInfo.getInternalName()); + String[] nm2 = inputRR.getAlternateMappings(colInfo.getInternalName()); + ExprNodeColumnDesc value = new ExprNodeColumnDesc(colInfo.getType(), + colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol()); + + // backtrack can be null when input is script operator + ExprNodeDesc valueBack = ExprNodeDescUtils.backtrack(value, dummy, input); + int kindex = valueBack == null ? 
-1 : ExprNodeDescUtils.indexOf(valueBack, sortColsBack); + if (kindex >= 0) { + index[i] = kindex; + ColumnInfo newColInfo = new ColumnInfo(colInfo); + newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex); + newColInfo.setTabAlias(nm[0]); + rsRR.addMappingOnly(nm[0], nm[1], newColInfo); + if (nm2 != null) { + rsRR.addMappingOnly(nm2[0], nm2[1], newColInfo); + } + continue; + } + int vindex = valueBack == null ? -1 : ExprNodeDescUtils.indexOf(valueBack, valueColsBack); + if (vindex >= 0) { + index[i] = -vindex - 1; + continue; + } + index[i] = -valueCols.size() - 1; + String outputColName = getColumnInternalName(valueCols.size()); + + valueCols.add(value); + valueColsBack.add(valueBack); + + ColumnInfo newColInfo = new ColumnInfo(colInfo); + newColInfo.setInternalName(Utilities.ReduceField.VALUE + "." + outputColName); + newColInfo.setTabAlias(nm[0]); + + rsRR.put(nm[0], nm[1], newColInfo); + if (nm2 != null) { + rsRR.addMappingOnly(nm2[0], nm2[1], newColInfo); + } + outputColumns.add(outputColName); } - Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils - .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1, - partitionCols, order.toString(), numReducers), - new RowSchema(inputRR.getColumnInfos()), input), inputRR); - interim.setColumnExprMap(colExprMap); + dummy.setParentOperators(null); - // Add the extract operator to get the value fields - RowResolver out_rwsch = new RowResolver(); - RowResolver interim_rwsch = inputRR; - Integer pos = Integer.valueOf(0); - for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) { - String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName()); - out_rwsch.put(info[0], info[1], new ColumnInfo( - getColumnInternalName(pos), colInfo.getType(), info[0], - colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol())); - pos = Integer.valueOf(pos.intValue() + 1); + ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(sortCols, valueCols, outputColumns, + false, -1, partitionCols, order.toString(), numReducers); + Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc, + new RowSchema(rsRR.getColumnInfos()), input), rsRR); + + List keyColNames = rsdesc.getOutputKeyColumnNames(); + for (int i = 0 ; i < keyColNames.size(); i++) { + colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), sortCols.get(i)); + } + List valueColNames = rsdesc.getOutputValueColumnNames(); + for (int i = 0 ; i < valueColNames.size(); i++) { + colExprMap.put(Utilities.ReduceField.VALUE + "." + valueColNames.get(i), valueCols.get(i)); } + interim.setColumnExprMap(colExprMap); - Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( - new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, - Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema( - out_rwsch.getColumnInfos()), interim), out_rwsch); + RowResolver selectRR = new RowResolver(); + ArrayList selCols = new ArrayList(); + ArrayList selOutputCols = new ArrayList(); + Map selColExprMap = new HashMap(); - if (LOG.isDebugEnabled()) { - LOG.debug("Created ReduceSink Plan for clause: " + dest + " row schema: " - + out_rwsch.toString()); + for (int i = 0; i < index.length; i++) { + ColumnInfo prev = columnInfos.get(i); + String[] nm = inputRR.reverseLookup(prev.getInternalName()); + String[] nm2 = inputRR.getAlternateMappings(prev.getInternalName()); + ColumnInfo info = new ColumnInfo(prev); + + String field; + if (index[i] >= 0) { + field = Utilities.ReduceField.KEY + "." 
+ keyColNames.get(index[i]); + } else { + field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1); + } + String internalName = getColumnInternalName(i); + ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), + field, info.getTabAlias(), info.getIsVirtualCol()); + selCols.add(desc); + + info.setInternalName(internalName); + selectRR.put(nm[0], nm[1], info); + if (nm2 != null) { + selectRR.addMappingOnly(nm2[0], nm2[1], info); + } + selOutputCols.add(internalName); + selColExprMap.put(internalName, desc); } + SelectDesc select = new SelectDesc(selCols, selOutputCols); + Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(select, + new RowSchema(selectRR.getColumnInfos()), interim), selectRR); + output.setColumnExprMap(selColExprMap); return output; } @@ -6514,56 +6596,73 @@ private Operator genJoinOperatorChildren(QBJoinTree join, Operator left, new HashMap>(); for (int pos = 0; pos < right.length; ++pos) { - - Operator input = right[pos]; + Operator input = right[pos] == null ? left : right[pos]; if (input == null) { input = left; } + ReduceSinkOperator rs = (ReduceSinkOperator) input; + if (rs.getNumParent() != 1) { + throw new SemanticException("RS should have single parent"); + } + Operator parent = rs.getParentOperators().get(0); + ReduceSinkDesc rsDesc = (ReduceSinkDesc) (input.getConf()); - ArrayList keyDesc = new ArrayList(); + int[] index = rs.getValueIndex(); + + ArrayList valueDesc = new ArrayList(); ArrayList filterDesc = new ArrayList(); - Byte tag = Byte.valueOf((byte) (((ReduceSinkDesc) (input.getConf())) - .getTag())); + Byte tag = (byte) rsDesc.getTag(); // check whether this input operator produces output - if (omitOpts == null || !omitOpts.contains(pos)) { - // prepare output descriptors for the input opt - RowResolver inputRS = opParseCtx.get(input).getRowResolver(); - Iterator keysIter = inputRS.getTableNames().iterator(); - Set aliases = posToAliasMap.get(pos); - if (aliases == null) { - aliases = new HashSet(); - posToAliasMap.put(pos, aliases); - } - while (keysIter.hasNext()) { - String key = keysIter.next(); - aliases.add(key); - HashMap map = inputRS.getFieldMap(key); - Iterator fNamesIter = map.keySet().iterator(); - while (fNamesIter.hasNext()) { - String field = fNamesIter.next(); - ColumnInfo valueInfo = inputRS.get(key, field); - keyDesc.add(new ExprNodeColumnDesc(valueInfo.getType(), valueInfo - .getInternalName(), valueInfo.getTabAlias(), valueInfo - .getIsVirtualCol())); + if (omitOpts != null && omitOpts.contains(pos)) { + exprMap.put(tag, valueDesc); + filterMap.put(tag, filterDesc); + rightOps[pos] = input; + continue; + } - if (outputRS.get(key, field) == null) { - String colName = getColumnInternalName(outputPos); - outputPos++; - outputColumnNames.add(colName); - colExprMap.put(colName, keyDesc.get(keyDesc.size() - 1)); - outputRS.put(key, field, new ColumnInfo(colName, valueInfo - .getType(), key, valueInfo.getIsVirtualCol(), valueInfo - .isHiddenVirtualCol())); - reversedExprs.put(colName, tag); - } - } + List keyColNames = rsDesc.getOutputKeyColumnNames(); + List valColNames = rsDesc.getOutputValueColumnNames(); + + // prepare output descriptors for the input opt + RowResolver inputRS = opParseCtx.get(input).getRowResolver(); + RowResolver parentRS = opParseCtx.get(parent).getRowResolver(); + posToAliasMap.put(pos, inputRS.getTableNames()); + + List columns = parentRS.getColumnInfos(); + for (int i = 0; i < index.length; i++) { + ColumnInfo prev = columns.get(i); + String[] nm = 
parentRS.reverseLookup(prev.getInternalName()); + String[] nm2 = parentRS.getAlternateMappings(prev.getInternalName()); + if (outputRS.get(nm[0], nm[1]) != null) { + continue; } - for (ASTNode cond : join.getFilters().get(tag)) { - filterDesc.add(genExprNodeDesc(cond, inputRS)); + ColumnInfo info = new ColumnInfo(prev); + String field; + if (index[i] >= 0) { + field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]); + } else { + field = Utilities.ReduceField.VALUE + "." + valColNames.get(-index[i] - 1); + } + String internalName = getColumnInternalName(outputColumnNames.size()); + ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), + field, info.getTabAlias(), info.getIsVirtualCol()); + + info.setInternalName(internalName); + colExprMap.put(internalName, desc); + outputRS.put(nm[0], nm[1], info); + if (nm2 != null) { + outputRS.addMappingOnly(nm2[0], nm2[1], info); } + + valueDesc.add(desc); + outputColumnNames.add(internalName); + reversedExprs.put(internalName, tag); } - exprMap.put(tag, keyDesc); + for (ASTNode cond : join.getFilters().get(tag)) { + filterDesc.add(genExprNodeDesc(cond, inputRS)); + } + exprMap.put(tag, valueDesc); filterMap.put(tag, filterDesc); rightOps[pos] = input; } @@ -6594,48 +6693,90 @@ private Operator genJoinOperatorChildren(QBJoinTree join, Operator left, return putOpInsertMap(joinOp, outputRS); } + private ExprNodeDesc[][] genJoinKeys(QBJoinTree joinTree, Operator[] inputs) + throws SemanticException { + ExprNodeDesc[][] joinKeys = new ExprNodeDesc[inputs.length][]; + for (int i = 0; i < inputs.length; i++) { + RowResolver inputRS = opParseCtx.get(inputs[i]).getRowResolver(); + List expressions = joinTree.getExpressions().get(i); + joinKeys[i] = new ExprNodeDesc[expressions.size()]; + for (int j = 0; j < joinKeys[i].length; j++) { + joinKeys[i][j] = genExprNodeDesc(expressions.get(j), inputRS); + } + } + // Type checking and implicit type conversion for join keys + return genJoinOperatorTypeCheck(joinKeys); + } + @SuppressWarnings("nls") - private Operator genJoinReduceSinkChild(QB qb, QBJoinTree joinTree, - Operator child, String[] srcs, int pos) throws SemanticException { + private Operator genJoinReduceSinkChild(QB qb, ExprNodeDesc[] joinKeys, + Operator child, String[] srcs, int tag) throws SemanticException { + + Operator dummy = Operator.createDummy(); // dummy for backtracking + dummy.setParentOperators(Arrays.asList(child)); + RowResolver inputRS = opParseCtx.get(child).getRowResolver(); RowResolver outputRS = new RowResolver(); ArrayList outputColumns = new ArrayList(); ArrayList reduceKeys = new ArrayList(); + ArrayList reduceKeysBack = new ArrayList(); // Compute join keys and store in reduceKeys - ArrayList exprs = joinTree.getExpressions().get(pos); - for (int i = 0; i < exprs.size(); i++) { - ASTNode expr = exprs.get(i); - reduceKeys.add(genExprNodeDesc(expr, inputRS)); + for (ExprNodeDesc joinKey : joinKeys) { + reduceKeys.add(joinKey); + reduceKeysBack.add(ExprNodeDescUtils.backtrack(joinKey, dummy, child)); } // Walk over the input row resolver and copy in the output ArrayList reduceValues = new ArrayList(); - Iterator tblNamesIter = inputRS.getTableNames().iterator(); + ArrayList reduceValuesBack = new ArrayList(); Map colExprMap = new HashMap(); - while (tblNamesIter.hasNext()) { - String src = tblNamesIter.next(); - HashMap fMap = inputRS.getFieldMap(src); - for (Map.Entry entry : fMap.entrySet()) { - String field = entry.getKey(); - ColumnInfo valueInfo = entry.getValue(); - ExprNodeColumnDesc inputExpr = new 
ExprNodeColumnDesc(valueInfo - .getType(), valueInfo.getInternalName(), valueInfo.getTabAlias(), - valueInfo.getIsVirtualCol()); - reduceValues.add(inputExpr); - if (outputRS.get(src, field) == null) { - String col = getColumnInternalName(reduceValues.size() - 1); - outputColumns.add(col); - ColumnInfo newColInfo = new ColumnInfo(Utilities.ReduceField.VALUE - .toString() - + "." + col, valueInfo.getType(), src, valueInfo - .getIsVirtualCol(), valueInfo.isHiddenVirtualCol()); - colExprMap.put(newColInfo.getInternalName(), inputExpr); - outputRS.put(src, field, newColInfo); - } + List columns = inputRS.getColumnInfos(); + int[] index = new int[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + ColumnInfo colInfo = columns.get(i); + String[] nm = inputRS.reverseLookup(colInfo.getInternalName()); + String[] nm2 = inputRS.getAlternateMappings(colInfo.getInternalName()); + ExprNodeDesc expr = new ExprNodeColumnDesc(colInfo.getType(), + colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol()); + + // backtrack can be null when input is script operator + ExprNodeDesc exprBack = ExprNodeDescUtils.backtrack(expr, dummy, child); + int kindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceKeysBack); + if (kindex >= 0) { + ColumnInfo newColInfo = new ColumnInfo(colInfo); + newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex); + newColInfo.setTabAlias(nm[0]); + outputRS.addMappingOnly(nm[0], nm[1], newColInfo); + if (nm2 != null) { + outputRS.addMappingOnly(nm2[0], nm2[1], newColInfo); + } + index[i] = kindex; + continue; + } + int vindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceValuesBack); + if (kindex >= 0) { + index[i] = -vindex - 1; + continue; + } + index[i] = -reduceValues.size() - 1; + String outputColName = getColumnInternalName(reduceValues.size()); + + reduceValues.add(expr); + reduceValuesBack.add(exprBack); + + ColumnInfo newColInfo = new ColumnInfo(colInfo); + newColInfo.setInternalName(Utilities.ReduceField.VALUE + "." + outputColName); + newColInfo.setTabAlias(nm[0]); + + outputRS.put(nm[0], nm[1], newColInfo); + if (nm2 != null) { + outputRS.addMappingOnly(nm2[0], nm2[1], newColInfo); } + outputColumns.add(outputColName); } + dummy.setParentOperators(null); int numReds = -1; @@ -6650,11 +6791,23 @@ private Operator genJoinReduceSinkChild(QB qb, QBJoinTree joinTree, } } + ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, + reduceValues, outputColumns, false, tag, + reduceKeys.size(), numReds); + ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap( - OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, - reduceValues, outputColumns, false, joinTree.getNextTag(), - reduceKeys.size(), numReds), new RowSchema(outputRS + OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(outputRS .getColumnInfos()), child), outputRS); + List keyColNames = rsDesc.getOutputKeyColumnNames(); + for (int i = 0 ; i < keyColNames.size(); i++) { + colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i)); + } + List valColNames = rsDesc.getOutputValueColumnNames(); + for (int i = 0 ; i < valColNames.size(); i++) { + colExprMap.put(Utilities.ReduceField.VALUE + "." 
@@ -6650,11 +6791,23 @@ private Operator genJoinReduceSinkChild(QB qb, QBJoinTree joinTree,
       }
     }
+    ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys,
+        reduceValues, outputColumns, false, tag,
+        reduceKeys.size(), numReds);
+
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
-        OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
-            reduceValues, outputColumns, false, joinTree.getNextTag(),
-            reduceKeys.size(), numReds), new RowSchema(outputRS
+        OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(outputRS
             .getColumnInfos()), child), outputRS);
+    List keyColNames = rsDesc.getOutputKeyColumnNames();
+    for (int i = 0 ; i < keyColNames.size(); i++) {
+      colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i));
+    }
+    List valColNames = rsDesc.getOutputValueColumnNames();
+    for (int i = 0 ; i < valColNames.size(); i++) {
+      colExprMap.put(Utilities.ReduceField.VALUE + "." + valColNames.get(i), reduceValues.get(i));
+    }
+
+    rsOp.setValueIndex(index);
     rsOp.setColumnExprMap(colExprMap);
     rsOp.setInputAliases(srcs);
     return rsOp;
@@ -6675,16 +6828,15 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
       for (ASTNode cond : filter) {
         joinSrcOp = genFilterPlan(qb, cond, joinSrcOp);
       }
-      String[] leftAliases = joinTree.getLeftAliases();
-      joinSrcOp = genJoinReduceSinkChild(qb, joinTree, joinSrcOp, leftAliases, 0);
     }
-    Operator[] srcOps = new Operator[joinTree.getBaseSrc().length];
+    String[] baseSrc = joinTree.getBaseSrc();
+    Operator[] srcOps = new Operator[baseSrc.length];
     HashSet omitOpts = null; // set of input to the join that should be
                              // omitted by the output
     int pos = 0;
-    for (String src : joinTree.getBaseSrc()) {
+    for (String src : baseSrc) {
       if (src != null) {
         Operator srcOp = map.get(src.toLowerCase());
@@ -6703,21 +6855,24 @@ private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
         // generate a groupby operator (HASH mode) for a map-side partial
         // aggregation for semijoin
-        srcOp = genMapGroupByForSemijoin(qb, fields, srcOp,
+        srcOps[pos++] = genMapGroupByForSemijoin(qb, fields, srcOp,
             GroupByDesc.Mode.HASH);
+        } else {
+          srcOps[pos++] = srcOp;
         }
-
-        // generate a ReduceSink operator for the join
-        srcOps[pos] = genJoinReduceSinkChild(qb, joinTree, srcOp, new String[]{src}, pos);
-        pos++;
       } else {
         assert pos == 0;
-        srcOps[pos++] = null;
+        srcOps[pos++] = joinSrcOp;
       }
     }
-    // Type checking and implicit type conversion for join keys
-    genJoinOperatorTypeCheck(joinSrcOp, srcOps);
+    ExprNodeDesc[][] joinKeys = genJoinKeys(joinTree, srcOps);
+
+    for (int i = 0; i < srcOps.length; i++) {
+      // generate a ReduceSink operator for the join
+      String[] srcs = baseSrc[i] != null ? new String[] {baseSrc[i]} : joinTree.getLeftAliases();
+      srcOps[i] = genJoinReduceSinkChild(qb, joinKeys[i], srcOps[i], srcs, joinTree.getNextTag());
+    }
     JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
         joinSrcOp, srcOps, omitOpts);
@@ -6748,12 +6903,14 @@ private Operator insertSelectForSemijoin(ArrayList fields,
     ArrayList colList = new ArrayList();
     ArrayList columnNames = new ArrayList();
+    Map colExprMap = new HashMap();
     // construct the list of columns that need to be projected
     for (ASTNode field : fields) {
       ExprNodeColumnDesc exprNode = (ExprNodeColumnDesc) genExprNodeDesc(field,
           inputRR);
       colList.add(exprNode);
       columnNames.add(exprNode.getColumn());
+      colExprMap.put(exprNode.getColumn(), exprNode);
     }
     // create selection operator
@@ -6761,7 +6918,7 @@ private Operator insertSelectForSemijoin(ArrayList fields,
         new SelectDesc(colList, columnNames, false), new RowSchema(inputRR
             .getColumnInfos()), input), inputRR);
-    output.setColumnExprMap(input.getColumnExprMap());
+    output.setColumnExprMap(colExprMap);
     return output;
   }
@@ -6820,28 +6977,24 @@ private Operator genMapGroupByForSemijoin(QB qb, ArrayList fields, // t
     return op;
   }
-  private void genJoinOperatorTypeCheck(Operator left, Operator[] right)
+  private ExprNodeDesc[][] genJoinOperatorTypeCheck(ExprNodeDesc[][] keys)
       throws SemanticException {
     // keys[i] -> ArrayList for the i-th join operator key list
-    ArrayList<ArrayList<ExprNodeDesc>> keys = new ArrayList<ArrayList<ExprNodeDesc>>();
     int keyLength = 0;
-    for (int i = 0; i < right.length; i++) {
-      Operator oi = (i == 0 && right[i] == null ? left : right[i]);
-      ReduceSinkDesc now = ((ReduceSinkOperator) (oi)).getConf();
+    for (int i = 0; i < keys.length; i++) {
       if (i == 0) {
-        keyLength = now.getKeyCols().size();
+        keyLength = keys[i].length;
       } else {
-        assert (keyLength == now.getKeyCols().size());
+        assert keyLength == keys[i].length;
      }
-      keys.add(now.getKeyCols());
    }
     // implicit type conversion hierarchy
     for (int k = 0; k < keyLength; k++) {
       // Find the common class for type conversion
-      TypeInfo commonType = keys.get(0).get(k).getTypeInfo();
-      for (int i = 1; i < right.length; i++) {
+      TypeInfo commonType = keys[0][k].getTypeInfo();
+      for (int i = 1; i < keys.length; i++) {
         TypeInfo a = commonType;
-        TypeInfo b = keys.get(i).get(k).getTypeInfo();
+        TypeInfo b = keys[i][k].getTypeInfo();
         commonType = FunctionRegistry.getCommonClassForComparison(a, b);
         if (commonType == null) {
           throw new SemanticException(
@@ -6850,27 +7003,15 @@ private void genJoinOperatorTypeCheck(Operator left, Operator[] right)
         }
       }
       // Add implicit type conversion if necessary
-      for (int i = 0; i < right.length; i++) {
+      for (int i = 0; i < keys.length; i++) {
         if (TypeInfoUtils.isConversionRequiredForComparison(
-            keys.get(i).get(k).getTypeInfo(),
-            commonType)) {
-          keys.get(i).set(
-              k,
-              ParseUtils.createConversionCast(
-                  keys.get(i).get(k), (PrimitiveTypeInfo)commonType));
+            keys[i][k].getTypeInfo(), commonType)) {
+          keys[i][k] = ParseUtils.createConversionCast(
+              keys[i][k], (PrimitiveTypeInfo)commonType);
         }
       }
     }
-    // regenerate keySerializationInfo because the ReduceSinkOperator's
-    // output key types might have changed.
-    for (int i = 0; i < right.length; i++) {
-      Operator oi = (i == 0 && right[i] == null ? left : right[i]);
-      ReduceSinkDesc now = ((ReduceSinkOperator) (oi)).getConf();
-
-      now.setKeySerializeInfo(PlanUtils.getReduceKeyTableDesc(PlanUtils
-          .getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey"), now
-          .getOrder()));
-    }
+    return keys;
   }
   private Operator genJoinPlan(QB qb, Map map)
@@ -9275,7 +9416,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     optm.setPctx(pCtx);
     optm.initialize(conf);
     pCtx = optm.optimize();
-    
+
     FetchTask origFetchTask = pCtx.getFetchTask();
     if (LOG.isDebugEnabled()) {
@@ -9304,7 +9445,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       // if desired check we're not going over partition scan limits
       enforceScanLimits(pCtx, origFetchTask);
     }
-    
+
     return;
   }
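
Note: with genJoinOperatorTypeCheck now operating directly on the ExprNodeDesc[][] key matrix, the check is: for each key position, fold the types of all join inputs into one common comparison type, then wrap mismatching keys in a conversion cast. The toy program below illustrates only that control flow; the promotion rule is a deliberately crude stand-in for FunctionRegistry.getCommonClassForComparison, and mutating the array stands in for ParseUtils.createConversionCast.

import java.util.Arrays;

public class JoinKeyTypeCheckSketch {

  enum T { INT, BIGINT, DOUBLE, STRING }

  /** Crude stand-in for getCommonClassForComparison; not Hive's actual rules. */
  static T common(T a, T b) {
    if (a == b) return a;
    if (a == T.STRING || b == T.STRING) return T.DOUBLE; // mixed string/number compared as double
    return a.ordinal() > b.ordinal() ? a : b;            // INT < BIGINT < DOUBLE
  }

  public static void main(String[] args) {
    // keys[input][position]: two join inputs, two key columns each
    T[][] keys = { { T.INT, T.STRING }, { T.BIGINT, T.STRING } };
    for (int k = 0; k < keys[0].length; k++) {
      T commonType = keys[0][k];
      for (int i = 1; i < keys.length; i++) {
        commonType = common(commonType, keys[i][k]);
      }
      for (int i = 0; i < keys.length; i++) {
        if (keys[i][k] != commonType) {
          System.out.println("cast input " + i + " key " + k
              + " from " + keys[i][k] + " to " + commonType);
          keys[i][k] = commonType;                       // stands in for createConversionCast
        }
      }
    }
    System.out.println(Arrays.deepToString(keys));
  }
}
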
diff --git ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java
index 4175d11..8a8b0d5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java
@@ -87,6 +87,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
         if (exp instanceof ExprNodeGenericFuncDesc) {
           isCandidate = false;
         }
+        if (exp instanceof ExprNodeColumnDesc && colAlias == null) {
+          ExprNodeColumnDesc column = (ExprNodeColumnDesc)exp;
+          colAlias = new String[]{column.getTabAlias(), column.getColumn()};
+        }
       }
       ctx.addConvertedNode(colref, exp);
       ctx.setIsCandidate(exp, isCandidate);
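
Note: the ExprWalkerProcFactory change falls back to the converted column expression itself when no column alias has been resolved yet. A minimal stand-in sketch of that fallback (not Hive's ExprNodeColumnDesc; all names illustrative):

public class ColAliasFallbackSketch {

  static class ColumnExpr {
    final String tabAlias;
    final String column;
    ColumnExpr(String tabAlias, String column) {
      this.tabAlias = tabAlias;
      this.column = column;
    }
  }

  /** Return the existing alias pair, or derive one from a column expression. */
  static String[] colAliasFor(Object exp, String[] colAlias) {
    if (exp instanceof ColumnExpr && colAlias == null) {
      ColumnExpr column = (ColumnExpr) exp;
      return new String[] { column.tabAlias, column.column };
    }
    return colAlias;
  }

  public static void main(String[] args) {
    String[] alias = colAliasFor(new ColumnExpr("t1", "key"), null);
    System.out.println(alias[0] + "." + alias[1]);  // t1.key
  }
}
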
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index e706f52..e716ed2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -116,4 +116,13 @@ public LineageInfo getLineageInfo() {
   public void setIndex(Index index) {
     this.index = index;
   }
+
+  /**
+   * Clear all lineage states
+   */
+  public void clear() {
+    dirToFop.clear();
+    linfo.clear();
+    index.clear();
+  }
 }
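
Note: LineageState.clear() wipes the session-held lineage maps (dirToFop, linfo, index), presumably so bookkeeping from one statement cannot leak into the next statement in the same session. The sketch below only illustrates that reset-per-query pattern with stand-in types; it is not the real LineageState.

import java.util.HashMap;
import java.util.Map;

public class QueryScopedStateSketch {

  private final Map<String, String> dirToOperator = new HashMap<String, String>();  // stand-in for dirToFop
  private final Map<String, String> lineage = new HashMap<String, String>();        // stand-in for linfo/index

  void record(String dir, String op, String dependency) {
    dirToOperator.put(dir, op);
    lineage.put(op, dependency);
  }

  /** Clear all per-query state, mirroring LineageState.clear(). */
  void clear() {
    dirToOperator.clear();
    lineage.clear();
  }

  public static void main(String[] args) {
    QueryScopedStateSketch state = new QueryScopedStateSketch();
    state.record("/tmp/out1", "FS_3", "TS_0.key");
    state.clear();  // called between queries
    System.out.println(state.dirToOperator.isEmpty() && state.lineage.isEmpty());  // true
  }
}
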