Index: eclipse-templates/TestHive.launchtemplate =================================================================== --- eclipse-templates/TestHive.launchtemplate (revision 906309) +++ eclipse-templates/TestHive.launchtemplate (working copy) @@ -9,7 +9,7 @@ - + @@ -19,5 +19,6 @@ + Index: eclipse-templates/TestMTQueries.launchtemplate =================================================================== --- eclipse-templates/TestMTQueries.launchtemplate (revision 906309) +++ eclipse-templates/TestMTQueries.launchtemplate (working copy) @@ -9,7 +9,7 @@ - + @@ -19,5 +19,6 @@ + Index: eclipse-templates/TestJdbc.launchtemplate =================================================================== --- eclipse-templates/TestJdbc.launchtemplate (revision 906309) +++ eclipse-templates/TestJdbc.launchtemplate (working copy) @@ -9,7 +9,7 @@ - + @@ -19,5 +19,6 @@ + Index: eclipse-templates/TestCliDriver.launchtemplate =================================================================== --- eclipse-templates/TestCliDriver.launchtemplate (revision 906309) +++ eclipse-templates/TestCliDriver.launchtemplate (working copy) @@ -9,7 +9,7 @@ - + @@ -19,5 +19,6 @@ + Index: eclipse-templates/TestTruncate.launchtemplate =================================================================== --- eclipse-templates/TestTruncate.launchtemplate (revision 906309) +++ eclipse-templates/TestTruncate.launchtemplate (working copy) @@ -9,7 +9,7 @@ - + @@ -19,5 +19,6 @@ + Index: ql/src/test/results/clientpositive/fileformat_sequencefile.q.out =================================================================== --- ql/src/test/results/clientpositive/fileformat_sequencefile.q.out (revision 906309) +++ ql/src/test/results/clientpositive/fileformat_sequencefile.q.out (working copy) @@ -43,12 +43,14 @@ key int value string -Detailed Table Information Table(tableName:dest1, dbName:default, owner:njain, createTime:1253779880, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:int, comment:null), FieldSchema(name:value, type:string, comment:null)], location:file:/data/users/njain/hive5/hive5/build/ql/test/data/warehouse/dest1, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[], parameters:{}) +Detailed Table Information Table(tableName:dest1, dbName:default, owner:athusoo, createTime:1264496237, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:int, comment:null), FieldSchema(name:value, type:string, comment:null)], location:file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/test/data/warehouse/dest1, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[], parameters:{transient_lastDdlTime=1264496237}) PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 SELECT src.key, src.value WHERE src.key < 10 PREHOOK: type: QUERY PREHOOK: Input: default@src PREHOOK: Output: default@dest1 +PREHOOK: Lineage: dest1.value SIMPLE 
null[(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: Lineage: dest1.key SIMPLE null[(src)src.FieldSchema(name:key, type:string, comment:default), ] POSTHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 SELECT src.key, src.value WHERE src.key < 10 POSTHOOK: type: QUERY @@ -57,11 +59,11 @@ PREHOOK: query: SELECT dest1.* FROM dest1 PREHOOK: type: QUERY PREHOOK: Input: default@dest1 -PREHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/862471397/10000 +PREHOOK: Output: file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/scratchdir/1514937477/10000 POSTHOOK: query: SELECT dest1.* FROM dest1 POSTHOOK: type: QUERY POSTHOOK: Input: default@dest1 -POSTHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/862471397/10000 +POSTHOOK: Output: file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/scratchdir/1514937477/10000 0 val_0 4 val_4 8 val_8 @@ -77,3 +79,88 @@ POSTHOOK: query: DROP TABLE dest1 POSTHOOK: type: DROPTABLE POSTHOOK: Output: default@dest1 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: CREATETABLE +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@srcbucket +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: CREATETABLE +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src1 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src_sequencefile +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src_thrift +PREHOOK: query: 
DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src_json Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java (revision 906309) +++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java (working copy) @@ -20,6 +20,8 @@ import java.util.Set; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.security.UserGroupInformation; @@ -32,7 +34,8 @@ @Override public void run(SessionState sess, Set inputs, - Set outputs, UserGroupInformation ugi) throws Exception { + Set outputs, LineageInfo linfo, UserGroupInformation ugi) + throws Exception { LogHelper console = SessionState.getConsole(); @@ -51,6 +54,29 @@ for (WriteEntity we : outputs) { console.printError("PREHOOK: Output: " + we.toString()); } + + // Also print out the generic lineage information if there is any + if (linfo != null) { + LineageInfoIterator iter = linfo.iterator(); + while(iter.hasNext()) { + LineageInfoItem it = iter.next(); + Dependency dep = it.getDependency(); + + StringBuilder sb = new StringBuilder(); + sb.append("PREHOOK: Lineage: " + it.getTable().getTableName() + "." + + it.getField().getName() + " " + dep.getType() + " " + dep.getExpr()); + + sb.append("["); + for(BaseColumnInfo col: dep.getBaseCols()) { + sb.append("("+col.getTabAlias().getTable().getTableName() + ")" + + col.getTabAlias().getAlias() + "." + + col.getColumn() + ", "); + } + sb.append("]"); + + console.printError(sb.toString()); + } + } } } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.ppd.PredicatePushDown; +import org.apache.hadoop.hive.ql.optimizer.lineage.Generator; /** * Implementation of the optimizer @@ -42,6 +43,8 @@ */ public void initialize(HiveConf hiveConf) { transformations = new ArrayList(); + // Add the transformation that computes the lineage information. + transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) { transformations.add(new ColumnPruner()); } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (revision 0) @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; + +/** + * The processor context for the lineage information. This contains the + * lineage context and the column info and operator information that is + * being used for the current expression. + */ +public class ExprProcCtx implements NodeProcessorCtx { + + /** + * The lineage context that is being populated. + */ + private LineageCtx lctx; + + /** + * The input operator in case the current operator is not a leaf. + */ + private Operator inpOp; + + /** + * Constructor. + * + * @param lctx The lineage context thatcontains the dependencies for the inputs. + * @param inpOp The input operator to the current operator. + */ + public ExprProcCtx(LineageCtx lctx, + Operator inpOp) { + this.lctx = lctx; + this.inpOp = inpOp; + } + + /** + * Gets the lineage context. + * + * @return LineageCtx The lineage context. + */ + public LineageCtx getLineageCtx() { + return lctx; + } + + /** + * Gets the input operator. + * + * @return Operator The input operator - this is null in case the current + * operator is a leaf. + */ + public Operator getInputOperator() { + return inpOp; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java (revision 0) @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.Vector; + +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.parse.ParseContext; + +/** + * This class contains the lineage context that is passed + * while walking the operator tree in Lineage. The context + * contains the LineageInfo structure that is passed to the + * pre-execution hooks. + */ +public class LineageCtx implements NodeProcessorCtx { + + /** + * The map contains an index from the (operator, columnInfo) to the + * dependency vector for that tuple. This is used to generate the + * dependency vectors during the walk of the operator tree. + */ + private Map, HashMap> index; + + /** + * The lineage info structure that is eventually generated by processing the filesink + * operator. + */ + private LineageInfo linfo; + + /** + * Parse context to get to the table metadata information. + */ + private ParseContext pctx; + + /** + * Class to record a visit from fromOp to toOp. This class is used to figure out + * whether an operator was already visited from a parent operator during a walk. + */ + protected static class Visit { + + /** + * The parent operator from where the visit was made. + */ + private Operator fromOp; + + /** + * The child operator to which the visit was made. + */ + private Operator toOp; + + /** + * Default constructor. + */ + public Visit() { + fromOp = null; + toOp = null; + } + + /** + * Constructor. + * + * @param fromOp The parent operator from where the visit was made. + * @param toOp The child operator to where the visit was made. + */ + public Visit(Operator fromOp, Operator toOp) { + this.fromOp = fromOp; + this.toOp = toOp; + } + + /** + * @return the fromOp + */ + public Operator getFromOp() { + return fromOp; + } + + /** + * @param fromOp the fromOp to set + */ + public void setFromOp(Operator fromOp) { + this.fromOp = fromOp; + } + + /** + * @return the toOp + */ + public Operator getToOp() { + return toOp; + } + + /** + * @param toOp the toOp to set + */ + public void setToOp(Operator toOp) { + this.toOp = toOp; + } + } + + /** + * A set of visits that have been made. This map can be used if a visit from a + * parent to a child is being made again. + */ + private Set visits; + + /** + * Constructor. + * + * @param pctx The parse context that is used to get table metadata information. + */ + public LineageCtx(ParseContext pctx) { + index = new HashMap, HashMap>(); + linfo = new LineageInfo(); + this.pctx = pctx; + visits = new HashSet(); + } + + /** + * Gets the dependency for an operator, columninfo tuple. + * @param op The operator whose dependency is being inspected. + * @param col The column info whose dependency is being inspected. + * @return Dependency for that particular operator, columninfo tuple. + * null if no dependency is found. + */ + public Dependency getDependency(Operator op, ColumnInfo col) { + HashMap colMap = index.get(op); + if (colMap == null) + return null; + + return colMap.get(col); + } + + /** + * Puts the dependency for an operator, columninfo tuple. 
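+   * Any existing dependency recorded for the same (operator, column) pair is overwritten;
+   * use mergeDependency when information arriving from multiple parents has to be combined.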
+ * @param op The operator whose dependency is being inserted. + * @param col The column info whose dependency is being inserted. + * @param dep The dependency. + */ + public void putDependency(Operator op, + ColumnInfo col, Dependency dep) { + HashMap colMap = index.get(op); + if (colMap == null) { + colMap = new HashMap(); + index.put(op, colMap); + } + colMap.put(col, dep); + } + + /** + * Merges the new dependencies in dep to the existing dependencies + * of (op, ci). + * + * @param op The operator of the column whose dependency is being modified. + * @param ci The column info of the associated column. + * @param dependency The new dependency. + */ + public void mergeDependency(Operator op, + ColumnInfo ci, Dependency dep) { + Dependency old_dep = getDependency(op, ci); + if (old_dep == null) + putDependency(op, ci, dep); + else { + old_dep.setType(LineageInfo.DependencyType.SET); + Set bci_set = new LinkedHashSet(old_dep.getBaseCols()); + bci_set.addAll(dep.getBaseCols()); + old_dep.setBaseCols(new Vector(bci_set)); + // TODO: Fix the expressions later. + old_dep.setExpr(null); + } + } + + /** + * Gets the lineage information computed for this query. + * + * @return LineageInfo The lineageinfo structure for this query. + */ + public LineageInfo getInfo() { + return linfo; + } + + /** + * Sets the lineage information structure. + * + * @param linfo The associated lineage information structure. + */ + public void setInfo(LineageInfo linfo) { + this.linfo = linfo; + } + + /** + * Gets the parse context. + * + * @return ParseContext + */ + public ParseContext getParseCtx() { + return pctx; + } + + /** + * Record a visit from inpOp to op + * + * @param inpOp The parent operator from where the visit is made. + * @param op The child operator to which the visit is made. + */ + public void recordVisit(Operator inpOp, + Operator op) { + visits.add(new Visit(inpOp, op)); + } + + /** + * Checks if op has been visited from inpOp. + * + * @param inpOp The parent operator from where the visit was made. + * @param op The child operator to which the visit was made. + * + * @return boolean true if op was already visited from inpOp else false. + */ + public boolean hasVisited(Operator inpOp, + Operator op) { + return visits.contains(new Visit(inpOp, op)); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java (revision 0) @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.PreOrderWalker; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.optimizer.Transform; +import org.apache.hadoop.hive.ql.optimizer.lineage.OpProcFactory; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; + +/** + * This class generates the lineage information for the columns + * and tables from the plan before it goes through other + * optimization phases. + */ +public class Generator implements Transform { + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.optimizer.Transform#transform(org.apache.hadoop.hive.ql.parse.ParseContext) + */ + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + // Create the lineage context + LineageCtx lCtx = new LineageCtx(pctx); + + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "TS%"), OpProcFactory.getTSProc()); + opRules.put(new RuleRegExp("R2", "SCR%"), OpProcFactory.getTransformProc()); + opRules.put(new RuleRegExp("R3", "UDTF%"), OpProcFactory.getTransformProc()); + opRules.put(new RuleRegExp("R4", "FS%"), OpProcFactory.getFSProc()); + opRules.put(new RuleRegExp("R5", "SEL%"), OpProcFactory.getSelProc()); + opRules.put(new RuleRegExp("R6", "GBY%"), OpProcFactory.getGroupByProc()); + opRules.put(new RuleRegExp("R7", "UNION%"), OpProcFactory.getUnionProc()); + opRules.put(new RuleRegExp("R8", "JOIN%|MAPJOIN%"), OpProcFactory.getJoinProc()); + opRules.put(new RuleRegExp("R8", "RS%"), OpProcFactory.getReduceSinkProc()); + opRules.put(new RuleRegExp("R9", "LVJ%"), OpProcFactory.getLateralViewJoinProc()); + + // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(), opRules, lCtx); + GraphWalker ogw = new PreOrderWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + + // Transfer the lineage context to the proper place so that it can be passed to the + // pre-execution hook. + pctx.setLineageInfo(lCtx.getInfo()); + + return pctx; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (revision 0) @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Stack; +import java.util.Vector; + +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; + +/** + * Expression processor factory for lineage. Each processor is responsible to + * create the leaf level column info objects that the expression depends upon + * and also generates a string representation of the expression. + */ +public class ExprProcFactory { + + /** + * Processor for column expressions. + */ + public static class ColumnExprProcessor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + ExprNodeColumnDesc cd = (ExprNodeColumnDesc) nd; + ExprProcCtx epc = (ExprProcCtx) procCtx; + + // assert that the input operator is not null as there are no + // exprs associated with table scans. + assert (epc.getInputOperator() != null); + + ColumnInfo inp_ci = null; + for (ColumnInfo tmp_ci : epc.getInputOperator().getSchema() + .getSignature()) { + if (tmp_ci.getInternalName().equals(cd.getColumn())) { + inp_ci = tmp_ci; + break; + } + } + + // Insert the dependencies of inp_ci to that of the current operator, ci + LineageCtx lc = epc.getLineageCtx(); + Dependency dep = lc.getDependency(epc.getInputOperator(), inp_ci); + + return dep; + } + + } + + /** + * Processor for any function or field expression. + */ + public static class GenericExprProcessor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + assert (nd instanceof ExprNodeGenericFuncDesc || nd instanceof ExprNodeFieldDesc); + + // Concatenate the dependencies of all the children to compute the new + // dependency. 
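+      // The combined type starts out as UDF; if any child reports a different dependency
+      // type (for example SCRIPT from a transform further down the tree), that type is
+      // propagated to the result instead.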
+ Dependency dep = new Dependency(); + + LinkedHashSet bci_set = new LinkedHashSet(); + dep.setType(LineageInfo.DependencyType.UDF); + + for (Object child : nodeOutputs) { + if (child == null) + continue; + + Dependency child_dep = (Dependency) child; + if (child_dep.getType() != LineageInfo.DependencyType.UDF) + dep.setType(child_dep.getType()); + bci_set.addAll(child_dep.getBaseCols()); + } + + dep.setBaseCols(new Vector(bci_set)); + + return dep; + } + + } + + /** + * Processor for constants and null expressions. For such expressions the + * processor simply returns a null dependency vector. + */ + public static class DefaultExprProcessor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + assert (nd instanceof ExprNodeConstantDesc || nd instanceof ExprNodeNullDesc); + + // Create a dependency that has no basecols + Dependency dep = new Dependency(); + dep.setType(LineageInfo.DependencyType.SIMPLE); + dep.setBaseCols(new Vector()); + return dep; + } + } + + public static NodeProcessor getDefaultExprProcessor() { + return new DefaultExprProcessor(); + } + + public static NodeProcessor getGenericFuncProcessor() { + return new GenericExprProcessor(); + } + + public static NodeProcessor getFieldProcessor() { + return new GenericExprProcessor(); + } + + public static NodeProcessor getColumnProcessor() { + return new ColumnExprProcessor(); + } + + /** + * Gets the expression dependencies for the expression. + * + * @param lctx + * The lineage context containing the input operators dependencies. + * @param inpOp + * The input operator to the current operator. + * @param expr + * The expression that is being processed. + * @throws SemanticException + */ + public static Dependency getExprDependency(LineageCtx lctx, + Operator inpOp, ExprNodeDesc expr) + throws SemanticException { + + // Create the walker, the rules dispatcher and the context. + ExprProcCtx exprCtx = new ExprProcCtx(lctx, inpOp); + + // create a walker which walks the tree in a DFS manner while maintaining + // the operator stack. The dispatcher + // generates the plan from the operator tree + Map exprRules = new LinkedHashMap(); + exprRules.put( + new RuleRegExp("R1", ExprNodeColumnDesc.class.getName() + "%"), + getColumnProcessor()); + exprRules.put( + new RuleRegExp("R2", ExprNodeFieldDesc.class.getName() + "%"), + getFieldProcessor()); + exprRules.put(new RuleRegExp("R3", ExprNodeGenericFuncDesc.class.getName() + + "%"), getGenericFuncProcessor()); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(getDefaultExprProcessor(), + exprRules, exprCtx); + GraphWalker egw = new DefaultGraphWalker(disp); + + List startNodes = new ArrayList(); + startNodes.add(expr); + + HashMap outputMap = new HashMap(); + egw.startWalking(startNodes, outputMap); + return (Dependency)outputMap.get(expr); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (revision 0) @@ -0,0 +1,544 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.Stack; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.ForwardOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.ScriptOperator; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Utils; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; + +/** + * Operator factory for the rule processors for lineage. + */ +public class OpProcFactory { + + /** + * Returns the parent operator in the walk path to the current operator. + * + * @param stack The stack encoding the path. + * + * @return Operator The parent operator in the current path. + */ + protected static Operator getParent(Stack stack) { + return (Operator)Utils.getNthAncestor(stack, 1); + } + + /** + * Processor for Script and UDTF Operators. + */ + public static class TransformLineage extends DefaultLineage implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... 
nodeOutputs) throws SemanticException { + + // LineageCTx + LineageCtx lCtx = (LineageCtx) procCtx; + + // The operators + Operator op = (Operator)nd; + Operator inpOp = getParent(stack); + + // Create a single dependency list by concatenating the dependencies of all + // the cols + Dependency dep = new Dependency(); + dep.setType(LineageInfo.DependencyType.SCRIPT); + // TODO: Fix this to a non null value. + dep.setExpr(null); + + LinkedHashSet col_set = new LinkedHashSet(); + for(ColumnInfo ci : inpOp.getSchema().getSignature()) + col_set.addAll(lCtx.getDependency(inpOp, ci).getBaseCols()); + + dep.setBaseCols(new Vector(col_set)); + + // This dependency is then set for all the colinfos of the script operator + for(ColumnInfo ci : op.getSchema().getSignature()) + lCtx.putDependency(op, ci, dep); + + return null; + } + + } + + /** + * Processor for TableScan Operator. This actually creates the base column mappings. + */ + public static class TableScanLineage extends DefaultLineage implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + ParseContext pctx = lCtx.getParseCtx(); + + // Table scan operator. + TableScanOperator top = (TableScanOperator)nd; + org.apache.hadoop.hive.ql.metadata.Table t = pctx.getTopToTable().get(top); + Table tab = t.getTTable(); + + // Generate the mappings + RowSchema rs = top.getSchema(); + List cols = t.getAllCols(); + TableAliasInfo tai = new TableAliasInfo(); + tai.setAlias(top.getConf().getAlias()); + tai.setTable(tab); + int cnt = 0; + for(ColumnInfo ci : rs.getSignature()) { + // Create a dependency + Dependency dep = new Dependency(); + BaseColumnInfo bci = new BaseColumnInfo(); + bci.setTabAlias(tai); + bci.setColumn(cols.get(cnt++)); + + // Populate the dependency + dep.setType(LineageInfo.DependencyType.SIMPLE); + // TODO: Find out how to get the expression here. + dep.setExpr(null); + dep.setBaseCols(new Vector()); + dep.getBaseCols().add(bci); + + // Put the dependency in the map + lCtx.putDependency(top, ci, dep); + } + + return null; + } + + } + + /** + * Processor for Join Operator. + */ + public static class JoinLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + JoinOperator op = (JoinOperator)nd; + JoinDesc jd = op.getConf(); + + // The input operator to the join is always a reduce sink operator + ReduceSinkOperator inpOp = (ReduceSinkOperator)getParent(stack); + ReduceSinkDesc rd = inpOp.getConf(); + int tag = rd.getTag(); + + // Iterate over the outputs of the join operator and merge the + // dependencies of the columns that corresponding to the tag. + int cnt = 0; + List exprs = jd.getExprs().get((byte)tag); + for(ColumnInfo ci : op.getSchema().getSignature()) { + if (jd.getReversedExprs().get(ci.getInternalName()) != tag) + continue; + + // Otherwise look up the expression corresponding to this ci + ExprNodeDesc expr = exprs.get(cnt++); + lCtx.mergeDependency(op, ci, + ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + } + + return null; + } + + } + + /** + * Processor for Join Operator. 
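+   * This variant handles the lateral view join: dependencies coming from the select branch
+   * map onto the trailing columns of the output schema, while those from the UDTF branch
+   * map onto the leading columns.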
+ */ + public static class LateralViewJoinLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + LateralViewJoinOperator op = (LateralViewJoinOperator)nd; + boolean isUdtfPath = true; + Operator inpOp = getParent(stack); + ArrayList cols = inpOp.getSchema().getSignature(); + + if (inpOp instanceof SelectOperator) + isUdtfPath = false; + + // Dirty hack!! + // For the select path the columns are the ones at the end of the + // current operators schema and for the udtf path the columns are + // at the beginning of the operator schema. + ArrayList out_cols = op.getSchema().getSignature(); + int out_cols_size = out_cols.size(); + int cols_size = cols.size(); + if (isUdtfPath) { + int cnt = 0; + while (cnt < cols_size) { + lCtx.mergeDependency(op, out_cols.get(cnt), + lCtx.getDependency(inpOp, cols.get(cnt))); + cnt++; + } + } + else { + int cnt = cols_size - 1; + while (cnt >= 0) { + lCtx.mergeDependency(op, out_cols.get(out_cols_size - cols_size + cnt), + lCtx.getDependency(inpOp, cols.get(cnt))); + cnt--; + } + } + return null; + } + + } + + /** + * Processor for Select operator. + */ + public static class SelectLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + LineageCtx lctx = (LineageCtx)procCtx; + SelectOperator sop = (SelectOperator)nd; + + // if this is a selStarNoCompute then this select operator + // is treated like a default operator, so just call the super classes + // process method. + if (sop.getConf().isSelStarNoCompute()) + return super.process(nd, stack, procCtx, nodeOutputs); + + // Otherwise we treat this as a normal select operator and look at + // the expressions. + + ArrayList col_infos = sop.getSchema().getSignature(); + int cnt = 0; + for(ExprNodeDesc expr : sop.getConf().getColList()) + lctx.putDependency(sop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lctx, getParent(stack), expr)); + + return null; + } + + } + + /** + * Processor for GroupBy operator. + */ + public static class GroupByLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + LineageCtx lctx = (LineageCtx)procCtx; + GroupByOperator gop = (GroupByOperator)nd; + ArrayList col_infos = gop.getSchema().getSignature(); + Operator inpOp = getParent(stack); + int cnt = 0; + + for(ExprNodeDesc expr : gop.getConf().getKeys()) + lctx.putDependency(gop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lctx, inpOp, expr)); + + for(AggregationDesc agg : gop.getConf().getAggregators()) { + // Concatenate the dependencies of all the parameters to + // create the new dependency + Dependency dep = new Dependency(); + dep.setType(LineageInfo.DependencyType.UDAF); + // TODO: Get the actual string here. 
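+        // No expression string is available yet (see the TODO above), so the dependency is
+        // described only by its type and the base columns collected below.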
+ dep.setExpr(null); + LinkedHashSet bci_set = new LinkedHashSet(); + for(ExprNodeDesc expr : agg.getParameters()) { + Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr); + if (expr_dep != null) + bci_set.addAll(expr_dep.getBaseCols()); + } + + // If the bci_set is empty, this means that the inputs to this + // aggregate function were all constants (e.g. count(1)). In this case + // the aggregate function is just dependent on all the tables that are in + // the dependency list of the input operator. + if (bci_set.isEmpty()) { + Set tai_set = new LinkedHashSet(); + for(ColumnInfo ci : inpOp.getSchema().getSignature()) { + Dependency inp_dep = lctx.getDependency(inpOp, ci); + // The dependency can be null as some of the input cis may not have + // been set in case of joins. + if (inp_dep != null) + for(BaseColumnInfo bci : inp_dep.getBaseCols()) + tai_set.add(bci.getTabAlias()); + } + + // Create the BaseColumnInfos and set them in the bci_set + for(TableAliasInfo tai : tai_set) { + BaseColumnInfo bci = new BaseColumnInfo(); + bci.setTabAlias(tai); + // This is set to null to reflect that the dependency is not on any + // particular column of the table. + bci.setColumn(null); + bci_set.add(bci); + } + } + + dep.setBaseCols(new Vector(bci_set)); + lctx.putDependency(gop, col_infos.get(cnt++), dep); + } + + return null; + } + + } + + /** + * Processor for FileSink operator. + */ + public static class FileSinkLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + // There are atleast 2 operators in the stack + assert(stack.size() >= 2); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + ParseContext pctx = lCtx.getParseCtx(); + + // File sink operator. + FileSinkOperator fop = (FileSinkOperator)nd; + // The row schema of the parent operator + Operator inpOp = getParent(stack); + ArrayList inp_cols = inpOp.getSchema().getSignature(); + + // Get the associated table + org.apache.hadoop.hive.ql.metadata.Table tab = pctx.getFopToTable().get(fop); + // If this file sink is not for a table or a partition, just return. + if (tab == null) + return null; + Table dest_tab = tab.getTTable(); + + // Generate the mappings + int cnt = 0; + for(FieldSchema fs : tab.getCols()) { + // Propagate the dependency + lCtx.getInfo().putDependency(dest_tab, fs, + lCtx.getDependency(inpOp, inp_cols.get(cnt++))); + } + + return null; + } + + } + + /** + * Union processor. + * In this case we call mergeDependency as opposed to putDependency + * in order to account for visits from different parents. + */ + public static class UnionLineage extends DefaultLineage implements NodeProcessor { + + protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName()); + + @SuppressWarnings("unchecked") + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + Operator op = (Operator)nd; + + // Get the row schema of the input operator. 
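+      // Dependencies are merged rather than overwritten because this operator is visited
+      // once per parent branch of the union.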
+ // The row schema of the parent operator + Operator inpOp = getParent(stack); + RowSchema rs = op.getSchema(); + ArrayList inp_cols = inpOp.getSchema().getSignature(); + int cnt = 0; + for(ColumnInfo ci : rs.getSignature()) { + lCtx.mergeDependency(op, ci, + lCtx.getDependency(inpOp, inp_cols.get(cnt++))); + } + return null; + } + } + + /** + * ReduceSink processor. + */ + public static class ReduceSinkLineage implements NodeProcessor { + + protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName()); + + @SuppressWarnings("unchecked") + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + ReduceSinkOperator rop = (ReduceSinkOperator)nd; + + ArrayList col_infos = rop.getSchema().getSignature(); + Operator inpOp = getParent(stack); + int cnt = 0; + + // The keys are included only in case the reduce sink feeds into + // a group by operator through a chain of forward operators + Operator op = rop.getChildOperators().get(0); + while (op instanceof ForwardOperator) + op = op.getChildOperators().get(0); + + if (op instanceof GroupByOperator) + for(ExprNodeDesc expr : rop.getConf().getKeyCols()) + lCtx.putDependency(rop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + + for(ExprNodeDesc expr : rop.getConf().getValueCols()) + lCtx.putDependency(rop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + + return null; + } + } + + /** + * Default processor. This basically passes the input dependencies as such + * to the output dependencies. + */ + public static class DefaultLineage implements NodeProcessor { + + protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName()); + + @SuppressWarnings("unchecked") + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + Operator op = (Operator)nd; + + // Get the row schema of the input operator. 
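+      // The parent's dependencies are copied position by position onto this operator's
+      // own column signature.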
+ // The row schema of the parent operator + Operator inpOp = getParent(stack); + RowSchema rs = op.getSchema(); + ArrayList inp_cols = inpOp.getSchema().getSignature(); + int cnt = 0; + for(ColumnInfo ci : rs.getSignature()) { + lCtx.putDependency(op, ci, + lCtx.getDependency(inpOp, inp_cols.get(cnt++))); + } + return null; + } + } + + public static NodeProcessor getJoinProc() { + return new JoinLineage(); + } + + public static NodeProcessor getLateralViewJoinProc() { + return new LateralViewJoinLineage(); + } + + public static NodeProcessor getTSProc() { + return new TableScanLineage(); + } + + public static NodeProcessor getTransformProc() { + return new TransformLineage(); + } + + public static NodeProcessor getFSProc() { + return new FileSinkLineage(); + } + + public static NodeProcessor getSelProc() { + return new SelectLineage(); + } + + public static NodeProcessor getGroupByProc() { + return new GroupByLineage(); + } + + public static NodeProcessor getUnionProc() { + return new UnionLineage(); + } + + public static NodeProcessor getReduceSinkProc() { + return new ReduceSinkLineage(); + } + + public static NodeProcessor getDefaultProc() { + return new DefaultLineage(); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoItem.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoItem.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoItem.java (revision 0) @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.hooks; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; + +/** + * Implements a lineageinfo item. The item is basically a triple + * of the form (table, column, dependency). + */ +public class LineageInfoItem { + + /** + * The table. + */ + private Table tab; + + /** + * The field. + */ + private FieldSchema field; + + /** + * The dependency associated witht the table and field. + */ + private Dependency dep; + + /** + * Constructor. + * + * @param tab The table. + * @param field The field. + * @param dep The dependency. + */ + public LineageInfoItem(Table tab, FieldSchema field, Dependency dep) { + this.tab = tab; + this.field = field; + this.dep = dep; + } + + /** + * Constructor. + */ + public LineageInfoItem() { + this.tab = null; + this.field = null; + this.dep = null; + } + + /** + * Gets the associated table. + * + * @return Table + */ + public Table getTable() { + return tab; + } + + /** + * Sets the associated table. + * + * @param tab Table. 
+ */ + public void setTable(Table tab) { + this.tab = tab; + } + + /** + * Gets the associated field. + * + * @return FieldSchema + */ + public FieldSchema getField() { + return field; + } + + /** + * Sets the associated field. + * + * @param field FieldSchema. + */ + public void setField(FieldSchema field) { + this.field = field; + } + + /** + * Gets the associated dependency. + * + * @return Dependency + */ + public Dependency getDependency() { + return dep; + } + + /** + * Sets the associated dependency. + * + * @param dep Dependency. + */ + public void setDependency(Dependency dep) { + this.dep = dep; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java (revision 0) @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.hooks; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Vector; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; + +/** + * This class contains the lineage information that is passed + * to the PreExecution hook. + */ +public class LineageInfo { + + /** + * Enum to track dependency. This enum has two values: + * 1. SCALAR - Indicates that the column is derived from a scalar expression. + * 2. AGGREGATION - Indicates that the column is derived from an aggregation. + */ + public static enum DependencyType { + SIMPLE, UDF, UDAF, UDTF, SCRIPT, SET + } + + /** + * Base Column information. + */ + public static class BaseColumnInfo { + + /** + * The table and alias info ecapsulated in a different class. + */ + private TableAliasInfo tabAlias; + + /** + * The metastore column information. The column can be null + * and that denotes that the expression is dependent on the row + * of the table and not particular column. This can happen in case + * of count(1). + */ + private FieldSchema column; + + /** + * @return the tabAlias + */ + public TableAliasInfo getTabAlias() { + return tabAlias; + } + + /** + * @param tabAlias the tabAlias to set + */ + public void setTabAlias(TableAliasInfo tabAlias) { + this.tabAlias = tabAlias; + } + + /** + * @return the column + */ + public FieldSchema getColumn() { + return column; + } + + /** + * @param column the column to set + */ + public void setColumn(FieldSchema column) { + this.column = column; + } + } + + public static class TableAliasInfo { + + /** + * The alias for the table. + */ + private String alias; + + /** + * The metastore table information. 
+ */ + private Table table; + + /** + * @return the alias + */ + public String getAlias() { + return alias; + } + + /** + * @param alias the alias to set + */ + public void setAlias(String alias) { + this.alias = alias; + } + + /** + * @return the table + */ + public Table getTable() { + return table; + } + + /** + * @param table the table to set + */ + public void setTable(Table table) { + this.table = table; + } + } + + /** + * This class tracks the dependency information for the base column. + */ + public static class Dependency { + + /** + * The type of dependency. + */ + private DependencyType type; + + /** + * Expression string for the dependency. + */ + private String expr; + + /** + * The list of base columns that the particular column depends on. + */ + private Vector baseCols; + + /** + * @return the type + */ + public DependencyType getType() { + return type; + } + + /** + * @param type the type to set + */ + public void setType(DependencyType type) { + this.type = type; + } + + /** + * @return the expr + */ + public String getExpr() { + return expr; + } + + /** + * @param expr the expr to set + */ + public void setExpr(String expr) { + this.expr = expr; + } + + /** + * @return the baseCols + */ + public Vector getBaseCols() { + return baseCols; + } + + /** + * @param basecols the baseCols to set + */ + public void setBaseCols(Vector baseCols) { + this.baseCols = baseCols; + } + } + + /** + * The map contains an index from the (tablename, columnname) to the + * dependency vector for that tuple. This is used to generate the + * dependency vectors during the walk of the operator tree. + */ + protected HashMap> index; + + /** + * Constructor. + */ + public LineageInfo() { + index = new HashMap>(); + } + + /** + * Gets the dependency for a table, column tuple. + * @param tab The table name of the column whose dependency is being inspected. + * @param col The column whose dependency is being inspected. + * @return Dependency for that particular table, column tuple. + * null if no dependency is found. + */ + public Dependency getDependency(Table tab, FieldSchema col) { + HashMap colMap = index.get(tab); + if (colMap == null) + return null; + + return colMap.get(col); + } + + /** + * Puts the dependency for a table, column tuple. + * @param tab The table of the column whose dependency is being inserted. + * @param col The column whose dependency is being inserted. + * @param dep The dependency. + */ + public void putDependency(Table tab, FieldSchema col, Dependency dep) { + HashMap colMap = index.get(tab); + if (colMap == null) { + colMap = new HashMap(); + index.put(tab, colMap); + } + colMap.put(col, dep); + } + + /** + * Gets the iterator on this structure. + * + * @return LineageInfoItereator + */ + public LineageInfoIterator iterator() { + return new LineageInfoIterator(this); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java (working copy) @@ -38,10 +38,13 @@ * The set of input tables and partitions. * @param outputs * The set of output tables, partitions, local and hdfs directories. + * @param lInfo + * The column level lineage information. * @param ugi * The user group security information. 
   */
-  public void run(SessionState sess, Set<ReadEntity> inputs,
-      Set<WriteEntity> outputs, UserGroupInformation ugi) throws Exception;
-
+  public void run(SessionState sess, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs, LineageInfo lInfo, UserGroupInformation ugi)
+      throws Exception;
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoIterator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoIterator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoIterator.java (revision 0)
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/**
+ * Implements an iterator on the LineageInfo structure. The returned
+ * items are LineageInfoItem objects.
+ */
+public class LineageInfoIterator implements Iterator<LineageInfoItem> {
+
+  /**
+   * The iterator on the table keys.
+   */
+  private Iterator<Table> tabIter;
+
+  /**
+   * The iterator on the field keys.
+   */
+  private Iterator<FieldSchema> fieldIter;
+
+  /**
+   * The info object that is returned.
+   */
+  private LineageInfoItem item;
+
+  /**
+   * The lineage info object that this object is an iterator for.
+   */
+  protected LineageInfo linfo;
+
+  /**
+   * Initializes the internal iterators when we go to the next table.
+   */
+  private void nextTable() {
+    item.setTable(tabIter.next());
+    this.fieldIter = linfo.index.get(item.getTable()).keySet().iterator();
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param linfo The lineage info structure that needs to be iterated on.
+   */
+  public LineageInfoIterator(LineageInfo linfo) {
+    this.item = new LineageInfoItem();
+    this.linfo = linfo;
+    this.fieldIter = null;
+    this.tabIter = linfo.index.keySet().iterator();
+    if (this.tabIter.hasNext())
+      nextTable();
+  }
+
+  @Override
+  public LineageInfoItem next() {
+    // Check if there is anything to return from the fieldIter.
+ if (!fieldIter.hasNext()) + nextTable(); + + item.setField(fieldIter.next()); + item.setDependency(linfo.index.get(item.getTable()).get(item.getField())); + + return item; + } + + @Override + public boolean hasNext() { + return ((fieldIter != null && fieldIter.hasNext()) || + tabIter.hasNext()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not supported for LineageInfoIterator"); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (working copy) @@ -121,4 +121,9 @@ } + @Override + public String getName() { + return "LVJ"; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; @@ -66,6 +67,10 @@ private FetchTask fetchTask; private HashSet inputs; private HashSet outputs; + /** + * Lineage information for the query. + */ + protected LineageInfo linfo; private HashMap idToTableNameMap; @@ -87,6 +92,7 @@ // Note that inputs and outputs can be changed when the query gets executed inputs = sem.getInputs(); outputs = sem.getOutputs(); + linfo = sem.getLineageInfo(); idToTableNameMap = new HashMap(sem.getIdToTableNameMap()); queryId = makeQueryId(); @@ -699,4 +705,21 @@ this.started = started; } + /** + * Gets the lineage information. + * + * @return LineageInfo associated with the query. + */ + public LineageInfo getLineageInfo() { + return linfo; + } + + /** + * Sets the lineage information. + * + * @param linfo The LineageInfo structure that is set in the optimization phase. + */ + public void setLineageInfo(LineageInfo linfo) { + this.linfo = linfo; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.lib; + +import java.util.Stack; + +/** + * Contains common utility functions to manipulate nodes, walkers etc. + */ +public class Utils { + + /** + * Gets the nth ancestor (the parent being the 1st ancestor) in the traversal + * path. n=0 returns the currently visited node. + * + * @param st The stack that encodes the traversal path. + * @param n The value of n (n=0 is the currently visited node). + * + * @return Node The Nth ancestor in the path with respect to the current node. + */ + public static Node getNthAncestor(Stack st, int n) { + assert(st.size() - 1 >= n); + + Stack tmpStack = new Stack(); + for(int i=0; i<=n; i++) + tmpStack.push(st.pop()); + + Node ret_nd = tmpStack.peek(); + + for(int i=0; i<=n; i++) + st.push(tmpStack.pop()); + + assert(tmpStack.isEmpty()); + + return ret_nd; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java (working copy) @@ -30,7 +30,7 @@ * Since the operator tree is a DAG, nodes with mutliple parents will be * visited more than once. This can be made configurable. */ - + /** * Constructor * Index: ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (working copy) @@ -38,7 +38,6 @@ protected Stack opStack; private final List toWalk = new ArrayList(); - private final Set seenList = new HashSet(); private final HashMap retMap = new HashMap(); private final Dispatcher dispatcher; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (working copy) @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; @@ -73,7 +74,11 @@ * List of WriteEntities that are passed to the hooks. */ protected HashSet outputs; - + /** + * Lineage information for the query. + */ + protected LineageInfo linfo; + protected static final String TEXTFILE_INPUT = TextInputFormat.class .getName(); protected static final String TEXTFILE_OUTPUT = IgnoreKeyTextOutputFormat.class @@ -456,4 +461,22 @@ } } } + + /** + * Gets the lineage information. + * + * @return LineageInfo associated with the query. + */ + public LineageInfo getLineageInfo() { + return linfo; + } + + /** + * Sets the lineage information. + * + * @param linfo The LineageInfo structure that is set in the optimization phase. 
+ */ + public void setLineageInfo(LineageInfo linfo) { + this.linfo = linfo; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; @@ -163,6 +164,7 @@ private List loadFileWork; private Map joinContext; private final HashMap topToTable; + private HashMap fopToTable; private QB qb; private ASTNode ast; private int destTableId; @@ -194,6 +196,7 @@ opParseCtx = new LinkedHashMap, OpParseContext>(); joinContext = new HashMap(); topToTable = new HashMap(); + fopToTable = new HashMap(); destTableId = 1; uCtx = null; listMapJoinOpsNoReducer = new ArrayList(); @@ -237,13 +240,15 @@ qb = pctx.getQB(); groupOpToInputTables = pctx.getGroupOpToInputTables(); prunedPartitions = pctx.getPrunedPartitions(); + setLineageInfo(pctx.getLineageInfo()); } public ParseContext getParseContext() { return new ParseContext(conf, qb, ast, opToPartPruner, topOps, topSelOps, - opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, - idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, - groupOpToInputTables, prunedPartitions, opToSamplePruner); + opParseCtx, joinContext, topToTable, fopToTable, loadTableWork, + loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, + opToSamplePruner); } @SuppressWarnings("nls") @@ -2965,7 +2970,7 @@ QBMetaData qbm = qb.getMetaData(); Integer dest_type = qbm.getDestTypeForAlias(dest); - Table dest_tab; // destination table if any + Table dest_tab = null; // destination table if any String queryTmpdir; // the intermediate destination directory Path dest_path; // the final destination directory TableDesc table_desc = null; @@ -3160,6 +3165,9 @@ .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId), fsRS, input), inputRR); + if (dest_tab != null) + fopToTable.put((FileSinkOperator)output, dest_tab); + LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " + dest_path + " row schema: " + inputRR.toString()); @@ -4927,7 +4935,8 @@ colsEqual, alias, rwsch, qb.getMetaData(), null); tableOp = OperatorFactory.getAndMakeChild(new FilterDesc( samplePredicate, true, new sampleDesc(ts.getNumerator(), ts - .getDenominator(), tabBucketCols, true)), top); + .getDenominator(), tabBucketCols, true)), + new RowSchema(rwsch.getColumnInfos()), top); } else { // need to add filter // create tableOp to be filterDesc and set as child to 'top' @@ -4935,7 +4944,8 @@ ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols, colsEqual, alias, rwsch, qb.getMetaData(), null); tableOp = OperatorFactory.getAndMakeChild(new FilterDesc( - samplePredicate, true), top); + samplePredicate, true), + new RowSchema(rwsch.getColumnInfos()), top); } } else { boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); @@ -4966,7 +4976,8 @@ tableOp = OperatorFactory .getAndMakeChild(new FilterDesc(samplePred, true, new sampleDesc(tsSample.getNumerator(), 
tsSample - .getDenominator(), tab.getBucketCols(), true)), top); + .getDenominator(), tab.getBucketCols(), true)), + new RowSchema(rwsch.getColumnInfos()), top); LOG.info("No need for sample filter"); } // The table is not bucketed, add a dummy filter :: rand() @@ -4982,7 +4993,8 @@ ExprNodeDesc samplePred = genSamplePredicate(tsSample, null, false, alias, rwsch, qb.getMetaData(), randFunc); tableOp = OperatorFactory.getAndMakeChild(new FilterDesc( - samplePred, true), top); + samplePred, true), + new RowSchema(rwsch.getColumnInfos()), top); } } } @@ -5600,8 +5612,8 @@ } ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner, - topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, - loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + topOps, topSelOps, opParseCtx, joinContext, topToTable, fopToTable, + loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy) @@ -32,6 +32,8 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -59,6 +61,7 @@ private LinkedHashMap, OpParseContext> opParseCtx; private Map joinContext; private HashMap topToTable; + private HashMap fopToTable; private List loadTableWork; private List loadFileWork; private Context ctx; @@ -71,7 +74,12 @@ // reducer private Map> groupOpToInputTables; private Map prunedPartitions; - + + /** + * The lineage information. + */ + private LineageInfo lInfo; + // is set to true if the expression only contains partitioning columns and not // any other column reference. // This is used to optimize select * from table where ... 
scenario, when the @@ -102,6 +110,7 @@ * context needed join processing (map join specifically) * @param topToTable * the top tables being processed + * @param fopToTable the table schemas that are being inserted into * @param loadTableWork * list of destination tables being loaded * @param loadFileWork @@ -126,6 +135,7 @@ LinkedHashMap, OpParseContext> opParseCtx, Map joinContext, HashMap topToTable, + HashMap fopToTable, List loadTableWork, List loadFileWork, Context ctx, HashMap idToTableNameMap, int destTableId, UnionProcContext uCtx, List listMapJoinOpsNoReducer, @@ -138,6 +148,7 @@ this.opToPartPruner = opToPartPruner; this.joinContext = joinContext; this.topToTable = topToTable; + this.fopToTable = fopToTable; this.loadFileWork = loadFileWork; this.loadTableWork = loadTableWork; this.opParseCtx = opParseCtx; @@ -247,6 +258,21 @@ } /** + * @return the fopToTable + */ + public HashMap getFopToTable() { + return fopToTable; + } + + /** + * @param fopToTable + * the fopToTable to set + */ + public void setFopToTable(HashMap fopToTable) { + this.fopToTable = fopToTable; + } + + /** * @return the topOps */ public HashMap> getTopOps() { @@ -439,4 +465,22 @@ Map prunedPartitions) { this.prunedPartitions = prunedPartitions; } + + /** + * Sets the lineage information. + * + * @param lInfo The lineage information. + */ + public void setLineageInfo(LineageInfo lInfo) { + this.lInfo = lInfo; + } + + /** + * Gets the associated lineage information. + * + * @return LineageInfo + */ + public LineageInfo getLineageInfo() { + return lInfo; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 906309) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -488,6 +488,7 @@ // Get all the pre execution hooks and execute them. for (PreExecute peh : getPreExecHooks()) { peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(), + plan.getLineageInfo(), UnixUserGroupInformation.readFromConf(conf, UnixUserGroupInformation.UGI_PROPERTY_NAME)); }
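The sketches that follow are illustrations only and are not part of the patch. First, a minimal PreExecute hook written against the new run() signature: it walks the LineageInfo handed to it by Driver and prints one line per target column. The class name is invented; the element types of the input and output sets (ReadEntity/WriteEntity) and the org.apache.hadoop.security.UserGroupInformation parameter are assumed from the existing hook interface, since generic parameters do not survive in the diff as rendered here.

package org.apache.hadoop.hive.ql.hooks;

import java.util.Set;

import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Illustrative pre-execution hook: dumps the column-level lineage
 * computed for the query. Not part of this patch.
 */
public class PrintLineageHook implements PreExecute {

  @Override
  public void run(SessionState sess, Set<ReadEntity> inputs,
      Set<WriteEntity> outputs, LineageInfo lInfo, UserGroupInformation ugi)
      throws Exception {
    if (lInfo == null) {
      // No lineage was computed for this query.
      return;
    }
    LineageInfoIterator it = lInfo.iterator();
    while (it.hasNext()) {
      LineageInfoItem item = it.next();
      LineageInfo.Dependency dep = item.getDependency();
      System.out.println("Lineage: " + item.getTable().getTableName() + "."
          + item.getField().getName() + " " + dep.getType()
          + " <- " + dep.getExpr());
    }
  }
}

Assuming the registration mechanism is unchanged by this patch, such a class would be enabled through the pre-execution hook property (hive.exec.pre.hooks) so that the pre-execution loop in Driver shown above hands it plan.getLineageInfo().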
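Next, a self-contained sketch of the LineageInfo structure itself: it records that a destination column is a SIMPLE copy of a source column and reads the dependency back through the index. Table and column names are invented; the FieldSchema and Table construction assumes the usual Thrift-generated constructor and setters from the metastore API, and BaseColumnInfo as the element type of baseCols is implied by the javadoc rather than spelled out in the diff.

import java.util.Vector;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyType;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo;

public class LineageInfoDemo {
  public static void main(String[] args) {
    // Source table and column (names are made up for the example).
    Table src = new Table();
    src.setDbName("default");
    src.setTableName("src");
    FieldSchema srcKey = new FieldSchema("key", "string", null);

    // Destination table and column.
    Table dest = new Table();
    dest.setDbName("default");
    dest.setTableName("dest1");
    FieldSchema destKey = new FieldSchema("key", "int", null);

    // Describe where dest1.key comes from: a plain copy of src.key.
    TableAliasInfo alias = new TableAliasInfo();
    alias.setAlias("src");
    alias.setTable(src);

    BaseColumnInfo base = new BaseColumnInfo();
    base.setTabAlias(alias);
    base.setColumn(srcKey);

    Dependency dep = new Dependency();
    dep.setType(DependencyType.SIMPLE);
    dep.setExpr("src.key");
    Vector<BaseColumnInfo> baseCols = new Vector<BaseColumnInfo>();
    baseCols.add(base);
    dep.setBaseCols(baseCols);

    // Register the dependency and read it back through the index.
    LineageInfo linfo = new LineageInfo();
    linfo.putDependency(dest, destKey, dep);

    Dependency found = linfo.getDependency(dest, destKey);
    System.out.println(destKey.getName() + " " + found.getType()
        + " <- " + found.getExpr());   // prints: key SIMPLE <- src.key
  }
}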
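The patch itself only adds the plumbing: the fopToTable map and the lineage accessors on ParseContext, BaseSemanticAnalyzer and QueryPlan. The code that actually computes the lineage is expected to arrive separately. A rough sketch of what that producing side could look like, assuming the standard optimizer Transform interface in this code base, is below; the class is hypothetical and the operator-tree walk that would populate the structure is elided.

package org.apache.hadoop.hive.ql.optimizer;

import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

/**
 * Hypothetical optimizer step that computes column lineage and stores it
 * on the ParseContext so it can flow to QueryPlan and the hooks.
 */
public class LineageGenerator implements Transform {

  @Override
  public ParseContext transform(ParseContext pctx) throws SemanticException {
    LineageInfo linfo = new LineageInfo();

    // Walk the operator DAG rooted at pctx.getTopOps() and call
    // linfo.putDependency(table, column, dependency) for every column
    // written by a FileSinkOperator; pctx.getFopToTable() supplies the
    // destination table for each sink. Omitted here.

    pctx.setLineageInfo(linfo);
    return pctx;
  }
}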
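Finally, a small standalone illustration of the new Utils.getNthAncestor() helper, which peeks at an ancestor on a walker's traversal stack without disturbing the stack. The NamedNode class exists only to have something to push; it assumes the Node interface is the usual two-method (getChildren()/getName()) interface used by the existing graph walkers.

import java.util.List;
import java.util.Stack;

import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.Utils;

public class AncestorDemo {

  /** Trivial Node implementation used only to populate the stack. */
  static class NamedNode implements Node {
    private final String name;
    NamedNode(String name) { this.name = name; }
    public List<? extends Node> getChildren() { return null; }
    public String getName() { return name; }
  }

  public static void main(String[] args) {
    // Simulate the traversal path root -> filter -> select that a walker
    // would have built up by the time it visits "select".
    Stack<Node> path = new Stack<Node>();
    path.push(new NamedNode("root"));
    path.push(new NamedNode("filter"));
    path.push(new NamedNode("select"));

    System.out.println(Utils.getNthAncestor(path, 0).getName()); // select
    System.out.println(Utils.getNthAncestor(path, 1).getName()); // filter
    System.out.println(Utils.getNthAncestor(path, 2).getName()); // root

    // The helper restores the stack, so the path is unchanged afterwards.
    System.out.println(path.size()); // 3
  }
}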