Index: ql/src/test/results/clientpositive/fileformat_sequencefile.q.out =================================================================== --- ql/src/test/results/clientpositive/fileformat_sequencefile.q.out (revision 927279) +++ ql/src/test/results/clientpositive/fileformat_sequencefile.q.out (working copy) @@ -43,12 +43,14 @@ key int value string -Detailed Table Information Table(tableName:dest1, dbName:default, owner:njain, createTime:1253779880, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:int, comment:null), FieldSchema(name:value, type:string, comment:null)], location:file:/data/users/njain/hive5/hive5/build/ql/test/data/warehouse/dest1, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[], parameters:{}) +Detailed Table Information Table(tableName:dest1, dbName:default, owner:athusoo, createTime:1264496237, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:int, comment:null), FieldSchema(name:value, type:string, comment:null)], location:file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/test/data/warehouse/dest1, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[], parameters:{transient_lastDdlTime=1264496237}) PREHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 SELECT src.key, src.value WHERE src.key < 10 PREHOOK: type: QUERY PREHOOK: Input: default@src PREHOOK: Output: default@dest1 +PREHOOK: Lineage: dest1.value SIMPLE null[(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: Lineage: dest1.key SIMPLE null[(src)src.FieldSchema(name:key, type:string, comment:default), ] POSTHOOK: query: FROM src INSERT OVERWRITE TABLE dest1 SELECT src.key, src.value WHERE src.key < 10 POSTHOOK: type: QUERY @@ -57,11 +59,11 @@ PREHOOK: query: SELECT dest1.* FROM dest1 PREHOOK: type: QUERY PREHOOK: Input: default@dest1 -PREHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/862471397/10000 +PREHOOK: Output: file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/scratchdir/1514937477/10000 POSTHOOK: query: SELECT dest1.* FROM dest1 POSTHOOK: type: QUERY POSTHOOK: Input: default@dest1 -POSTHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/862471397/10000 +POSTHOOK: Output: file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/scratchdir/1514937477/10000 0 val_0 4 val_4 8 val_8 @@ -77,3 +79,88 @@ POSTHOOK: query: DROP TABLE dest1 POSTHOOK: type: DROPTABLE POSTHOOK: Output: default@dest1 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE 
dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcpart@ds=2008-04-09/hr=12 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: CREATETABLE +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@srcbucket +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: CREATETABLE +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@srcbucket2 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src1 +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src_sequencefile +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src_thrift +PREHOOK: query: DROP TABLE dest1 +PREHOOK: type: LOAD +POSTHOOK: query: DROP TABLE dest1 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@src_json Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java (revision 927279) +++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java (working copy) @@ -18,8 +18,15 @@ package org.apache.hadoop.hive.ql.hooks; +import java.util.Iterator; +import java.util.Map; import java.util.Set; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyKey; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.security.UserGroupInformation; @@ -32,7 +39,8 @@ @Override public void run(SessionState sess, Set inputs, - Set outputs, UserGroupInformation ugi) throws Exception { + Set outputs, LineageInfo linfo, + UserGroupInformation ugi) throws Exception { LogHelper console = SessionState.getConsole(); @@ -51,6 +59,47 @@ for (WriteEntity we : outputs) { console.printError("POSTHOOK: Output: " + we.toString()); } + + // Also print out 
the generic lineage information if there is any + if (linfo != null) { + Iterator> iter = linfo.iterator(); + while(iter.hasNext()) { + Map.Entry it = iter.next(); + Dependency dep = it.getValue(); + DependencyKey depK = it.getKey(); + + StringBuilder sb = new StringBuilder(); + sb.append("POSTHOOK: Lineage: "); + if (depK.getDataContainer().isPartition()) { + Partition part = depK.getDataContainer().getPartition(); + sb.append(part.getTableName()); + sb.append(" PARTITION("); + int i = 0; + for (FieldSchema fs : depK.getDataContainer().getTable().getPartitionKeys()) { + if (i != 0) { + sb.append(","); + } + sb.append(fs.getName() + "=" + part.getValues().get(i++)); + } + sb.append(")"); + } + else { + sb.append(depK.getDataContainer().getTable().getTableName()); + } + sb.append("." + depK.getFieldSchema().getName() + " " + + dep.getType() + " " + dep.getExpr()); + + sb.append("["); + for(BaseColumnInfo col: dep.getBaseCols()) { + sb.append("("+col.getTabAlias().getTable().getTableName() + ")" + + col.getTabAlias().getAlias() + "." + + col.getColumn() + ", "); + } + sb.append("]"); + + console.printError(sb.toString()); + } + } } } Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java =================================================================== --- ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java (revision 927279) +++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java (working copy) @@ -32,7 +32,8 @@ @Override public void run(SessionState sess, Set inputs, - Set outputs, UserGroupInformation ugi) throws Exception { + Set outputs, UserGroupInformation ugi) + throws Exception { LogHelper console = SessionState.getConsole(); Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.ppd.PredicatePushDown; +import org.apache.hadoop.hive.ql.optimizer.lineage.Generator; /** * Implementation of the optimizer. @@ -42,6 +43,8 @@ */ public void initialize(HiveConf hiveConf) { transformations = new ArrayList(); + // Add the transformation that computes the lineage information. + transformations.add(new Generator()); if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) { transformations.add(new ColumnPruner()); } Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (revision 0) @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; + +/** + * The processor context for the lineage information. This contains the + * lineage context and the column info and operator information that is + * being used for the current expression. + */ +public class ExprProcCtx implements NodeProcessorCtx { + + /** + * The lineage context that is being populated. + */ + private LineageCtx lctx; + + /** + * The input operator in case the current operator is not a leaf. + */ + private Operator inpOp; + + /** + * Constructor. + * + * @param lctx The lineage context thatcontains the dependencies for the inputs. + * @param inpOp The input operator to the current operator. + */ + public ExprProcCtx(LineageCtx lctx, + Operator inpOp) { + this.lctx = lctx; + this.inpOp = inpOp; + } + + /** + * Gets the lineage context. + * + * @return LineageCtx The lineage context. + */ + public LineageCtx getLineageCtx() { + return lctx; + } + + /** + * Gets the input operator. + * + * @return Operator The input operator - this is null in case the current + * operator is a leaf. + */ + public Operator getInputOperator() { + return inpOp; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java (revision 0) @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.Vector; + +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.parse.ParseContext; + +/** + * This class contains the lineage context that is passed + * while walking the operator tree in Lineage. The context + * contains the LineageInfo structure that is passed to the + * pre-execution hooks. + */ +public class LineageCtx implements NodeProcessorCtx { + + public static class Index { + + /** + * Serial Version UID. + */ + private static final long serialVersionUID = 1L; + + /** + * The map contains an index from the (operator, columnInfo) to the + * dependency vector for that tuple. This is used to generate the + * dependency vectors during the walk of the operator tree. + */ + private final Map, HashMap> depMap; + + /** + * Constructor. + */ + public Index() { + depMap = new HashMap, HashMap>(); + } + + /** + * Gets the dependency for an operator, columninfo tuple. + * @param op The operator whose dependency is being inspected. + * @param col The column info whose dependency is being inspected. + * @return Dependency for that particular operator, columninfo tuple. + * null if no dependency is found. + */ + public Dependency getDependency(Operator op, ColumnInfo col) { + HashMap colMap = depMap.get(op); + if (colMap == null) { + return null; + } + + return colMap.get(col); + } + + /** + * Puts the dependency for an operator, columninfo tuple. + * @param op The operator whose dependency is being inserted. + * @param col The column info whose dependency is being inserted. + * @param dep The dependency. + */ + public void putDependency(Operator op, + ColumnInfo col, Dependency dep) { + HashMap colMap = depMap.get(op); + if (colMap == null) { + colMap = new HashMap(); + depMap.put(op, colMap); + } + colMap.put(col, dep); + } + + /** + * Merges the new dependencies in dep to the existing dependencies + * of (op, ci). + * + * @param op The operator of the column whose dependency is being modified. + * @param ci The column info of the associated column. + * @param dependency The new dependency. + */ + public void mergeDependency(Operator op, + ColumnInfo ci, Dependency dep) { + Dependency old_dep = getDependency(op, ci); + if (old_dep == null) { + putDependency(op, ci, dep); + } else { + old_dep.setType(LineageInfo.DependencyType.SET); + Set bci_set = new LinkedHashSet(old_dep.getBaseCols()); + bci_set.addAll(dep.getBaseCols()); + old_dep.setBaseCols(new Vector(bci_set)); + // TODO: Fix the expressions later. + old_dep.setExpr(null); + } + } + } + + /** + * The map contains an index from the (operator, columnInfo) to the + * dependency vector for that tuple. This is used to generate the + * dependency vectors during the walk of the operator tree. + */ + private final Index index; + + /** + * Parse context to get to the table metadata information. + */ + private final ParseContext pctx; + + /** + * Constructor. + * + * @param pctx The parse context that is used to get table metadata information. 
+ */ + public LineageCtx(ParseContext pctx) { + index = new Index(); + this.pctx = pctx; + } + + /** + * Gets the parse context. + * + * @return ParseContext + */ + public ParseContext getParseCtx() { + return pctx; + } + + /** + * Gets the dependency index. + * + * @return Index + */ + public Index getIndex() { + return index; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java (revision 0) @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.PreOrderWalker; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.optimizer.Transform; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.session.SessionState; + +/** + * This class generates the lineage information for the columns + * and tables from the plan before it goes through other + * optimization phases. 
+ */ +public class Generator implements Transform { + + /* (non-Javadoc) + * @see org.apache.hadoop.hive.ql.optimizer.Transform#transform(org.apache.hadoop.hive.ql.parse.ParseContext) + */ + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + + // Create the lineage context + LineageCtx lCtx = new LineageCtx(pctx); + + Map opRules = new LinkedHashMap(); + opRules.put(new RuleRegExp("R1", "TS%"), OpProcFactory.getTSProc()); + opRules.put(new RuleRegExp("R2", "SCR%"), OpProcFactory.getTransformProc()); + opRules.put(new RuleRegExp("R3", "UDTF%"), OpProcFactory.getTransformProc()); + opRules.put(new RuleRegExp("R4", "SEL%"), OpProcFactory.getSelProc()); + opRules.put(new RuleRegExp("R5", "GBY%"), OpProcFactory.getGroupByProc()); + opRules.put(new RuleRegExp("R6", "UNION%"), OpProcFactory.getUnionProc()); + opRules.put(new RuleRegExp("R7", "JOIN%|MAPJOIN%"), OpProcFactory.getJoinProc()); + opRules.put(new RuleRegExp("R8", "RS%"), OpProcFactory.getReduceSinkProc()); + opRules.put(new RuleRegExp("R9", "LVJ%"), OpProcFactory.getLateralViewJoinProc()); + + // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(), opRules, lCtx); + GraphWalker ogw = new PreOrderWalker(disp); + + // Create a list of topop nodes + ArrayList topNodes = new ArrayList(); + topNodes.addAll(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + + // Transfer the index from the lineage context to the session state. + if (SessionState.get() != null) { + SessionState.get().getLineageState().setIndex(lCtx.getIndex()); + } + + return pctx; + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (revision 0) @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Stack; +import java.util.Vector; + +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc; + +/** + * Expression processor factory for lineage. Each processor is responsible to + * create the leaf level column info objects that the expression depends upon + * and also generates a string representation of the expression. + */ +public class ExprProcFactory { + + /** + * Processor for column expressions. + */ + public static class ColumnExprProcessor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + ExprNodeColumnDesc cd = (ExprNodeColumnDesc) nd; + ExprProcCtx epc = (ExprProcCtx) procCtx; + + // assert that the input operator is not null as there are no + // exprs associated with table scans. + assert (epc.getInputOperator() != null); + + ColumnInfo inp_ci = null; + for (ColumnInfo tmp_ci : epc.getInputOperator().getSchema() + .getSignature()) { + if (tmp_ci.getInternalName().equals(cd.getColumn())) { + inp_ci = tmp_ci; + break; + } + } + + // Insert the dependencies of inp_ci to that of the current operator, ci + LineageCtx lc = epc.getLineageCtx(); + Dependency dep = lc.getIndex().getDependency(epc.getInputOperator(), inp_ci); + + return dep; + } + + } + + /** + * Processor for any function or field expression. + */ + public static class GenericExprProcessor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + assert (nd instanceof ExprNodeGenericFuncDesc || nd instanceof ExprNodeFieldDesc); + + // Concatenate the dependencies of all the children to compute the new + // dependency. 
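+      // e.g. for concat(key, value) the merged dependency's base columns are the
+      // union of the base columns of 'key' and 'value'; constant children
+      // contribute no base columns.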
+ Dependency dep = new Dependency(); + + LinkedHashSet bci_set = new LinkedHashSet(); + dep.setType(LineageInfo.DependencyType.UDF); + + for (Object child : nodeOutputs) { + if (child == null) { + continue; + } + + Dependency child_dep = (Dependency) child; + if (child_dep.getType() != LineageInfo.DependencyType.UDF) { + dep.setType(child_dep.getType()); + } + bci_set.addAll(child_dep.getBaseCols()); + } + + dep.setBaseCols(new Vector(bci_set)); + + return dep; + } + + } + + /** + * Processor for constants and null expressions. For such expressions the + * processor simply returns a null dependency vector. + */ + public static class DefaultExprProcessor implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + assert (nd instanceof ExprNodeConstantDesc || nd instanceof ExprNodeNullDesc); + + // Create a dependency that has no basecols + Dependency dep = new Dependency(); + dep.setType(LineageInfo.DependencyType.SIMPLE); + dep.setBaseCols(new Vector()); + return dep; + } + } + + public static NodeProcessor getDefaultExprProcessor() { + return new DefaultExprProcessor(); + } + + public static NodeProcessor getGenericFuncProcessor() { + return new GenericExprProcessor(); + } + + public static NodeProcessor getFieldProcessor() { + return new GenericExprProcessor(); + } + + public static NodeProcessor getColumnProcessor() { + return new ColumnExprProcessor(); + } + + /** + * Gets the expression dependencies for the expression. + * + * @param lctx + * The lineage context containing the input operators dependencies. + * @param inpOp + * The input operator to the current operator. + * @param expr + * The expression that is being processed. + * @throws SemanticException + */ + public static Dependency getExprDependency(LineageCtx lctx, + Operator inpOp, ExprNodeDesc expr) + throws SemanticException { + + // Create the walker, the rules dispatcher and the context. + ExprProcCtx exprCtx = new ExprProcCtx(lctx, inpOp); + + // create a walker which walks the tree in a DFS manner while maintaining + // the operator stack. The dispatcher + // generates the plan from the operator tree + Map exprRules = new LinkedHashMap(); + exprRules.put( + new RuleRegExp("R1", ExprNodeColumnDesc.class.getName() + "%"), + getColumnProcessor()); + exprRules.put( + new RuleRegExp("R2", ExprNodeFieldDesc.class.getName() + "%"), + getFieldProcessor()); + exprRules.put(new RuleRegExp("R3", ExprNodeGenericFuncDesc.class.getName() + + "%"), getGenericFuncProcessor()); + + // The dispatcher fires the processor corresponding to the closest matching + // rule and passes the context along + Dispatcher disp = new DefaultRuleDispatcher(getDefaultExprProcessor(), + exprRules, exprCtx); + GraphWalker egw = new DefaultGraphWalker(disp); + + List startNodes = new ArrayList(); + startNodes.add(expr); + + HashMap outputMap = new HashMap(); + egw.startWalking(startNodes, outputMap); + return (Dependency)outputMap.get(expr); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (revision 0) @@ -0,0 +1,510 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.lineage; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.Stack; +import java.util.Vector; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.ForwardOperator; +import org.apache.hadoop.hive.ql.exec.GroupByOperator; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.SelectOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Utils; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AggregationDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; + +/** + * Operator factory for the rule processors for lineage. + */ +public class OpProcFactory { + + /** + * Returns the parent operator in the walk path to the current operator. + * + * @param stack The stack encoding the path. + * + * @return Operator The parent operator in the current path. + */ + protected static Operator getParent(Stack stack) { + return (Operator)Utils.getNthAncestor(stack, 1); + } + + /** + * Processor for Script and UDTF Operators. + */ + public static class TransformLineage extends DefaultLineage implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + // LineageCTx + LineageCtx lCtx = (LineageCtx) procCtx; + + // The operators + Operator op = (Operator)nd; + Operator inpOp = getParent(stack); + + // Create a single dependency list by concatenating the dependencies of all + // the cols + Dependency dep = new Dependency(); + dep.setType(LineageInfo.DependencyType.SCRIPT); + // TODO: Fix this to a non null value. 
+ dep.setExpr(null); + + LinkedHashSet col_set = new LinkedHashSet(); + for(ColumnInfo ci : inpOp.getSchema().getSignature()) { + col_set.addAll(lCtx.getIndex().getDependency(inpOp, ci).getBaseCols()); + } + + dep.setBaseCols(new Vector(col_set)); + + // This dependency is then set for all the colinfos of the script operator + for(ColumnInfo ci : op.getSchema().getSignature()) { + lCtx.getIndex().putDependency(op, ci, dep); + } + + return null; + } + + } + + /** + * Processor for TableScan Operator. This actually creates the base column mappings. + */ + public static class TableScanLineage extends DefaultLineage implements NodeProcessor { + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + ParseContext pctx = lCtx.getParseCtx(); + + // Table scan operator. + TableScanOperator top = (TableScanOperator)nd; + org.apache.hadoop.hive.ql.metadata.Table t = pctx.getTopToTable().get(top); + Table tab = t.getTTable(); + + // Generate the mappings + RowSchema rs = top.getSchema(); + List cols = t.getAllCols(); + TableAliasInfo tai = new TableAliasInfo(); + tai.setAlias(top.getConf().getAlias()); + tai.setTable(tab); + int cnt = 0; + for(ColumnInfo ci : rs.getSignature()) { + // Create a dependency + Dependency dep = new Dependency(); + BaseColumnInfo bci = new BaseColumnInfo(); + bci.setTabAlias(tai); + bci.setColumn(cols.get(cnt++)); + + // Populate the dependency + dep.setType(LineageInfo.DependencyType.SIMPLE); + // TODO: Find out how to get the expression here. + dep.setExpr(null); + dep.setBaseCols(new Vector()); + dep.getBaseCols().add(bci); + + // Put the dependency in the map + lCtx.getIndex().putDependency(top, ci, dep); + } + + return null; + } + + } + + /** + * Processor for Join Operator. + */ + public static class JoinLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + JoinOperator op = (JoinOperator)nd; + JoinDesc jd = op.getConf(); + + // The input operator to the join is always a reduce sink operator + ReduceSinkOperator inpOp = (ReduceSinkOperator)getParent(stack); + ReduceSinkDesc rd = inpOp.getConf(); + int tag = rd.getTag(); + + // Iterate over the outputs of the join operator and merge the + // dependencies of the columns that corresponding to the tag. + int cnt = 0; + List exprs = jd.getExprs().get((byte)tag); + for(ColumnInfo ci : op.getSchema().getSignature()) { + if (jd.getReversedExprs().get(ci.getInternalName()) != tag) { + continue; + } + + // Otherwise look up the expression corresponding to this ci + ExprNodeDesc expr = exprs.get(cnt++); + lCtx.getIndex().mergeDependency(op, ci, + ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + } + + return null; + } + + } + + /** + * Processor for Join Operator. + */ + public static class LateralViewJoinLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. 
+ assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + LateralViewJoinOperator op = (LateralViewJoinOperator)nd; + boolean isUdtfPath = true; + Operator inpOp = getParent(stack); + ArrayList cols = inpOp.getSchema().getSignature(); + + if (inpOp instanceof SelectOperator) { + isUdtfPath = false; + } + + // Dirty hack!! + // For the select path the columns are the ones at the end of the + // current operators schema and for the udtf path the columns are + // at the beginning of the operator schema. + ArrayList out_cols = op.getSchema().getSignature(); + int out_cols_size = out_cols.size(); + int cols_size = cols.size(); + if (isUdtfPath) { + int cnt = 0; + while (cnt < cols_size) { + lCtx.getIndex().mergeDependency(op, out_cols.get(cnt), + lCtx.getIndex().getDependency(inpOp, cols.get(cnt))); + cnt++; + } + } + else { + int cnt = cols_size - 1; + while (cnt >= 0) { + lCtx.getIndex().mergeDependency(op, out_cols.get(out_cols_size - cols_size + cnt), + lCtx.getIndex().getDependency(inpOp, cols.get(cnt))); + cnt--; + } + } + return null; + } + + } + + /** + * Processor for Select operator. + */ + public static class SelectLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + LineageCtx lctx = (LineageCtx)procCtx; + SelectOperator sop = (SelectOperator)nd; + + // if this is a selStarNoCompute then this select operator + // is treated like a default operator, so just call the super classes + // process method. + if (sop.getConf().isSelStarNoCompute()) { + return super.process(nd, stack, procCtx, nodeOutputs); + } + + // Otherwise we treat this as a normal select operator and look at + // the expressions. + + ArrayList col_infos = sop.getSchema().getSignature(); + int cnt = 0; + for(ExprNodeDesc expr : sop.getConf().getColList()) { + lctx.getIndex().putDependency(sop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lctx, getParent(stack), expr)); + } + + return null; + } + + } + + /** + * Processor for GroupBy operator. + */ + public static class GroupByLineage extends DefaultLineage implements NodeProcessor { + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + + LineageCtx lctx = (LineageCtx)procCtx; + GroupByOperator gop = (GroupByOperator)nd; + ArrayList col_infos = gop.getSchema().getSignature(); + Operator inpOp = getParent(stack); + int cnt = 0; + + for(ExprNodeDesc expr : gop.getConf().getKeys()) { + lctx.getIndex().putDependency(gop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lctx, inpOp, expr)); + } + + for(AggregationDesc agg : gop.getConf().getAggregators()) { + // Concatenate the dependencies of all the parameters to + // create the new dependency + Dependency dep = new Dependency(); + dep.setType(LineageInfo.DependencyType.UDAF); + // TODO: Get the actual string here. + dep.setExpr(null); + LinkedHashSet bci_set = new LinkedHashSet(); + for(ExprNodeDesc expr : agg.getParameters()) { + Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr); + if (expr_dep != null) { + bci_set.addAll(expr_dep.getBaseCols()); + } + } + + // If the bci_set is empty, this means that the inputs to this + // aggregate function were all constants (e.g. count(1)). In this case + // the aggregate function is just dependent on all the tables that are in + // the dependency list of the input operator. 
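+        // e.g. for SELECT COUNT(1) FROM src, the output column depends on the
+        // table src as a whole, recorded as a BaseColumnInfo with a null column.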
+ if (bci_set.isEmpty()) { + Set tai_set = new LinkedHashSet(); + for(ColumnInfo ci : inpOp.getSchema().getSignature()) { + Dependency inp_dep = lctx.getIndex().getDependency(inpOp, ci); + // The dependency can be null as some of the input cis may not have + // been set in case of joins. + if (inp_dep != null) { + for(BaseColumnInfo bci : inp_dep.getBaseCols()) { + tai_set.add(bci.getTabAlias()); + } + } + } + + // Create the BaseColumnInfos and set them in the bci_set + for(TableAliasInfo tai : tai_set) { + BaseColumnInfo bci = new BaseColumnInfo(); + bci.setTabAlias(tai); + // This is set to null to reflect that the dependency is not on any + // particular column of the table. + bci.setColumn(null); + bci_set.add(bci); + } + } + + dep.setBaseCols(new Vector(bci_set)); + lctx.getIndex().putDependency(gop, col_infos.get(cnt++), dep); + } + + return null; + } + + } + + /** + * Union processor. + * In this case we call mergeDependency as opposed to putDependency + * in order to account for visits from different parents. + */ + public static class UnionLineage extends DefaultLineage implements NodeProcessor { + + protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName()); + + @SuppressWarnings("unchecked") + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + Operator op = (Operator)nd; + + // Get the row schema of the input operator. + // The row schema of the parent operator + Operator inpOp = getParent(stack); + RowSchema rs = op.getSchema(); + ArrayList inp_cols = inpOp.getSchema().getSignature(); + int cnt = 0; + for(ColumnInfo ci : rs.getSignature()) { + lCtx.getIndex().mergeDependency(op, ci, + lCtx.getIndex().getDependency(inpOp, inp_cols.get(cnt++))); + } + return null; + } + } + + /** + * ReduceSink processor. + */ + public static class ReduceSinkLineage implements NodeProcessor { + + protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName()); + + @SuppressWarnings("unchecked") + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + ReduceSinkOperator rop = (ReduceSinkOperator)nd; + + ArrayList col_infos = rop.getSchema().getSignature(); + Operator inpOp = getParent(stack); + int cnt = 0; + + // The keys are included only in case the reduce sink feeds into + // a group by operator through a chain of forward operators + Operator op = rop.getChildOperators().get(0); + while (op instanceof ForwardOperator) { + op = op.getChildOperators().get(0); + } + + if (op instanceof GroupByOperator) { + for(ExprNodeDesc expr : rop.getConf().getKeyCols()) { + lCtx.getIndex().putDependency(rop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + } + } + + for(ExprNodeDesc expr : rop.getConf().getValueCols()) { + lCtx.getIndex().putDependency(rop, col_infos.get(cnt++), + ExprProcFactory.getExprDependency(lCtx, inpOp, expr)); + } + + return null; + } + } + + /** + * Default processor. This basically passes the input dependencies as such + * to the output dependencies. 
+ */ + public static class DefaultLineage implements NodeProcessor { + + protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName()); + + @SuppressWarnings("unchecked") + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + // Assert that there is atleast one item in the stack. This should never + // be called for leafs. + assert(!stack.isEmpty()); + + // LineageCtx + LineageCtx lCtx = (LineageCtx) procCtx; + Operator op = (Operator)nd; + + // Get the row schema of the input operator. + // The row schema of the parent operator + Operator inpOp = getParent(stack); + RowSchema rs = op.getSchema(); + ArrayList inp_cols = inpOp.getSchema().getSignature(); + int cnt = 0; + for(ColumnInfo ci : rs.getSignature()) { + lCtx.getIndex().putDependency(op, ci, + lCtx.getIndex().getDependency(inpOp, inp_cols.get(cnt++))); + } + return null; + } + } + + public static NodeProcessor getJoinProc() { + return new JoinLineage(); + } + + public static NodeProcessor getLateralViewJoinProc() { + return new LateralViewJoinLineage(); + } + + public static NodeProcessor getTSProc() { + return new TableScanLineage(); + } + + public static NodeProcessor getTransformProc() { + return new TransformLineage(); + } + + public static NodeProcessor getSelProc() { + return new SelectLineage(); + } + + public static NodeProcessor getGroupByProc() { + return new GroupByLineage(); + } + + public static NodeProcessor getUnionProc() { + return new UnionLineage(); + } + + public static NodeProcessor getReduceSinkProc() { + return new ReduceSinkLineage(); + } + + public static NodeProcessor getDefaultProc() { + return new DefaultLineage(); + } + +} Index: ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (working copy) @@ -41,7 +41,7 @@ /** * SessionState encapsulates common data associated with a session. - * + * * Also provides support for a thread static session object that can be accessed * from any point in the code to interact with the user and to retrieve * configuration information @@ -74,6 +74,20 @@ */ private String commandType; + /** + * Lineage state. + */ + LineageState ls; + + /** + * Get the lineage state stored in this session. + * + * @return LineageState + */ + public LineageState getLineageState() { + return ls; + } + public HiveConf getConf() { return conf; } @@ -96,7 +110,7 @@ public SessionState(HiveConf conf) { this.conf = conf; - + ls = new LineageState(); } public void setCmd(String cmdString) { @@ -117,7 +131,7 @@ /** * Singleton Session object per thread. - * + * **/ private static ThreadLocal tss = new ThreadLocal(); @@ -161,7 +175,7 @@ /** * get hiveHitsory object which does structured logging. - * + * * @return The hive history object */ public HiveHistory getHiveHistory() { @@ -196,11 +210,11 @@ * This class provides helper routines to emit informational and error * messages to the user and log4j files while obeying the current session's * verbosity levels. - * + * * NEVER write directly to the SessionStates standard output other than to * emit result data DO use printInfo and printError provided by LogHelper to * emit non result data strings. 
- * + * * It is perfectly acceptable to have global static LogHelper objects (for * example - once per module) LogHelper always emits info/error to current * session as required. Index: ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java (revision 0) @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.session; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer; +import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index; + +/** + * LineageState. Contains all the information used to generate the + * lineage information for the post execution hooks. + * + */ +public class LineageState { + + /** + * Mapping from the directory name to FileSinkOperator. This + * mapping is generated at the filesink operator creation + * time and is then later used to created the mapping from + * movetask to the set of filesink operators. + */ + private final Map dirToFop; + + /** + * The lineage context index for this query. + */ + private Index index; + + /** + * The lineage info structure that is used to pass the lineage + * information to the hooks. + */ + private final LineageInfo linfo; + + /** + * Constructor. + */ + public LineageState() { + dirToFop = new HashMap(); + linfo = new LineageInfo(); + } + + /** + * Adds a mapping from the load work to the file sink operator. + * + * @param dir The directory name. + * @param fop The file sink operator. + */ + public void mapDirToFop(String dir, FileSinkOperator fop) { + dirToFop.put(dir, fop); + } + + /** + * Set the lineage information for the associated directory. + * + * @param dir The directory containing the query results. + * @param dc The associated data container. + * @param cols The list of columns. + */ + public void setLineage(String dir, DataContainer dc, + List cols) { + // First lookup the file sink operator from the load work. + FileSinkOperator fop = dirToFop.get(dir); + + // Go over the associated fields and look up the dependencies + // by position in the row schema of the filesink operator. 
+ if (fop == null) { + return; + } + + List signature = fop.getSchema().getSignature(); + int i = 0; + for (FieldSchema fs : cols) { + linfo.putDependency(dc, fs, index.getDependency(fop, signature.get(i++))); + } + } + + /** + * Gets the lineage information. + * + * @return LineageInfo. + */ + public LineageInfo getLineageInfo() { + return linfo; + } + + /** + * Sets the index for the lineage state. + * + * @param index The index derived from lineage context. + */ + public void setIndex(Index index) { + this.index = index; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java (revision 0) @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.hooks; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Vector; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; + +/** + * This class contains the lineage information that is passed + * to the post-execution hooks. + */ +public class LineageInfo implements Serializable { + + /** + * Serial version id. + */ + private static final long serialVersionUID = 1L; + + /** + * Enum to track the type of a dependency: + * SIMPLE - the column is a direct copy of a base column. + * UDF - the column is derived through a function expression. + * UDAF - the column is derived through an aggregation. + * UDTF - the column is produced by a table generating function. + * SCRIPT - the column is produced by a transform script. + * SET - the dependency is a merged union of several dependencies. + */ + public static enum DependencyType { + SIMPLE, UDF, UDAF, UDTF, SCRIPT, SET + } + + /** + * Table or Partition data container. We need this class because the output + * of the query can either go to a table or a partition within a table. The + * data container class subsumes both of these. + */ + public static class DataContainer implements Serializable { + + /** + * Serial version id. + */ + private static final long serialVersionUID = 1L; + + /** + * The table in case this container is a table. + */ + private final Table tab; + + /** + * The partition in case this container is a partition. + */ + private final Partition part; + + /** + * Constructor for non-partitioned tables. + * + * @param tab The associated table. + */ + public DataContainer(Table tab) { + this.tab = tab; + this.part = null; + } + + /** + * Constructor for a partition of a partitioned table. + * + * @param tab The associated table. + * @param part The associated partition.
+ */ + public DataContainer(Table tab, Partition part) { + this.tab = tab; + this.part = part; + } + + /** + * Returns true in case this data container is a partition. + * + * @return boolean TRUE if the container is a table partition. + */ + public boolean isPartition() { + return (part != null); + } + + public Table getTable() { + return this.tab; + } + + public Partition getPartition() { + return this.part; + } + } + + /** + * Class that captures the lookup key for the dependency. The dependency + * is from (DataContainer, FieldSchema) to a Dependency structure. This + * class captures the (DataContainer, FieldSchema) tuple. + */ + public static class DependencyKey implements Serializable { + + /** + * Serial version id. + */ + private static final long serialVersionUID = 1L; + + /** + * The data container for this key. + */ + private final DataContainer dc; + + /** + * The field schema for this key. + */ + private final FieldSchema fld; + + /** + * Constructor. + * + * @param dc The associated data container. + * @param fld The associated field schema. + */ + public DependencyKey(DataContainer dc, FieldSchema fld) { + this.dc = dc; + this.fld = fld; + } + + public DataContainer getDataContainer() { + return this.dc; + } + + public FieldSchema getFieldSchema() { + return this.fld; + } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((dc == null) ? 0 : dc.hashCode()); + result = prime * result + ((fld == null) ? 0 : fld.hashCode()); + return result; + } + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + DependencyKey other = (DependencyKey) obj; + if (dc != other.dc) { + return false; + } + if (fld != other.fld) { + return false; + } + return true; + } + } + + /** + * Base Column information. + */ + public static class BaseColumnInfo implements Serializable { + + /** + * Serial version id. + */ + private static final long serialVersionUID = 1L; + + /** + * The table and alias info encapsulated in a different class. + */ + private TableAliasInfo tabAlias; + + /** + * The metastore column information. The column can be null + * and that denotes that the expression is dependent on the row + * of the table and not particular column. This can happen in case + * of count(1). + */ + private FieldSchema column; + + /** + * @return the tabAlias + */ + public TableAliasInfo getTabAlias() { + return tabAlias; + } + + /** + * @param tabAlias the tabAlias to set + */ + public void setTabAlias(TableAliasInfo tabAlias) { + this.tabAlias = tabAlias; + } + + /** + * @return the column + */ + public FieldSchema getColumn() { + return column; + } + + /** + * @param column the column to set + */ + public void setColumn(FieldSchema column) { + this.column = column; + } + } + + public static class TableAliasInfo implements Serializable { + + /** + * Serail version id. + */ + private static final long serialVersionUID = 1L; + + /** + * The alias for the table. + */ + private String alias; + + /** + * The metastore table information. 
+ */ + private Table table; + + /** + * @return the alias + */ + public String getAlias() { + return alias; + } + + /** + * @param alias the alias to set + */ + public void setAlias(String alias) { + this.alias = alias; + } + + /** + * @return the table + */ + public Table getTable() { + return table; + } + + /** + * @param table the table to set + */ + public void setTable(Table table) { + this.table = table; + } + } + + /** + * This class tracks the dependency information for the base column. + */ + public static class Dependency implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * The type of dependency. + */ + private DependencyType type; + + /** + * Expression string for the dependency. + */ + private String expr; + + /** + * The list of base columns that the particular column depends on. + */ + private Vector baseCols; + + /** + * @return the type + */ + public DependencyType getType() { + return type; + } + + /** + * @param type the type to set + */ + public void setType(DependencyType type) { + this.type = type; + } + + /** + * @return the expr + */ + public String getExpr() { + return expr; + } + + /** + * @param expr the expr to set + */ + public void setExpr(String expr) { + this.expr = expr; + } + + /** + * @return the baseCols + */ + public Vector getBaseCols() { + return baseCols; + } + + /** + * @param basecols the baseCols to set + */ + public void setBaseCols(Vector baseCols) { + this.baseCols = baseCols; + } + } + + /** + * The map contains an index from the (datacontainer, columnname) to the + * dependency vector for that tuple. This is used to generate the + * dependency vectors during the walk of the operator tree. + */ + protected Map index; + + /** + * Constructor. + */ + public LineageInfo() { + index = new HashMap(); + } + + /** + * Gets the dependency for a table, column tuple. + * @param dc The data container of the column whose dependency is being inspected. + * @param col The column whose dependency is being inspected. + * @return Dependency for that particular table, column tuple. + * null if no dependency is found. + */ + public Dependency getDependency(DataContainer dc, FieldSchema col) { + return index.get(new DependencyKey(dc, col)); + } + + /** + * Puts the dependency for a table, column tuple. + * @param dc The datacontainer whose dependency is being inserted. + * @param col The column whose dependency is being inserted. + * @param dep The dependency. + */ + public void putDependency(DataContainer dc, FieldSchema col, Dependency dep) { + index.put(new DependencyKey(dc, col), dep); + } + + /** + * Gets the iterator on this structure. + * + * @return LineageInfoItereator + */ + public Iterator> iterator() { + return index.entrySet().iterator(); + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecute.java (working copy) @@ -31,17 +31,20 @@ /** * The run command that is called just before the execution of the query. - * + * * @param sess * The session state. * @param inputs * The set of input tables and partitions. * @param outputs * The set of output tables, partitions, local and hdfs directories. + * @param lInfo + * The column level lineage information. * @param ugi * The user group security information. 
*/ void run(SessionState sess, Set inputs, - Set outputs, UserGroupInformation ugi) throws Exception; + Set outputs, LineageInfo lInfo, + UserGroupInformation ugi) throws Exception; } Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java (working copy) @@ -31,7 +31,7 @@ /** * The run command that is called just before the execution of the query. - * + * * @param sess * The session state. * @param inputs @@ -41,7 +41,8 @@ * @param ugi * The user group security information. */ - void run(SessionState sess, Set inputs, - Set outputs, UserGroupInformation ugi) throws Exception; + public void run(SessionState sess, Set inputs, + Set outputs, UserGroupInformation ugi) + throws Exception; } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (working copy) @@ -121,4 +121,9 @@ } + @Override + public String getName() { + return "LVJ"; + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (working copy) @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -39,6 +40,7 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.StringUtils; /** @@ -151,7 +153,10 @@ } } + // Create a data container + DataContainer dc = null; if (tbd.getPartitionSpec().size() == 0) { + dc = new DataContainer(table.getTTable()); db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable() .getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir())); if (work.getOutputs() != null) { @@ -164,10 +169,16 @@ new Path(tbd.getTmpDir())); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); + dc = new DataContainer(table.getTTable(), partn.getTPartition()); if (work.getOutputs() != null) { work.getOutputs().add(new WriteEntity(partn)); } } + + if (SessionState.get() != null) { + SessionState.get().getLineageState() + .setLineage(tbd.getSourceDir(), dc, table.getCols()); + } } return 0; Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (working copy) @@ -20,8 +20,9 @@ import java.io.Serializable; import java.util.HashSet; -import java.util.Set; +import java.util.List; +import org.apache.hadoop.hive.metastore.api.Partition; import 
org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -46,6 +47,11 @@ */ protected HashSet outputs; + /** + * List of inserted partitions + */ + protected List movedParts; + public MoveWork() { } Index: ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; @@ -73,6 +74,10 @@ * to the outputs here. */ private HashSet outputs; + /** + * Lineage information for the query. + */ + protected LineageInfo linfo; private HashMap idToTableNameMap; @@ -94,6 +99,7 @@ // Note that inputs and outputs can be changed when the query gets executed inputs = sem.getInputs(); outputs = sem.getOutputs(); + linfo = sem.getLineageInfo(); idToTableNameMap = new HashMap(sem.getIdToTableNameMap()); queryId = makeQueryId(); @@ -711,4 +717,21 @@ this.started = started; } + /** + * Gets the lineage information. + * + * @return LineageInfo associated with the query. + */ + public LineageInfo getLineageInfo() { + return linfo; + } + + /** + * Sets the lineage information. + * + * @param linfo The LineageInfo structure that is set in the optimization phase. + */ + public void setLineageInfo(LineageInfo linfo) { + this.linfo = linfo; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java (revision 0) +++ ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java (revision 0) @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lib; + +import java.util.Stack; + +/** + * Contains common utility functions to manipulate nodes, walkers etc. + */ +public class Utils { + + /** + * Gets the nth ancestor (the parent being the 1st ancestor) in the traversal + * path. n=0 returns the currently visited node. + * + * @param st The stack that encodes the traversal path. + * @param n The value of n (n=0 is the currently visited node). + * + * @return Node The Nth ancestor in the path with respect to the current node. 
+ */ + public static Node getNthAncestor(Stack st, int n) { + assert(st.size() - 1 >= n); + + Stack tmpStack = new Stack(); + for(int i=0; i<=n; i++) + tmpStack.push(st.pop()); + + Node ret_nd = tmpStack.peek(); + + for(int i=0; i<=n; i++) + st.push(tmpStack.pop()); + + assert(tmpStack.isEmpty()); + + return ret_nd; + } +} Index: ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java (working copy) @@ -30,7 +30,7 @@ * Since the operator tree is a DAG, nodes with mutliple parents will be * visited more than once. This can be made configurable. */ - + /** * Constructor. * Index: ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (working copy) @@ -38,7 +38,6 @@ protected Stack opStack; private final List toWalk = new ArrayList(); - private final Set seenList = new HashSet(); private final HashMap retMap = new HashMap(); private final Dispatcher dispatcher; Index: ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (working copy) @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; @@ -58,14 +59,6 @@ * BaseSemanticAnalyzer. * */ -/** - * BaseSemanticAnalyzer. - * - */ -/** - * BaseSemanticAnalyzer. - * - */ public abstract class BaseSemanticAnalyzer { protected final Hive db; protected final HiveConf conf; @@ -88,6 +81,10 @@ * List of WriteEntities that are passed to the hooks. */ protected HashSet outputs; + /** + * Lineage information for the query. + */ + protected LineageInfo linfo; protected static final String TEXTFILE_INPUT = TextInputFormat.class .getName(); @@ -494,4 +491,22 @@ } } } + + /** + * Gets the lineage information. + * + * @return LineageInfo associated with the query. + */ + public LineageInfo getLineageInfo() { + return linfo; + } + + /** + * Sets the lineage information. + * + * @param linfo The LineageInfo structure that is set in the optimization phase. 
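Editor's note: the new Utils.getNthAncestor helper is easiest to understand with a tiny standalone example. The ToyNode class below is invented purely for illustration and is not part of the patch; it only assumes the Node interface's getChildren()/getName() methods.

import java.util.List;
import java.util.Stack;

import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.Utils;

public class NthAncestorDemo {

  /** Minimal Node implementation used only for this demo. */
  static class ToyNode implements Node {
    private final String name;
    ToyNode(String name) { this.name = name; }
    @Override
    public List<? extends Node> getChildren() { return null; }
    @Override
    public String getName() { return name; }
  }

  public static void main(String[] args) {
    // The stack encodes the traversal path from the root (bottom of the
    // stack) to the currently visited node (top), as the graph walkers build it.
    Stack<Node> path = new Stack<Node>();
    path.push(new ToyNode("TS"));   // root: table scan
    path.push(new ToyNode("FIL"));  // filter
    path.push(new ToyNode("SEL"));  // select
    path.push(new ToyNode("FS"));   // current node: file sink

    System.out.println(Utils.getNthAncestor(path, 0).getName()); // FS  (current node)
    System.out.println(Utils.getNthAncestor(path, 1).getName()); // SEL (parent)
    System.out.println(Utils.getNthAncestor(path, 3).getName()); // TS  (3rd ancestor)

    // The helper restores the stack, so the walk can continue unaffected.
    System.out.println(path.peek().getName());                   // FS
  }
}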
+ */ + public void setLineageInfo(LineageInfo linfo) { + this.linfo = linfo; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -50,11 +50,11 @@ import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.ExecDriver; import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapRedTask; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -245,13 +245,15 @@ qb = pctx.getQB(); groupOpToInputTables = pctx.getGroupOpToInputTables(); prunedPartitions = pctx.getPrunedPartitions(); + setLineageInfo(pctx.getLineageInfo()); } public ParseContext getParseContext() { return new ParseContext(conf, qb, ast, opToPartPruner, topOps, topSelOps, - opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx, - idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, - groupOpToInputTables, prunedPartitions, opToSamplePruner); + opParseCtx, joinContext, topToTable, loadTableWork, + loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, + opToSamplePruner); } @SuppressWarnings("nls") @@ -3143,13 +3145,14 @@ QBMetaData qbm = qb.getMetaData(); Integer dest_type = qbm.getDestTypeForAlias(dest); - Table dest_tab; // destination table if any + Table dest_tab = null; // destination table if any String queryTmpdir = null; // the intermediate destination directory Path dest_path = null; // the final destination directory TableDesc table_desc = null; int currentTableId = 0; boolean isLocal = false; SortBucketRSCtx rsCtx = new SortBucketRSCtx(); + LoadTableDesc ltd = null; switch (dest_type.intValue()) { case QBMetaData.DEST_TABLE: { @@ -3179,9 +3182,10 @@ // Create the work for moving the table if (!isNonNativeTable) { - loadTableWork.add(new LoadTableDesc(queryTmpdir, ctx + ltd = new LoadTableDesc(queryTmpdir, ctx .getExternalTmpFileURI(dest_path.toUri()), table_desc, - new HashMap())); + new HashMap()); + loadTableWork.add(ltd); } if (!outputs.add(new WriteEntity(dest_tab))) { throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES @@ -3205,9 +3209,10 @@ currentTableId = destTableId; destTableId++; - loadTableWork.add(new LoadTableDesc(queryTmpdir, ctx + ltd = new LoadTableDesc(queryTmpdir, ctx .getExternalTmpFileURI(dest_path.toUri()), table_desc, dest_part - .getSpec())); + .getSpec()); + loadTableWork.add(ltd); if (!outputs.add(new WriteEntity(dest_part))) { throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES .getMsg(dest_tab.getTableName() + "@" + dest_part.getName())); @@ -3355,6 +3360,12 @@ rsCtx.isMultiFileSpray(), rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols()), fsRS, input), inputRR); + + if (ltd != null && SessionState.get() != null) { + SessionState.get().getLineageState() + .mapDirToFop(ltd.getSourceDir(), (FileSinkOperator)output); + } + LOG.debug("Created FileSink Plan for 
clause: " + dest + "dest_path: " + dest_path + " row schema: " + inputRR.toString()); @@ -5277,7 +5288,8 @@ colsEqual, alias, rwsch, qb.getMetaData(), null); tableOp = OperatorFactory.getAndMakeChild(new FilterDesc( samplePredicate, true, new sampleDesc(ts.getNumerator(), ts - .getDenominator(), tabBucketCols, true)), top); + .getDenominator(), tabBucketCols, true)), + new RowSchema(rwsch.getColumnInfos()), top); } else { // need to add filter // create tableOp to be filterDesc and set as child to 'top' @@ -5285,7 +5297,8 @@ ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols, colsEqual, alias, rwsch, qb.getMetaData(), null); tableOp = OperatorFactory.getAndMakeChild(new FilterDesc( - samplePredicate, true), top); + samplePredicate, true), + new RowSchema(rwsch.getColumnInfos()), top); } } else { boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); @@ -5316,7 +5329,8 @@ tableOp = OperatorFactory .getAndMakeChild(new FilterDesc(samplePred, true, new sampleDesc(tsSample.getNumerator(), tsSample - .getDenominator(), tab.getBucketCols(), true)), top); + .getDenominator(), tab.getBucketCols(), true)), + new RowSchema(rwsch.getColumnInfos()), top); LOG.info("No need for sample filter"); } else { // The table is not bucketed, add a dummy filter :: rand() @@ -5331,7 +5345,8 @@ ExprNodeDesc samplePred = genSamplePredicate(tsSample, null, false, alias, rwsch, qb.getMetaData(), randFunc); tableOp = OperatorFactory.getAndMakeChild(new FilterDesc( - samplePred, true), top); + samplePred, true), + new RowSchema(rwsch.getColumnInfos()), top); } } } @@ -5616,8 +5631,9 @@ } else { new ArrayList(); for (LoadTableDesc ltd : loadTableWork) { - mvTask.add(TaskFactory.get(new MoveWork(null, null, ltd, null, false), - conf)); + Task tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + conf); + mvTask.add(tsk); } boolean oneLoadFile = true; @@ -5950,8 +5966,8 @@ } ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner, - topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork, - loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, + topOps, topSelOps, opParseCtx, joinContext, topToTable, + loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions, opToSamplePruner); Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -48,7 +49,7 @@ * populated. Note that since the parse context contains the operator tree, it * can be easily retrieved by the next optimization step or finally for task * generation after the plan has been completely optimized. - * + * **/ public class ParseContext { @@ -75,6 +76,11 @@ private Map> groupOpToInputTables; private Map prunedPartitions; + /** + * The lineage information. 
+ */ + private LineageInfo lInfo; + // is set to true if the expression only contains partitioning columns and not // any other column reference. // This is used to optimize select * from table where ... scenario, when the @@ -105,6 +111,7 @@ * context needed join processing (map join specifically) * @param topToTable * the top tables being processed + * @param fopToTable the table schemas that are being inserted into * @param loadTableWork * list of destination tables being loaded * @param loadFileWork @@ -383,7 +390,7 @@ /** * Sets the hasNonPartCols flag. - * + * * @param val */ public void setHasNonPartCols(boolean val) { @@ -443,6 +450,24 @@ this.prunedPartitions = prunedPartitions; } + /** + * Sets the lineage information. + * + * @param lInfo The lineage information. + */ + public void setLineageInfo(LineageInfo lInfo) { + this.lInfo = lInfo; + } + + /** + * Gets the associated lineage information. + * + * @return LineageInfo + */ + public LineageInfo getLineageInfo() { + return lInfo; + } + public Map getMapJoinContext() { return mapJoinContext; } Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 927279) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -162,7 +162,7 @@ List lst = sem.getResultSchema(); schema = new Schema(lst, null); } else if (sem.getFetchTask() != null) { - FetchTask ft = (FetchTask) sem.getFetchTask(); + FetchTask ft = sem.getFetchTask(); TableDesc td = ft.getTblDesc(); // partitioned tables don't have tableDesc set on the FetchTask. Instead // they have a list of PartitionDesc objects, each with a table desc. @@ -294,7 +294,7 @@ try { ctx = new Context(conf); - + ParseDriver pd = new ParseDriver(); ASTNode tree = pd.parse(command, ctx); tree = ParseUtils.findRootNonNullToken(tree); @@ -317,25 +317,25 @@ schema = getSchema(sem, conf); // Serialize the query plan - // get temp file name and remove file: + // get temp file name and remove file: String queryPlanFileName = ctx.getLocalScratchDir() + Path.SEPARATOR_CHAR + "queryplan.xml"; LOG.info("query plan = " + queryPlanFileName); queryPlanFileName = new Path(queryPlanFileName).toUri().getPath(); - - // serialize the queryPlan + + // serialize the queryPlan FileOutputStream fos = new FileOutputStream(queryPlanFileName); Utilities.serializeQueryPlan(plan, fos); fos.close(); - - // deserialize the queryPlan + + // deserialize the queryPlan FileInputStream fis = new FileInputStream(queryPlanFileName); QueryPlan newPlan = Utilities.deserializeQueryPlan(fis, conf); fis.close(); - + // Use the deserialized plan plan = newPlan; - + // initialize FetchTask right here if (plan.getFetchTask() != null) { plan.getFetchTask().initialize(conf, plan, null); @@ -540,6 +540,7 @@ // Get all the post execution hooks and execute them. for (PostExecute peh : getPostExecHooks()) { peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(), + (SessionState.get() != null ? SessionState.get().getLineageState().getLineageInfo() : null), UnixUserGroupInformation.readFromConf(conf, UnixUserGroupInformation.UGI_PROPERTY_NAME)); } @@ -676,7 +677,7 @@ public boolean getResults(ArrayList res) throws IOException { if (plan != null && plan.getFetchTask() != null) { - FetchTask ft = (FetchTask) plan.getFetchTask(); + FetchTask ft = plan.getFetchTask(); ft.setMaxRows(maxRows); return ft.fetch(res); }
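Editor's note: taken together, the lineage index can also be exercised in isolation, for example from a test. The sketch below is not part of the patch; it assumes the Thrift-generated metastore constructors and setters (FieldSchema(name, type, comment), Table.setTableName(...)), the single-argument DataContainer(Table) constructor used in MoveTask above, and a DependencyType value named SIMPLE, which may differ from the actual enum members.

import java.util.Arrays;
import java.util.Vector;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo;

public class LineageInfoSketch {

  public static void main(String[] args) {
    // Destination: table dest1, column key.
    Table dest = new Table();
    dest.setTableName("dest1");
    dest.setDbName("default");
    FieldSchema destKey = new FieldSchema("key", "int", null);
    DataContainer dc = new DataContainer(dest); // non-partitioned container

    // Source: src.key feeding dest1.key.
    Table src = new Table();
    src.setTableName("src");
    src.setDbName("default");
    TableAliasInfo alias = new TableAliasInfo();
    alias.setAlias("src");
    alias.setTable(src);
    BaseColumnInfo base = new BaseColumnInfo();
    base.setTabAlias(alias);
    base.setColumn(new FieldSchema("key", "string", null));

    Dependency dep = new Dependency();
    dep.setType(LineageInfo.DependencyType.SIMPLE); // assumed enum member
    dep.setExpr(null);
    dep.setBaseCols(new Vector<BaseColumnInfo>(Arrays.asList(base)));

    // Populate the index and read it back the same way the optimizer
    // and the post-execution hooks would.
    LineageInfo linfo = new LineageInfo();
    linfo.putDependency(dc, destKey, dep);

    Dependency found = linfo.getDependency(dc, destKey);
    System.out.println("dest1.key depends on "
        + found.getBaseCols().size() + " base column(s)");
  }
}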