diff --git ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java index 1e850d6..107ee64 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java +++ ql/src/java/org/apache/hadoop/hive/ql/lib/RuleRegExp.java @@ -80,6 +80,9 @@ private static boolean patternHasOnlyWildCardChar(String pattern, char wcc) { if (wildCards.contains(pc)) { hasWildCard = true; ret = ret && (pc == wcc); + if (!ret) { + return false; + } } } return ret && hasWildCard; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java index 09ef490..5034988 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.lineage; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -65,6 +65,19 @@ */ public class ExprProcFactory { + + private static final String exprNodeColDescRegExp = ExprNodeColumnDesc.class.getName() + "%"; + private static final String exprNodeFieldDescRegExp = ExprNodeFieldDesc.class.getName() + "%"; + private static final String exprNodeGenFuncDescRegExp = ExprNodeGenericFuncDesc.class.getName() + "%"; + + private static final Map exprRules = new LinkedHashMap(); + + static { + exprRules.put(new RuleRegExp("R1", exprNodeColDescRegExp), getColumnProcessor()); + exprRules.put(new RuleRegExp("R2", exprNodeFieldDescRegExp), getFieldProcessor()); + exprRules.put(new RuleRegExp("R3", exprNodeGenFuncDescRegExp), getGenericFuncProcessor()); + } + /** * Processor for column expressions. */ @@ -272,6 +285,26 @@ public static String getExprString(RowSchema rs, ExprNodeDesc expr, public static Dependency getExprDependency(LineageCtx lctx, Operator inpOp, ExprNodeDesc expr) throws SemanticException { + return getExprDependency(lctx, inpOp, expr, new HashMap()); + } + + /** + * Gets the expression dependencies for the expression. + * + * @param lctx + * The lineage context containing the input operators dependencies. + * @param inpOp + * The input operator to the current operator. + * @param expr + * The expression that is being processed. + * @param outputMap + * @throws SemanticException + */ + public static Dependency getExprDependency(LineageCtx lctx, + Operator inpOp, ExprNodeDesc expr, HashMap outputMap) + throws SemanticException { + + outputMap.clear(); // Create the walker, the rules dispatcher and the context. ExprProcCtx exprCtx = new ExprProcCtx(lctx, inpOp); @@ -279,15 +312,6 @@ public static Dependency getExprDependency(LineageCtx lctx, // create a walker which walks the tree in a DFS manner while maintaining // the operator stack. The dispatcher // generates the plan from the operator tree - Map exprRules = new LinkedHashMap(); - exprRules.put( - new RuleRegExp("R1", ExprNodeColumnDesc.class.getName() + "%"), - getColumnProcessor()); - exprRules.put( - new RuleRegExp("R2", ExprNodeFieldDesc.class.getName() + "%"), - getFieldProcessor()); - exprRules.put(new RuleRegExp("R3", ExprNodeGenericFuncDesc.class.getName() - + "%"), getGenericFuncProcessor()); // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along @@ -295,10 +319,8 @@ public static Dependency getExprDependency(LineageCtx lctx, exprRules, exprCtx); GraphWalker egw = new DefaultGraphWalker(disp); - List startNodes = new ArrayList(); - startNodes.add(expr); + List startNodes = Collections.singletonList((Node)expr); - HashMap outputMap = new HashMap(); egw.startWalking(startNodes, outputMap); return (Dependency)outputMap.get(expr); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java index 0c2ff32..747aae0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.session.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class generates the lineage information for the columns @@ -55,6 +57,8 @@ */ public class Generator extends Transform { + private static final Logger LOG = LoggerFactory.getLogger(Generator.class); + /* (non-Javadoc) * @see org.apache.hadoop.hive.ql.optimizer.Transform#transform(org.apache.hadoop.hive.ql.parse.ParseContext) */ @@ -64,6 +68,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { Index index = SessionState.get() != null ? SessionState.get().getLineageState().getIndex() : new Index(); + long sTime = System.currentTimeMillis(); // Create the lineage context LineageCtx lCtx = new LineageCtx(pctx, index); @@ -101,6 +106,7 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { topNodes.addAll(pctx.getTopOps().values()); ogw.startWalking(topNodes, null); + LOG.debug("Time taken for lineage transform={}", (System.currentTimeMillis() - sTime)); return pctx; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java index d95b45b..73f88e0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java @@ -202,6 +202,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, * Processor for Join Operator. */ public static class JoinLineage extends DefaultLineage implements NodeProcessor { + + private final HashMap outputMap = new HashMap(); + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @@ -237,7 +240,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // Otherwise look up the expression corresponding to this ci ExprNodeDesc expr = exprs.get(cnt++); - Dependency dependency = ExprProcFactory.getExprDependency(lCtx, inpOp, expr); + Dependency dependency = ExprProcFactory.getExprDependency(lCtx, inpOp, expr, outputMap); lCtx.getIndex().mergeDependency(op, ci, dependency); } @@ -348,6 +351,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, * Processor for Select operator. */ public static class SelectLineage extends DefaultLineage implements NodeProcessor { + + private final HashMap outputMap = new HashMap(); + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @@ -372,7 +378,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, ArrayList col_infos = rs.getSignature(); int cnt = 0; for(ExprNodeDesc expr : sop.getConf().getColList()) { - Dependency dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr); + Dependency dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr, outputMap); if (dep != null && dep.getExpr() == null && (dep.getBaseCols().isEmpty() || dep.getType() != LineageInfo.DependencyType.SIMPLE)) { dep.setExpr(ExprProcFactory.getExprString(rs, expr, lctx, inpOp, null)); @@ -401,6 +407,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, * Processor for GroupBy operator. */ public static class GroupByLineage extends DefaultLineage implements NodeProcessor { + + private final HashMap outputMap = new HashMap(); + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @@ -414,7 +423,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, for(ExprNodeDesc expr : gop.getConf().getKeys()) { lctx.getIndex().putDependency(gop, col_infos.get(cnt++), - ExprProcFactory.getExprDependency(lctx, inpOp, expr)); + ExprProcFactory.getExprDependency(lctx, inpOp, expr, outputMap)); } // If this is a reduce side GroupBy operator, check if there is @@ -438,7 +447,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } else { sb.append(", "); } - Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr); + Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr, outputMap); if (expr_dep != null && !expr_dep.getBaseCols().isEmpty()) { new_type = LineageCtx.getNewDependencyType(expr_dep.getType(), new_type); bci_set.addAll(expr_dep.getBaseCols()); @@ -542,10 +551,13 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, lCtx.getIndex().copyPredicates(inpOp, op); RowSchema rs = op.getSchema(); ArrayList inp_cols = inpOp.getSchema().getSignature(); - int cnt = 0; - for(ColumnInfo ci : rs.getSignature()) { - Dependency inp_dep = lCtx.getIndex().getDependency(inpOp, inp_cols.get(cnt++)); + + // check only for input cols + for(ColumnInfo input : inp_cols) { + Dependency inp_dep = lCtx.getIndex().getDependency(inpOp, input); if (inp_dep != null) { + //merge it with rs colInfo + ColumnInfo ci = rs.getColumnInfo(input.getInternalName()); lCtx.getIndex().mergeDependency(op, ci, inp_dep); } } @@ -558,6 +570,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, */ public static class ReduceSinkLineage implements NodeProcessor { + private final HashMap outputMap = new HashMap(); + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @@ -584,11 +598,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, ArrayList col_infos = rop.getSchema().getSignature(); for(ExprNodeDesc expr : rop.getConf().getKeyCols()) { lCtx.getIndex().putDependency(rop, col_infos.get(cnt++), - ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + ExprProcFactory.getExprDependency(lCtx, inpOp, expr, outputMap)); } for(ExprNodeDesc expr : rop.getConf().getValueCols()) { lCtx.getIndex().putDependency(rop, col_infos.get(cnt++), - ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + ExprProcFactory.getExprDependency(lCtx, inpOp, expr, outputMap)); } } else { RowSchema schema = rop.getSchema(); @@ -602,7 +616,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, continue; // key in values } lCtx.getIndex().putDependency(rop, column, - ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i))); + ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i), outputMap)); } List valCols = desc.getValueCols(); ArrayList valColNames = desc.getOutputValueColumnNames(); @@ -614,7 +628,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, column = schema.getColumnInfo(Utilities.ReduceField.VALUE + "." + valColNames.get(i)); } lCtx.getIndex().putDependency(rop, column, - ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i))); + ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i), outputMap)); } }