Index: eclipse-templates/TestHive.launchtemplate
===================================================================
--- eclipse-templates/TestHive.launchtemplate (revision 906309)
+++ eclipse-templates/TestHive.launchtemplate (working copy)
@@ -9,7 +9,7 @@
-
+
@@ -19,5 +19,6 @@
+
Index: eclipse-templates/TestMTQueries.launchtemplate
===================================================================
--- eclipse-templates/TestMTQueries.launchtemplate (revision 906309)
+++ eclipse-templates/TestMTQueries.launchtemplate (working copy)
@@ -9,7 +9,7 @@
-
+
@@ -19,5 +19,6 @@
+
Index: eclipse-templates/TestJdbc.launchtemplate
===================================================================
--- eclipse-templates/TestJdbc.launchtemplate (revision 906309)
+++ eclipse-templates/TestJdbc.launchtemplate (working copy)
@@ -9,7 +9,7 @@
-
+
@@ -19,5 +19,6 @@
+
Index: eclipse-templates/TestCliDriver.launchtemplate
===================================================================
--- eclipse-templates/TestCliDriver.launchtemplate (revision 906309)
+++ eclipse-templates/TestCliDriver.launchtemplate (working copy)
@@ -9,7 +9,7 @@
-
+
@@ -19,5 +19,6 @@
+
Index: eclipse-templates/TestTruncate.launchtemplate
===================================================================
--- eclipse-templates/TestTruncate.launchtemplate (revision 906309)
+++ eclipse-templates/TestTruncate.launchtemplate (working copy)
@@ -9,7 +9,7 @@
-
+
@@ -19,5 +19,6 @@
+
Index: ql/src/test/results/clientpositive/fileformat_sequencefile.q.out
===================================================================
--- ql/src/test/results/clientpositive/fileformat_sequencefile.q.out (revision 906309)
+++ ql/src/test/results/clientpositive/fileformat_sequencefile.q.out (working copy)
@@ -43,12 +43,14 @@
key int
value string
-Detailed Table Information Table(tableName:dest1, dbName:default, owner:njain, createTime:1253779880, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:int, comment:null), FieldSchema(name:value, type:string, comment:null)], location:file:/data/users/njain/hive5/hive5/build/ql/test/data/warehouse/dest1, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[], parameters:{})
+Detailed Table Information Table(tableName:dest1, dbName:default, owner:athusoo, createTime:1264496237, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:key, type:int, comment:null), FieldSchema(name:value, type:string, comment:null)], location:file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/test/data/warehouse/dest1, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}), partitionKeys:[], parameters:{transient_lastDdlTime=1264496237})
PREHOOK: query: FROM src
INSERT OVERWRITE TABLE dest1 SELECT src.key, src.value WHERE src.key < 10
PREHOOK: type: QUERY
PREHOOK: Input: default@src
PREHOOK: Output: default@dest1
+PREHOOK: Lineage: dest1.value SIMPLE null[(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: Lineage: dest1.key SIMPLE null[(src)src.FieldSchema(name:key, type:string, comment:default), ]
POSTHOOK: query: FROM src
INSERT OVERWRITE TABLE dest1 SELECT src.key, src.value WHERE src.key < 10
POSTHOOK: type: QUERY
@@ -57,11 +59,11 @@
PREHOOK: query: SELECT dest1.* FROM dest1
PREHOOK: type: QUERY
PREHOOK: Input: default@dest1
-PREHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/862471397/10000
+PREHOOK: Output: file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/scratchdir/1514937477/10000
POSTHOOK: query: SELECT dest1.* FROM dest1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@dest1
-POSTHOOK: Output: file:/data/users/njain/hive5/hive5/build/ql/tmp/862471397/10000
+POSTHOOK: Output: file:/data/users/athusoo/apache_workspaces/hive_trunk_ws1/.ptest_0/build/ql/scratchdir/1514937477/10000
0 val_0
4 val_4
8 val_8
@@ -77,3 +79,88 @@
POSTHOOK: query: DROP TABLE dest1
POSTHOOK: type: DROPTABLE
POSTHOOK: Output: default@dest1
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcpart@ds=2008-04-09/hr=12
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcbucket
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@srcbucket2
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket2
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket2
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket2
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@srcbucket2
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src1
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src_sequencefile
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src_thrift
+PREHOOK: query: DROP TABLE dest1
+PREHOOK: type: LOAD
+POSTHOOK: query: DROP TABLE dest1
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@src_json
Index: ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java (revision 906309)
+++ ql/src/test/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java (working copy)
@@ -20,6 +20,8 @@
import java.util.Set;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.security.UserGroupInformation;
@@ -32,7 +34,8 @@
@Override
public void run(SessionState sess, Set<ReadEntity> inputs,
- Set<WriteEntity> outputs, UserGroupInformation ugi) throws Exception {
+ Set<WriteEntity> outputs, LineageInfo linfo, UserGroupInformation ugi)
+ throws Exception {
LogHelper console = SessionState.getConsole();
@@ -51,6 +54,29 @@
for (WriteEntity we : outputs) {
console.printError("PREHOOK: Output: " + we.toString());
}
+
+ // Also print out the generic lineage information if there is any
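+ // Each entry is printed on one line, for example (taken from the updated
+ // fileformat_sequencefile.q.out above; the expression string is currently null):
+ //   PREHOOK: Lineage: dest1.key SIMPLE null[(src)src.FieldSchema(name:key, type:string, comment:default), ]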
+ if (linfo != null) {
+ LineageInfoIterator iter = linfo.iterator();
+ while(iter.hasNext()) {
+ LineageInfoItem it = iter.next();
+ Dependency dep = it.getDependency();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("PREHOOK: Lineage: " + it.getTable().getTableName() + "."
+ + it.getField().getName() + " " + dep.getType() + " " + dep.getExpr());
+
+ sb.append("[");
+ for(BaseColumnInfo col: dep.getBaseCols()) {
+ sb.append("("+col.getTabAlias().getTable().getTableName() + ")"
+ + col.getTabAlias().getAlias() + "."
+ + col.getColumn() + ", ");
+ }
+ sb.append("]");
+
+ console.printError(sb.toString());
+ }
+ }
}
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (working copy)
@@ -27,6 +27,7 @@
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
+import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
/**
* Implementation of the optimizer
@@ -42,6 +43,8 @@
*/
public void initialize(HiveConf hiveConf) {
transformations = new ArrayList<Transform>();
+ // Add the transformation that computes the lineage information.
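+ // It is added before the other transformations so that the lineage is
+ // computed on the operator tree produced by semantic analysis, before any
+ // other optimization rewrites it.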
+ transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCP)) {
transformations.add(new ColumnPruner());
}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (revision 0)
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.lineage;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+
+/**
+ * The processor context for the lineage information. This contains the
+ * lineage context and the column info and operator information that is
+ * being used for the current expression.
+ */
+public class ExprProcCtx implements NodeProcessorCtx {
+
+ /**
+ * The lineage context that is being populated.
+ */
+ private LineageCtx lctx;
+
+ /**
+ * The input operator in case the current operator is not a leaf.
+ */
+ private Operator<? extends Serializable> inpOp;
+
+ /**
+ * Constructor.
+ *
+ * @param lctx The lineage context that contains the dependencies for the inputs.
+ * @param inpOp The input operator to the current operator.
+ */
+ public ExprProcCtx(LineageCtx lctx,
+ Operator<? extends Serializable> inpOp) {
+ this.lctx = lctx;
+ this.inpOp = inpOp;
+ }
+
+ /**
+ * Gets the lineage context.
+ *
+ * @return LineageCtx The lineage context.
+ */
+ public LineageCtx getLineageCtx() {
+ return lctx;
+ }
+
+ /**
+ * Gets the input operator.
+ *
+ * @return Operator The input operator - this is null in case the current
+ * operator is a leaf.
+ */
+ public Operator<? extends Serializable> getInputOperator() {
+ return inpOp;
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java (revision 0)
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.lineage;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+/**
+ * This class contains the lineage context that is passed
+ * while walking the operator tree in Lineage. The context
+ * contains the LineageInfo structure that is passed to the
+ * pre-execution hooks.
+ */
+public class LineageCtx implements NodeProcessorCtx {
+
+ /**
+ * The map contains an index from the (operator, columnInfo) to the
+ * dependency vector for that tuple. This is used to generate the
+ * dependency vectors during the walk of the operator tree.
+ */
+ private Map<Operator<? extends Serializable>, HashMap<ColumnInfo, Dependency>> index;
+
+ /**
+ * The lineage info structure that is eventually generated by processing the filesink
+ * operator.
+ */
+ private LineageInfo linfo;
+
+ /**
+ * Parse context to get to the table metadata information.
+ */
+ private ParseContext pctx;
+
+ /**
+ * Class to record a visit from fromOp to toOp. This class is used to figure out
+ * whether an operator was already visited from a parent operator during a walk.
+ */
+ protected static class Visit {
+
+ /**
+ * The parent operator from where the visit was made.
+ */
+ private Operator<? extends Serializable> fromOp;
+
+ /**
+ * The child operator to which the visit was made.
+ */
+ private Operator<? extends Serializable> toOp;
+
+ /**
+ * Default constructor.
+ */
+ public Visit() {
+ fromOp = null;
+ toOp = null;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param fromOp The parent operator from where the visit was made.
+ * @param toOp The child operator to where the visit was made.
+ */
+ public Visit(Operator<? extends Serializable> fromOp, Operator<? extends Serializable> toOp) {
+ this.fromOp = fromOp;
+ this.toOp = toOp;
+ }
+
+ /**
+ * @return the fromOp
+ */
+ public Operator<? extends Serializable> getFromOp() {
+ return fromOp;
+ }
+
+ /**
+ * @param fromOp the fromOp to set
+ */
+ public void setFromOp(Operator<? extends Serializable> fromOp) {
+ this.fromOp = fromOp;
+ }
+
+ /**
+ * @return the toOp
+ */
+ public Operator<? extends Serializable> getToOp() {
+ return toOp;
+ }
+
+ /**
+ * @param toOp the toOp to set
+ */
+ public void setToOp(Operator<? extends Serializable> toOp) {
+ this.toOp = toOp;
+ }
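+
+ /**
+ * A minimal sketch of value based equality over the (fromOp, toOp) pair,
+ * so that the HashSet lookups in recordVisit/hasVisited match visits by
+ * the operator pair rather than by object identity.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Visit)) {
+ return false;
+ }
+ Visit other = (Visit) o;
+ return fromOp == other.fromOp && toOp == other.toOp;
+ }
+
+ @Override
+ public int hashCode() {
+ return (fromOp == null ? 0 : System.identityHashCode(fromOp)) * 31
+ + (toOp == null ? 0 : System.identityHashCode(toOp));
+ }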
+ }
+
+ /**
+ * A set of visits that have been made. This map can be used if a visit from a
+ * parent to a child is being made again.
+ */
+ private Set<Visit> visits;
+
+ /**
+ * Constructor.
+ *
+ * @param pctx The parse context that is used to get table metadata information.
+ */
+ public LineageCtx(ParseContext pctx) {
+ index = new HashMap<Operator<? extends Serializable>, HashMap<ColumnInfo, Dependency>>();
+ linfo = new LineageInfo();
+ this.pctx = pctx;
+ visits = new HashSet<Visit>();
+ }
+
+ /**
+ * Gets the dependency for an operator, columninfo tuple.
+ * @param op The operator whose dependency is being inspected.
+ * @param col The column info whose dependency is being inspected.
+ * @return Dependency for that particular operator, columninfo tuple.
+ * null if no dependency is found.
+ */
+ public Dependency getDependency(Operator<? extends Serializable> op, ColumnInfo col) {
+ HashMap<ColumnInfo, Dependency> colMap = index.get(op);
+ if (colMap == null)
+ return null;
+
+ return colMap.get(col);
+ }
+
+ /**
+ * Puts the dependency for an operator, columninfo tuple.
+ * @param op The operator whose dependency is being inserted.
+ * @param col The column info whose dependency is being inserted.
+ * @param dep The dependency.
+ */
+ public void putDependency(Operator<? extends Serializable> op,
+ ColumnInfo col, Dependency dep) {
+ HashMap<ColumnInfo, Dependency> colMap = index.get(op);
+ if (colMap == null) {
+ colMap = new HashMap<ColumnInfo, Dependency>();
+ index.put(op, colMap);
+ }
+ colMap.put(col, dep);
+ }
+
+ /**
+ * Merges the new dependencies in dep to the existing dependencies
+ * of (op, ci).
+ *
+ * @param op The operator of the column whose dependency is being modified.
+ * @param ci The column info of the associated column.
+ * @param dep The new dependency.
+ */
+ public void mergeDependency(Operator<? extends Serializable> op,
+ ColumnInfo ci, Dependency dep) {
+ Dependency old_dep = getDependency(op, ci);
+ if (old_dep == null)
+ putDependency(op, ci, dep);
+ else {
+ old_dep.setType(LineageInfo.DependencyType.SET);
+ Set<BaseColumnInfo> bci_set = new LinkedHashSet<BaseColumnInfo>(old_dep.getBaseCols());
+ bci_set.addAll(dep.getBaseCols());
+ old_dep.setBaseCols(new Vector<BaseColumnInfo>(bci_set));
+ // TODO: Fix the expressions later.
+ old_dep.setExpr(null);
+ }
+ }
+
+ /**
+ * Gets the lineage information computed for this query.
+ *
+ * @return LineageInfo The lineageinfo structure for this query.
+ */
+ public LineageInfo getInfo() {
+ return linfo;
+ }
+
+ /**
+ * Sets the lineage information structure.
+ *
+ * @param linfo The associated lineage information structure.
+ */
+ public void setInfo(LineageInfo linfo) {
+ this.linfo = linfo;
+ }
+
+ /**
+ * Gets the parse context.
+ *
+ * @return ParseContext
+ */
+ public ParseContext getParseCtx() {
+ return pctx;
+ }
+
+ /**
+ * Record a visit from inpOp to op
+ *
+ * @param inpOp The parent operator from where the visit is made.
+ * @param op The child operator to which the visit is made.
+ */
+ public void recordVisit(Operator<? extends Serializable> inpOp,
+ Operator<? extends Serializable> op) {
+ visits.add(new Visit(inpOp, op));
+ }
+
+ /**
+ * Checks if op has been visited from inpOp.
+ *
+ * @param inpOp The parent operator from where the visit was made.
+ * @param op The child operator to which the visit was made.
+ *
+ * @return boolean true if op was already visited from inpOp else false.
+ */
+ public boolean hasVisited(Operator<? extends Serializable> inpOp,
+ Operator<? extends Serializable> op) {
+ return visits.contains(new Visit(inpOp, op));
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java (revision 0)
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.lineage;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.optimizer.lineage.OpProcFactory;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * This class generates the lineage information for the columns
+ * and tables from the plan before it goes through other
+ * optimization phases.
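+ * The computed LineageInfo is stored in the ParseContext at the end of
+ * transform() so that it can later be handed to the pre-execution hooks.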
+ */
+public class Generator implements Transform {
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hive.ql.optimizer.Transform#transform(org.apache.hadoop.hive.ql.parse.ParseContext)
+ */
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+ // Create the lineage context
+ LineageCtx lCtx = new LineageCtx(pctx);
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1", "TS%"), OpProcFactory.getTSProc());
+ opRules.put(new RuleRegExp("R2", "SCR%"), OpProcFactory.getTransformProc());
+ opRules.put(new RuleRegExp("R3", "UDTF%"), OpProcFactory.getTransformProc());
+ opRules.put(new RuleRegExp("R4", "FS%"), OpProcFactory.getFSProc());
+ opRules.put(new RuleRegExp("R5", "SEL%"), OpProcFactory.getSelProc());
+ opRules.put(new RuleRegExp("R6", "GBY%"), OpProcFactory.getGroupByProc());
+ opRules.put(new RuleRegExp("R7", "UNION%"), OpProcFactory.getUnionProc());
+ opRules.put(new RuleRegExp("R8", "JOIN%|MAPJOIN%"), OpProcFactory.getJoinProc());
+ opRules.put(new RuleRegExp("R8", "RS%"), OpProcFactory.getReduceSinkProc());
+ opRules.put(new RuleRegExp("R9", "LVJ%"), OpProcFactory.getLateralViewJoinProc());
+
+ // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(), opRules, lCtx);
+ GraphWalker ogw = new PreOrderWalker(disp);
+
+ // Create a list of topop nodes
+ ArrayList topNodes = new ArrayList();
+ topNodes.addAll(pctx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+
+ // Transfer the lineage context to the proper place so that it can be passed to the
+ // pre-execution hook.
+ pctx.setLineageInfo(lCtx.getInfo());
+
+ return pctx;
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (revision 0)
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.lineage;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.Vector;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+
+/**
+ * Expression processor factory for lineage. Each processor is responsible for
+ * creating the leaf level column info objects that the expression depends upon
+ * and also generates a string representation of the expression.
+ */
+public class ExprProcFactory {
+
+ /**
+ * Processor for column expressions.
+ */
+ public static class ColumnExprProcessor implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ ExprNodeColumnDesc cd = (ExprNodeColumnDesc) nd;
+ ExprProcCtx epc = (ExprProcCtx) procCtx;
+
+ // assert that the input operator is not null as there are no
+ // exprs associated with table scans.
+ assert (epc.getInputOperator() != null);
+
+ ColumnInfo inp_ci = null;
+ for (ColumnInfo tmp_ci : epc.getInputOperator().getSchema()
+ .getSignature()) {
+ if (tmp_ci.getInternalName().equals(cd.getColumn())) {
+ inp_ci = tmp_ci;
+ break;
+ }
+ }
+
+ // Insert the dependencies of inp_ci to that of the current operator, ci
+ LineageCtx lc = epc.getLineageCtx();
+ Dependency dep = lc.getDependency(epc.getInputOperator(), inp_ci);
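+ // The returned dependency may be null if nothing has been recorded for
+ // inp_ci on the input operator; callers treat a null child dependency as
+ // contributing no base columns.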
+
+ return dep;
+ }
+
+ }
+
+ /**
+ * Processor for any function or field expression.
+ */
+ public static class GenericExprProcessor implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ assert (nd instanceof ExprNodeGenericFuncDesc || nd instanceof ExprNodeFieldDesc);
+
+ // Concatenate the dependencies of all the children to compute the new
+ // dependency.
+ Dependency dep = new Dependency();
+
+ LinkedHashSet<BaseColumnInfo> bci_set = new LinkedHashSet<BaseColumnInfo>();
+ dep.setType(LineageInfo.DependencyType.UDF);
+
+ for (Object child : nodeOutputs) {
+ if (child == null)
+ continue;
+
+ Dependency child_dep = (Dependency) child;
+ if (child_dep.getType() != LineageInfo.DependencyType.UDF)
+ dep.setType(child_dep.getType());
+ bci_set.addAll(child_dep.getBaseCols());
+ }
+
+ dep.setBaseCols(new Vector<BaseColumnInfo>(bci_set));
+
+ return dep;
+ }
+
+ }
+
+ /**
+ * Processor for constants and null expressions. For such expressions the
+ * processor simply returns a null dependency vector.
+ */
+ public static class DefaultExprProcessor implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ assert (nd instanceof ExprNodeConstantDesc || nd instanceof ExprNodeNullDesc);
+
+ // Create a dependency that has no basecols
+ Dependency dep = new Dependency();
+ dep.setType(LineageInfo.DependencyType.SIMPLE);
+ dep.setBaseCols(new Vector<BaseColumnInfo>());
+ return dep;
+ }
+ }
+
+ public static NodeProcessor getDefaultExprProcessor() {
+ return new DefaultExprProcessor();
+ }
+
+ public static NodeProcessor getGenericFuncProcessor() {
+ return new GenericExprProcessor();
+ }
+
+ public static NodeProcessor getFieldProcessor() {
+ return new GenericExprProcessor();
+ }
+
+ public static NodeProcessor getColumnProcessor() {
+ return new ColumnExprProcessor();
+ }
+
+ /**
+ * Gets the expression dependencies for the expression.
+ *
+ * @param lctx
+ * The lineage context containing the input operators' dependencies.
+ * @param inpOp
+ * The input operator to the current operator.
+ * @param expr
+ * The expression that is being processed.
+ * @return The dependency of the expression on the base table columns.
+ * @throws SemanticException
+ */
+ public static Dependency getExprDependency(LineageCtx lctx,
+ Operator<? extends Serializable> inpOp, ExprNodeDesc expr)
+ throws SemanticException {
+
+ // Create the walker, the rules dispatcher and the context.
+ ExprProcCtx exprCtx = new ExprProcCtx(lctx, inpOp);
+
+ // Create a walker which walks the expression tree in a DFS manner while
+ // maintaining the expression stack. The dependencies are assembled bottom
+ // up as the walk returns from the leaf expression nodes.
+ Map<Rule, NodeProcessor> exprRules = new LinkedHashMap<Rule, NodeProcessor>();
+ exprRules.put(
+ new RuleRegExp("R1", ExprNodeColumnDesc.class.getName() + "%"),
+ getColumnProcessor());
+ exprRules.put(
+ new RuleRegExp("R2", ExprNodeFieldDesc.class.getName() + "%"),
+ getFieldProcessor());
+ exprRules.put(new RuleRegExp("R3", ExprNodeGenericFuncDesc.class.getName()
+ + "%"), getGenericFuncProcessor());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultExprProcessor(),
+ exprRules, exprCtx);
+ GraphWalker egw = new DefaultGraphWalker(disp);
+
+ List<Node> startNodes = new ArrayList<Node>();
+ startNodes.add(expr);
+
+ HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+ egw.startWalking(startNodes, outputMap);
+ return (Dependency)outputMap.get(expr);
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (revision 0)
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.lineage;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Utils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+/**
+ * Operator factory for the rule processors for lineage.
+ */
+public class OpProcFactory {
+
+ /**
+ * Returns the parent operator in the walk path to the current operator.
+ *
+ * @param stack The stack encoding the path.
+ *
+ * @return Operator The parent operator in the current path.
+ */
+ protected static Operator<? extends Serializable> getParent(Stack<Node> stack) {
+ return (Operator<? extends Serializable>) Utils.getNthAncestor(stack, 1);
+ }
+
+ /**
+ * Processor for Script and UDTF Operators.
+ */
+ public static class TransformLineage extends DefaultLineage implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+
+ // The operators
+ Operator<? extends Serializable> op = (Operator<? extends Serializable>) nd;
+ Operator<? extends Serializable> inpOp = getParent(stack);
+
+ // Create a single dependency list by concatenating the dependencies of all
+ // the cols
+ Dependency dep = new Dependency();
+ dep.setType(LineageInfo.DependencyType.SCRIPT);
+ // TODO: Fix this to a non null value.
+ dep.setExpr(null);
+
+ LinkedHashSet<BaseColumnInfo> col_set = new LinkedHashSet<BaseColumnInfo>();
+ for(ColumnInfo ci : inpOp.getSchema().getSignature())
+ col_set.addAll(lCtx.getDependency(inpOp, ci).getBaseCols());
+
+ dep.setBaseCols(new Vector<BaseColumnInfo>(col_set));
+
+ // This dependency is then set for all the colinfos of the script operator
+ for(ColumnInfo ci : op.getSchema().getSignature())
+ lCtx.putDependency(op, ci, dep);
+
+ return null;
+ }
+
+ }
+
+ /**
+ * Processor for TableScan Operator. This actually creates the base column mappings.
+ */
+ public static class TableScanLineage extends DefaultLineage implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+ ParseContext pctx = lCtx.getParseCtx();
+
+ // Table scan operator.
+ TableScanOperator top = (TableScanOperator)nd;
+ org.apache.hadoop.hive.ql.metadata.Table t = pctx.getTopToTable().get(top);
+ Table tab = t.getTTable();
+
+ // Generate the mappings
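+ // The row schema of the table scan and the table's column list are
+ // assumed to be in the same order, so the two are paired up positionally.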
+ RowSchema rs = top.getSchema();
+ List<FieldSchema> cols = t.getAllCols();
+ TableAliasInfo tai = new TableAliasInfo();
+ tai.setAlias(top.getConf().getAlias());
+ tai.setTable(tab);
+ int cnt = 0;
+ for(ColumnInfo ci : rs.getSignature()) {
+ // Create a dependency
+ Dependency dep = new Dependency();
+ BaseColumnInfo bci = new BaseColumnInfo();
+ bci.setTabAlias(tai);
+ bci.setColumn(cols.get(cnt++));
+
+ // Populate the dependency
+ dep.setType(LineageInfo.DependencyType.SIMPLE);
+ // TODO: Find out how to get the expression here.
+ dep.setExpr(null);
+ dep.setBaseCols(new Vector<BaseColumnInfo>());
+ dep.getBaseCols().add(bci);
+
+ // Put the dependency in the map
+ lCtx.putDependency(top, ci, dep);
+ }
+
+ return null;
+ }
+
+ }
+
+ /**
+ * Processor for Join Operator.
+ */
+ public static class JoinLineage extends DefaultLineage implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ // Assert that there is at least one item in the stack. This should never
+ // be called for leaves.
+ assert(!stack.isEmpty());
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+ JoinOperator op = (JoinOperator)nd;
+ JoinDesc jd = op.getConf();
+
+ // The input operator to the join is always a reduce sink operator
+ ReduceSinkOperator inpOp = (ReduceSinkOperator)getParent(stack);
+ ReduceSinkDesc rd = inpOp.getConf();
+ int tag = rd.getTag();
+
+ // Iterate over the outputs of the join operator and merge the
+ // dependencies of the columns corresponding to the tag.
+ int cnt = 0;
+ List<ExprNodeDesc> exprs = jd.getExprs().get((byte) tag);
+ for(ColumnInfo ci : op.getSchema().getSignature()) {
+ if (jd.getReversedExprs().get(ci.getInternalName()) != tag)
+ continue;
+
+ // Otherwise look up the expression corresponding to this ci
+ ExprNodeDesc expr = exprs.get(cnt++);
+ lCtx.mergeDependency(op, ci,
+ ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+ }
+
+ return null;
+ }
+
+ }
+
+ /**
+ * Processor for Join Operator.
+ */
+ public static class LateralViewJoinLineage extends DefaultLineage implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ // Assert that there is at least one item in the stack. This should never
+ // be called for leaves.
+ assert(!stack.isEmpty());
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+ LateralViewJoinOperator op = (LateralViewJoinOperator)nd;
+ boolean isUdtfPath = true;
+ Operator<? extends Serializable> inpOp = getParent(stack);
+ ArrayList<ColumnInfo> cols = inpOp.getSchema().getSignature();
+
+ if (inpOp instanceof SelectOperator)
+ isUdtfPath = false;
+
+ // Dirty hack!!
+ // For the select path the columns are the ones at the end of the
+ // current operator's schema and for the udtf path the columns are
+ // at the beginning of the operator schema.
+ ArrayList<ColumnInfo> out_cols = op.getSchema().getSignature();
+ int out_cols_size = out_cols.size();
+ int cols_size = cols.size();
+ if (isUdtfPath) {
+ int cnt = 0;
+ while (cnt < cols_size) {
+ lCtx.mergeDependency(op, out_cols.get(cnt),
+ lCtx.getDependency(inpOp, cols.get(cnt)));
+ cnt++;
+ }
+ }
+ else {
+ int cnt = cols_size - 1;
+ while (cnt >= 0) {
+ lCtx.mergeDependency(op, out_cols.get(out_cols_size - cols_size + cnt),
+ lCtx.getDependency(inpOp, cols.get(cnt)));
+ cnt--;
+ }
+ }
+ return null;
+ }
+
+ }
+
+ /**
+ * Processor for Select operator.
+ */
+ public static class SelectLineage extends DefaultLineage implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ LineageCtx lctx = (LineageCtx)procCtx;
+ SelectOperator sop = (SelectOperator)nd;
+
+ // if this is a selStarNoCompute then this select operator
+ // is treated like a default operator, so just call the super classes
+ // process method.
+ if (sop.getConf().isSelStarNoCompute())
+ return super.process(nd, stack, procCtx, nodeOutputs);
+
+ // Otherwise we treat this as a normal select operator and look at
+ // the expressions.
+
+ ArrayList<ColumnInfo> col_infos = sop.getSchema().getSignature();
+ int cnt = 0;
+ for(ExprNodeDesc expr : sop.getConf().getColList())
+ lctx.putDependency(sop, col_infos.get(cnt++),
+ ExprProcFactory.getExprDependency(lctx, getParent(stack), expr));
+
+ return null;
+ }
+
+ }
+
+ /**
+ * Processor for GroupBy operator.
+ */
+ public static class GroupByLineage extends DefaultLineage implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ LineageCtx lctx = (LineageCtx)procCtx;
+ GroupByOperator gop = (GroupByOperator)nd;
+ ArrayList<ColumnInfo> col_infos = gop.getSchema().getSignature();
+ Operator<? extends Serializable> inpOp = getParent(stack);
+ int cnt = 0;
+
+ for(ExprNodeDesc expr : gop.getConf().getKeys())
+ lctx.putDependency(gop, col_infos.get(cnt++),
+ ExprProcFactory.getExprDependency(lctx, inpOp, expr));
+
+ for(AggregationDesc agg : gop.getConf().getAggregators()) {
+ // Concatenate the dependencies of all the parameters to
+ // create the new dependency
+ Dependency dep = new Dependency();
+ dep.setType(LineageInfo.DependencyType.UDAF);
+ // TODO: Get the actual string here.
+ dep.setExpr(null);
+ LinkedHashSet<BaseColumnInfo> bci_set = new LinkedHashSet<BaseColumnInfo>();
+ for(ExprNodeDesc expr : agg.getParameters()) {
+ Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr);
+ if (expr_dep != null)
+ bci_set.addAll(expr_dep.getBaseCols());
+ }
+
+ // If the bci_set is empty, this means that the inputs to this
+ // aggregate function were all constants (e.g. count(1)). In this case
+ // the aggregate function is just dependent on all the tables that are in
+ // the dependency list of the input operator.
+ if (bci_set.isEmpty()) {
+ Set<TableAliasInfo> tai_set = new LinkedHashSet<TableAliasInfo>();
+ for(ColumnInfo ci : inpOp.getSchema().getSignature()) {
+ Dependency inp_dep = lctx.getDependency(inpOp, ci);
+ // The dependency can be null as some of the input cis may not have
+ // been set in case of joins.
+ if (inp_dep != null)
+ for(BaseColumnInfo bci : inp_dep.getBaseCols())
+ tai_set.add(bci.getTabAlias());
+ }
+
+ // Create the BaseColumnInfos and set them in the bci_set
+ for(TableAliasInfo tai : tai_set) {
+ BaseColumnInfo bci = new BaseColumnInfo();
+ bci.setTabAlias(tai);
+ // This is set to null to reflect that the dependency is not on any
+ // particular column of the table.
+ bci.setColumn(null);
+ bci_set.add(bci);
+ }
+ }
+
+ dep.setBaseCols(new Vector<BaseColumnInfo>(bci_set));
+ lctx.putDependency(gop, col_infos.get(cnt++), dep);
+ }
+
+ return null;
+ }
+
+ }
+
+ /**
+ * Processor for FileSink operator.
+ */
+ public static class FileSinkLineage extends DefaultLineage implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ // There are at least 2 operators in the stack
+ assert(stack.size() >= 2);
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+ ParseContext pctx = lCtx.getParseCtx();
+
+ // File sink operator.
+ FileSinkOperator fop = (FileSinkOperator)nd;
+ // The row schema of the parent operator
+ Operator<? extends Serializable> inpOp = getParent(stack);
+ ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
+
+ // Get the associated table
+ org.apache.hadoop.hive.ql.metadata.Table tab = pctx.getFopToTable().get(fop);
+ // If this file sink is not for a table or a partition, just return.
+ if (tab == null)
+ return null;
+ Table dest_tab = tab.getTTable();
+
+ // Generate the mappings
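+ // As in the table scan case, the destination table's columns and the
+ // parent operator's schema are assumed to line up positionally.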
+ int cnt = 0;
+ for(FieldSchema fs : tab.getCols()) {
+ // Propagate the dependency
+ lCtx.getInfo().putDependency(dest_tab, fs,
+ lCtx.getDependency(inpOp, inp_cols.get(cnt++)));
+ }
+
+ return null;
+ }
+
+ }
+
+ /**
+ * Union processor.
+ * In this case we call mergeDependency as opposed to putDependency
+ * in order to account for visits from different parents.
+ */
+ public static class UnionLineage extends DefaultLineage implements NodeProcessor {
+
+ protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ // Assert that there is at least one item in the stack. This should never
+ // be called for leaves.
+ assert(!stack.isEmpty());
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+ Operator<? extends Serializable> op = (Operator<? extends Serializable>) nd;
+
+ // Get the row schema of the input operator.
+ // The row schema of the parent operator
+ Operator<? extends Serializable> inpOp = getParent(stack);
+ RowSchema rs = op.getSchema();
+ ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
+ int cnt = 0;
+ for(ColumnInfo ci : rs.getSignature()) {
+ lCtx.mergeDependency(op, ci,
+ lCtx.getDependency(inpOp, inp_cols.get(cnt++)));
+ }
+ return null;
+ }
+ }
+
+ /**
+ * ReduceSink processor.
+ */
+ public static class ReduceSinkLineage implements NodeProcessor {
+
+ protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ // Assert that there is at least one item in the stack. This should never
+ // be called for leaves.
+ assert(!stack.isEmpty());
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+ ReduceSinkOperator rop = (ReduceSinkOperator)nd;
+
+ ArrayList<ColumnInfo> col_infos = rop.getSchema().getSignature();
+ Operator<? extends Serializable> inpOp = getParent(stack);
+ int cnt = 0;
+
+ // The keys are included only in case the reduce sink feeds into
+ // a group by operator through a chain of forward operators
+ Operator<? extends Serializable> op = rop.getChildOperators().get(0);
+ while (op instanceof ForwardOperator)
+ op = op.getChildOperators().get(0);
+
+ if (op instanceof GroupByOperator)
+ for(ExprNodeDesc expr : rop.getConf().getKeyCols())
+ lCtx.putDependency(rop, col_infos.get(cnt++),
+ ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+
+ for(ExprNodeDesc expr : rop.getConf().getValueCols())
+ lCtx.putDependency(rop, col_infos.get(cnt++),
+ ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+
+ return null;
+ }
+ }
+
+ /**
+ * Default processor. This basically passes the input dependencies as such
+ * to the output dependencies.
+ */
+ public static class DefaultLineage implements NodeProcessor {
+
+ protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ // Assert that there is at least one item in the stack. This should never
+ // be called for leaves.
+ assert(!stack.isEmpty());
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+ Operator<? extends Serializable> op = (Operator<? extends Serializable>) nd;
+
+ // Get the row schema of the input operator.
+ // The row schema of the parent operator
+ Operator<? extends Serializable> inpOp = getParent(stack);
+ RowSchema rs = op.getSchema();
+ ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
+ int cnt = 0;
+ for(ColumnInfo ci : rs.getSignature()) {
+ lCtx.putDependency(op, ci,
+ lCtx.getDependency(inpOp, inp_cols.get(cnt++)));
+ }
+ return null;
+ }
+ }
+
+ public static NodeProcessor getJoinProc() {
+ return new JoinLineage();
+ }
+
+ public static NodeProcessor getLateralViewJoinProc() {
+ return new LateralViewJoinLineage();
+ }
+
+ public static NodeProcessor getTSProc() {
+ return new TableScanLineage();
+ }
+
+ public static NodeProcessor getTransformProc() {
+ return new TransformLineage();
+ }
+
+ public static NodeProcessor getFSProc() {
+ return new FileSinkLineage();
+ }
+
+ public static NodeProcessor getSelProc() {
+ return new SelectLineage();
+ }
+
+ public static NodeProcessor getGroupByProc() {
+ return new GroupByLineage();
+ }
+
+ public static NodeProcessor getUnionProc() {
+ return new UnionLineage();
+ }
+
+ public static NodeProcessor getReduceSinkProc() {
+ return new ReduceSinkLineage();
+ }
+
+ public static NodeProcessor getDefaultProc() {
+ return new DefaultLineage();
+ }
+
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoItem.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoItem.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoItem.java (revision 0)
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+
+/**
+ * Implements a lineageinfo item. The item is basically a triple
+ * of the form (table, column, dependency).
+ */
+public class LineageInfoItem {
+
+ /**
+ * The table.
+ */
+ private Table tab;
+
+ /**
+ * The field.
+ */
+ private FieldSchema field;
+
+ /**
+ * The dependency associated with the table and field.
+ */
+ private Dependency dep;
+
+ /**
+ * Constructor.
+ *
+ * @param tab The table.
+ * @param field The field.
+ * @param dep The dependency.
+ */
+ public LineageInfoItem(Table tab, FieldSchema field, Dependency dep) {
+ this.tab = tab;
+ this.field = field;
+ this.dep = dep;
+ }
+
+ /**
+ * Constructor.
+ */
+ public LineageInfoItem() {
+ this.tab = null;
+ this.field = null;
+ this.dep = null;
+ }
+
+ /**
+ * Gets the associated table.
+ *
+ * @return Table
+ */
+ public Table getTable() {
+ return tab;
+ }
+
+ /**
+ * Sets the associated table.
+ *
+ * @param tab Table.
+ */
+ public void setTable(Table tab) {
+ this.tab = tab;
+ }
+
+ /**
+ * Gets the associated field.
+ *
+ * @return FieldSchema
+ */
+ public FieldSchema getField() {
+ return field;
+ }
+
+ /**
+ * Sets the associated field.
+ *
+ * @param field FieldSchema.
+ */
+ public void setField(FieldSchema field) {
+ this.field = field;
+ }
+
+ /**
+ * Gets the associated dependency.
+ *
+ * @return Dependency
+ */
+ public Dependency getDependency() {
+ return dep;
+ }
+
+ /**
+ * Sets the associated dependency.
+ *
+ * @param dep Dependency.
+ */
+ public void setDependency(Dependency dep) {
+ this.dep = dep;
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java (revision 0)
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Vector;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/**
+ * This class contains the lineage information that is passed
+ * to the PreExecution hook.
+ */
+public class LineageInfo {
+
+ /**
+ * Enum to track the type of dependency:
+ * SIMPLE - the column is a direct copy of a base column or a constant,
+ * UDF/UDAF/UDTF - the column is derived through a function of that kind,
+ * SCRIPT - the column is produced by a transform script,
+ * SET - the dependency is a merge of several dependencies (e.g. across
+ * union or join branches).
+ */
+ public static enum DependencyType {
+ SIMPLE, UDF, UDAF, UDTF, SCRIPT, SET
+ }
+
+ /**
+ * Base Column information.
+ */
+ public static class BaseColumnInfo {
+
+ /**
+ * The table and alias info encapsulated in a different class.
+ */
+ private TableAliasInfo tabAlias;
+
+ /**
+ * The metastore column information. The column can be null
+ * and that denotes that the expression is dependent on the row
+ * of the table and not particular column. This can happen in case
+ * of count(1).
+ */
+ private FieldSchema column;
+
+ /**
+ * @return the tabAlias
+ */
+ public TableAliasInfo getTabAlias() {
+ return tabAlias;
+ }
+
+ /**
+ * @param tabAlias the tabAlias to set
+ */
+ public void setTabAlias(TableAliasInfo tabAlias) {
+ this.tabAlias = tabAlias;
+ }
+
+ /**
+ * @return the column
+ */
+ public FieldSchema getColumn() {
+ return column;
+ }
+
+ /**
+ * @param column the column to set
+ */
+ public void setColumn(FieldSchema column) {
+ this.column = column;
+ }
+ }
+
+ public static class TableAliasInfo {
+
+ /**
+ * The alias for the table.
+ */
+ private String alias;
+
+ /**
+ * The metastore table information.
+ */
+ private Table table;
+
+ /**
+ * @return the alias
+ */
+ public String getAlias() {
+ return alias;
+ }
+
+ /**
+ * @param alias the alias to set
+ */
+ public void setAlias(String alias) {
+ this.alias = alias;
+ }
+
+ /**
+ * @return the table
+ */
+ public Table getTable() {
+ return table;
+ }
+
+ /**
+ * @param table the table to set
+ */
+ public void setTable(Table table) {
+ this.table = table;
+ }
+ }
+
+ /**
+ * This class tracks the dependency information for the base column.
+ */
+ public static class Dependency {
+
+ /**
+ * The type of dependency.
+ */
+ private DependencyType type;
+
+ /**
+ * Expression string for the dependency.
+ */
+ private String expr;
+
+ /**
+ * The list of base columns that the particular column depends on.
+ */
+ private Vector<BaseColumnInfo> baseCols;
+
+ /**
+ * @return the type
+ */
+ public DependencyType getType() {
+ return type;
+ }
+
+ /**
+ * @param type the type to set
+ */
+ public void setType(DependencyType type) {
+ this.type = type;
+ }
+
+ /**
+ * @return the expr
+ */
+ public String getExpr() {
+ return expr;
+ }
+
+ /**
+ * @param expr the expr to set
+ */
+ public void setExpr(String expr) {
+ this.expr = expr;
+ }
+
+ /**
+ * @return the baseCols
+ */
+ public Vector<BaseColumnInfo> getBaseCols() {
+ return baseCols;
+ }
+
+ /**
+ * @param baseCols the baseCols to set
+ */
+ public void setBaseCols(Vector<BaseColumnInfo> baseCols) {
+ this.baseCols = baseCols;
+ }
+ }
+
+ /**
+ * The map contains an index from the (table, column) to the
+ * dependency vector for that tuple. This is used to generate the
+ * dependency vectors during the walk of the operator tree.
+ */
+ protected HashMap<Table, HashMap<FieldSchema, Dependency>> index;
+
+ /**
+ * Constructor.
+ */
+ public LineageInfo() {
+ index = new HashMap<Table, HashMap<FieldSchema, Dependency>>();
+ }
+
+ /**
+ * Gets the dependency for a table, column tuple.
+ * @param tab The table name of the column whose dependency is being inspected.
+ * @param col The column whose dependency is being inspected.
+ * @return Dependency for that particular table, column tuple.
+ * null if no dependency is found.
+ */
+ public Dependency getDependency(Table tab, FieldSchema col) {
+ HashMap<FieldSchema, Dependency> colMap = index.get(tab);
+ if (colMap == null)
+ return null;
+
+ return colMap.get(col);
+ }
+
+ /**
+ * Puts the dependency for a table, column tuple.
+ * @param tab The table of the column whose dependency is being inserted.
+ * @param col The column whose dependency is being inserted.
+ * @param dep The dependency.
+ */
+ public void putDependency(Table tab, FieldSchema col, Dependency dep) {
+ HashMap<FieldSchema, Dependency> colMap = index.get(tab);
+ if (colMap == null) {
+ colMap = new HashMap<FieldSchema, Dependency>();
+ index.put(tab, colMap);
+ }
+ colMap.put(col, dep);
+ }
+
+ /**
+ * Gets the iterator on this structure.
+ *
+ * @return LineageInfoIterator
+ */
+ public LineageInfoIterator iterator() {
+ return new LineageInfoIterator(this);
+ }
+}
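
For reference, a minimal usage sketch of the structure above (illustrative only, not part of the patch; the table and column objects are placeholders, and the DependencyType value is omitted since its constants are defined earlier in the patch):

import java.util.Vector;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;

public class LineageInfoExample {
  public static void main(String[] args) {
    // Placeholder metastore objects for src and dest1.
    Table srcTab = new Table();
    srcTab.setDbName("default");
    srcTab.setTableName("src");
    Table destTab = new Table();
    destTab.setDbName("default");
    destTab.setTableName("dest1");

    FieldSchema srcCol = new FieldSchema();
    srcCol.setName("value");
    srcCol.setType("string");
    FieldSchema destCol = new FieldSchema();
    destCol.setName("value");
    destCol.setType("string");

    // dest1.value depends on the single base column src.value.
    LineageInfo.TableAliasInfo alias = new LineageInfo.TableAliasInfo();
    alias.setAlias("src");
    alias.setTable(srcTab);
    LineageInfo.BaseColumnInfo base = new LineageInfo.BaseColumnInfo();
    base.setTabAlias(alias);
    base.setColumn(srcCol);

    Vector<LineageInfo.BaseColumnInfo> baseCols =
        new Vector<LineageInfo.BaseColumnInfo>();
    baseCols.add(base);

    LineageInfo.Dependency dep = new LineageInfo.Dependency();
    dep.setExpr("src.value");
    dep.setBaseCols(baseCols);
    // dep.setType(...) would take one of the DependencyType constants
    // defined earlier in the patch; omitted here.

    // Record the dependency and read it back through the index.
    LineageInfo linfo = new LineageInfo();
    linfo.putDependency(destTab, destCol, dep);
    System.out.println(linfo.getDependency(destTab, destCol).getExpr());
  }
}
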
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecute.java (working copy)
@@ -38,10 +38,13 @@
* The set of input tables and partitions.
* @param outputs
* The set of output tables, partitions, local and hdfs directories.
+ * @param lInfo
+ * The column level lineage information.
* @param ugi
* The user group security information.
*/
- public void run(SessionState sess, Set<ReadEntity> inputs,
- Set<WriteEntity> outputs, UserGroupInformation ugi) throws Exception;
-
+ public void run(SessionState sess, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs, LineageInfo lInfo, UserGroupInformation ugi)
+ throws Exception;
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoIterator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoIterator.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfoIterator.java (revision 0)
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/**
+ * Implements an iterator over the LineageInfo structure. The returned
+ * items are LineageInfoItem objects.
+ */
+public class LineageInfoIterator implements Iterator<LineageInfoItem> {
+
+ /**
+ * The iterator on the table keys.
+ */
+ private Iterator<Table> tabIter;
+
+ /**
+ * The iterator on the field keys.
+ */
+ private Iterator<FieldSchema> fieldIter;
+
+ /**
+ * The info object that is returned.
+ */
+ private LineageInfoItem item;
+
+ /**
+ * The lineage info object that this object is an iterator for.
+ */
+ protected LineageInfo linfo;
+
+ /**
+ * Initializes the internal iterators when we go to the next table.
+ */
+ private void nextTable() {
+ item.setTable(tabIter.next());
+ this.fieldIter = linfo.index.get(item.getTable()).keySet().iterator();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param linfo The lineage info structure that needs to be iterated on.
+ */
+ public LineageInfoIterator(LineageInfo linfo) {
+ this.item = new LineageInfoItem();
+ this.linfo = linfo;
+ this.fieldIter = null;
+ this.tabIter = linfo.index.keySet().iterator();
+ if (this.tabIter.hasNext())
+ nextTable();
+ }
+
+ @Override
+ public LineageInfoItem next() {
+ // Check if there is anything to return from the fieldIter.
+ if (!fieldIter.hasNext())
+ nextTable();
+
+ item.setField(fieldIter.next());
+ item.setDependency(linfo.index.get(item.getTable()).get(item.getField()));
+
+ return item;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return ((fieldIter != null && fieldIter.hasNext()) ||
+ tabIter.hasNext());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove not supported for LineageInfoIterator");
+ }
+
+}
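
A hypothetical hook showing how the new lInfo argument to PreExecute and the iterator above could be consumed (not part of the patch; the class name, package, and logging are illustrative, and the LineageInfoItem accessors getTable, getField, and getDependency are assumed from the class introduced earlier in the patch):

package com.example.hooks;

import java.util.Set;

import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfoItem;
import org.apache.hadoop.hive.ql.hooks.LineageInfoIterator;
import org.apache.hadoop.hive.ql.hooks.PreExecute;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;

public class LineageLoggerHook implements PreExecute {

  @Override
  public void run(SessionState sess, Set<ReadEntity> inputs,
      Set<WriteEntity> outputs, LineageInfo lInfo, UserGroupInformation ugi)
      throws Exception {
    if (lInfo == null) {
      return;
    }
    // Walk every (table, column) tuple that has a recorded dependency.
    // The iterator reuses a single LineageInfoItem instance, so the data
    // is copied out before the next call to next().
    LineageInfoIterator it = lInfo.iterator();
    while (it.hasNext()) {
      LineageInfoItem item = it.next();
      String col = item.getTable().getTableName() + "."
          + item.getField().getName();
      System.err.println(col + " <- " + item.getDependency().getExpr());
    }
  }
}
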
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java (working copy)
@@ -121,4 +121,9 @@
}
+ @Override
+ public String getName() {
+ return "LVJ";
+ }
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (working copy)
@@ -39,6 +39,7 @@
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
@@ -66,6 +67,10 @@
private FetchTask fetchTask;
private HashSet<ReadEntity> inputs;
private HashSet<WriteEntity> outputs;
+ /**
+ * Lineage information for the query.
+ */
+ protected LineageInfo linfo;
private HashMap<String, String> idToTableNameMap;
@@ -87,6 +92,7 @@
// Note that inputs and outputs can be changed when the query gets executed
inputs = sem.getInputs();
outputs = sem.getOutputs();
+ linfo = sem.getLineageInfo();
idToTableNameMap = new HashMap<String, String>(sem.getIdToTableNameMap());
queryId = makeQueryId();
@@ -699,4 +705,21 @@
this.started = started;
}
+ /**
+ * Gets the lineage information.
+ *
+ * @return LineageInfo associated with the query.
+ */
+ public LineageInfo getLineageInfo() {
+ return linfo;
+ }
+
+ /**
+ * Sets the lineage information.
+ *
+ * @param linfo The LineageInfo structure that is set in the optimization phase.
+ */
+ public void setLineageInfo(LineageInfo linfo) {
+ this.linfo = linfo;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/Utils.java (revision 0)
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lib;
+
+import java.util.Stack;
+
+/**
+ * Contains common utility functions to manipulate nodes, walkers etc.
+ */
+public class Utils {
+
+ /**
+ * Gets the nth ancestor (the parent being the 1st ancestor) in the traversal
+ * path. n=0 returns the currently visited node.
+ *
+ * @param st The stack that encodes the traversal path.
+ * @param n The value of n (n=0 is the currently visited node).
+ *
+ * @return Node The Nth ancestor in the path with respect to the current node.
+ */
+ public static Node getNthAncestor(Stack<Node> st, int n) {
+ assert(st.size() - 1 >= n);
+
+ Stack<Node> tmpStack = new Stack<Node>();
+ for(int i=0; i<=n; i++)
+ tmpStack.push(st.pop());
+
+ Node ret_nd = tmpStack.peek();
+
+ for(int i=0; i<=n; i++)
+ st.push(tmpStack.pop());
+
+ assert(tmpStack.isEmpty());
+
+ return ret_nd;
+ }
+}
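
An illustrative sketch of how getNthAncestor reads the traversal stack (not part of the patch; DummyNode is a placeholder that assumes Node's two methods, getChildren and getName):

import java.util.List;
import java.util.Stack;

import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.Utils;

public class AncestorExample {

  // Minimal Node implementation used only for this sketch.
  static class DummyNode implements Node {
    private final String name;
    DummyNode(String name) { this.name = name; }
    public List<? extends Node> getChildren() { return null; }
    public String getName() { return name; }
  }

  public static void main(String[] args) {
    // The traversal path: root was pushed first, leaf is on top and is
    // the currently visited node.
    Stack<Node> path = new Stack<Node>();
    path.push(new DummyNode("root"));
    path.push(new DummyNode("child"));
    path.push(new DummyNode("leaf"));

    System.out.println(Utils.getNthAncestor(path, 0).getName()); // leaf
    System.out.println(Utils.getNthAncestor(path, 1).getName()); // child
    System.out.println(Utils.getNthAncestor(path, 2).getName()); // root
  }
}
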
Index: ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java (working copy)
@@ -30,7 +30,7 @@
* Since the operator tree is a DAG, nodes with multiple parents will be
* visited more than once. This can be made configurable.
*/
-
+
/**
* Constructor
*
Index: ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (working copy)
@@ -38,7 +38,6 @@
protected Stack<Node> opStack;
private final List<Node> toWalk = new ArrayList<Node>();
- private final Set<Node> seenList = new HashSet<Node>();
private final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
private final Dispatcher dispatcher;
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (working copy)
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
@@ -73,7 +74,11 @@
* List of WriteEntities that are passed to the hooks.
*/
protected HashSet<WriteEntity> outputs;
-
+ /**
+ * Lineage information for the query.
+ */
+ protected LineageInfo linfo;
+
protected static final String TEXTFILE_INPUT = TextInputFormat.class
.getName();
protected static final String TEXTFILE_OUTPUT = IgnoreKeyTextOutputFormat.class
@@ -456,4 +461,22 @@
}
}
}
+
+ /**
+ * Gets the lineage information.
+ *
+ * @return LineageInfo associated with the query.
+ */
+ public LineageInfo getLineageInfo() {
+ return linfo;
+ }
+
+ /**
+ * Sets the lineage information.
+ *
+ * @param linfo The LineageInfo structure that is set in the optimization phase.
+ */
+ public void setLineageInfo(LineageInfo linfo) {
+ this.linfo = linfo;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -57,6 +57,7 @@
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -163,6 +164,7 @@
private List<LoadFileDesc> loadFileWork;
private Map<JoinOperator, QBJoinTree> joinContext;
private final HashMap<TableScanOperator, Table> topToTable;
+ private HashMap<FileSinkOperator, Table> fopToTable;
private QB qb;
private ASTNode ast;
private int destTableId;
@@ -194,6 +196,7 @@
opParseCtx = new LinkedHashMap<Operator<? extends Serializable>, OpParseContext>();
joinContext = new HashMap<JoinOperator, QBJoinTree>();
topToTable = new HashMap<TableScanOperator, Table>();
+ fopToTable = new HashMap<FileSinkOperator, Table>();
destTableId = 1;
uCtx = null;
listMapJoinOpsNoReducer = new ArrayList();
@@ -237,13 +240,15 @@
qb = pctx.getQB();
groupOpToInputTables = pctx.getGroupOpToInputTables();
prunedPartitions = pctx.getPrunedPartitions();
+ setLineageInfo(pctx.getLineageInfo());
}
public ParseContext getParseContext() {
return new ParseContext(conf, qb, ast, opToPartPruner, topOps, topSelOps,
- opParseCtx, joinContext, topToTable, loadTableWork, loadFileWork, ctx,
- idToTableNameMap, destTableId, uCtx, listMapJoinOpsNoReducer,
- groupOpToInputTables, prunedPartitions, opToSamplePruner);
+ opParseCtx, joinContext, topToTable, fopToTable, loadTableWork,
+ loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
+ opToSamplePruner);
}
@SuppressWarnings("nls")
@@ -2965,7 +2970,7 @@
QBMetaData qbm = qb.getMetaData();
Integer dest_type = qbm.getDestTypeForAlias(dest);
- Table dest_tab; // destination table if any
+ Table dest_tab = null; // destination table if any
String queryTmpdir; // the intermediate destination directory
Path dest_path; // the final destination directory
TableDesc table_desc = null;
@@ -3160,6 +3165,9 @@
.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId),
fsRS, input), inputRR);
+ if (dest_tab != null)
+ fopToTable.put((FileSinkOperator)output, dest_tab);
+
LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
+ dest_path + " row schema: " + inputRR.toString());
@@ -4927,7 +4935,8 @@
colsEqual, alias, rwsch, qb.getMetaData(), null);
tableOp = OperatorFactory.getAndMakeChild(new FilterDesc(
samplePredicate, true, new sampleDesc(ts.getNumerator(), ts
- .getDenominator(), tabBucketCols, true)), top);
+ .getDenominator(), tabBucketCols, true)),
+ new RowSchema(rwsch.getColumnInfos()), top);
} else {
// need to add filter
// create tableOp to be filterDesc and set as child to 'top'
@@ -4935,7 +4944,8 @@
ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols,
colsEqual, alias, rwsch, qb.getMetaData(), null);
tableOp = OperatorFactory.getAndMakeChild(new FilterDesc(
- samplePredicate, true), top);
+ samplePredicate, true),
+ new RowSchema(rwsch.getColumnInfos()), top);
}
} else {
boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE);
@@ -4966,7 +4976,8 @@
tableOp = OperatorFactory
.getAndMakeChild(new FilterDesc(samplePred, true,
new sampleDesc(tsSample.getNumerator(), tsSample
- .getDenominator(), tab.getBucketCols(), true)), top);
+ .getDenominator(), tab.getBucketCols(), true)),
+ new RowSchema(rwsch.getColumnInfos()), top);
LOG.info("No need for sample filter");
}
// The table is not bucketed, add a dummy filter :: rand()
@@ -4982,7 +4993,8 @@
ExprNodeDesc samplePred = genSamplePredicate(tsSample, null, false,
alias, rwsch, qb.getMetaData(), randFunc);
tableOp = OperatorFactory.getAndMakeChild(new FilterDesc(
- samplePred, true), top);
+ samplePred, true),
+ new RowSchema(rwsch.getColumnInfos()), top);
}
}
}
@@ -5600,8 +5612,8 @@
}
ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
- topOps, topSelOps, opParseCtx, joinContext, topToTable, loadTableWork,
- loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
+ topOps, topSelOps, opParseCtx, joinContext, topToTable, fopToTable,
+ loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner);
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (working copy)
@@ -32,6 +32,8 @@
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -59,6 +61,7 @@
private LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
private Map<JoinOperator, QBJoinTree> joinContext;
private HashMap<TableScanOperator, Table> topToTable;
+ private HashMap<FileSinkOperator, Table> fopToTable;
private List<LoadTableDesc> loadTableWork;
private List<LoadFileDesc> loadFileWork;
private Context ctx;
@@ -71,7 +74,12 @@
// reducer
private Map> groupOpToInputTables;
private Map prunedPartitions;
-
+
+ /**
+ * The lineage information.
+ */
+ private LineageInfo lInfo;
+
// is set to true if the expression only contains partitioning columns and not
// any other column reference.
// This is used to optimize select * from table where ... scenario, when the
@@ -102,6 +110,7 @@
* context needed join processing (map join specifically)
* @param topToTable
* the top tables being processed
+ * @param fopToTable mapping from each file sink operator to the table being inserted into
* @param loadTableWork
* list of destination tables being loaded
* @param loadFileWork
@@ -126,6 +135,7 @@
LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx,
Map<JoinOperator, QBJoinTree> joinContext,
HashMap<TableScanOperator, Table> topToTable,
+ HashMap<FileSinkOperator, Table> fopToTable,
List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
UnionProcContext uCtx, List listMapJoinOpsNoReducer,
@@ -138,6 +148,7 @@
this.opToPartPruner = opToPartPruner;
this.joinContext = joinContext;
this.topToTable = topToTable;
+ this.fopToTable = fopToTable;
this.loadFileWork = loadFileWork;
this.loadTableWork = loadTableWork;
this.opParseCtx = opParseCtx;
@@ -247,6 +258,21 @@
}
/**
+ * @return the fopToTable
+ */
+ public HashMap<FileSinkOperator, Table> getFopToTable() {
+ return fopToTable;
+ }
+
+ /**
+ * @param fopToTable
+ * the fopToTable to set
+ */
+ public void setFopToTable(HashMap<FileSinkOperator, Table> fopToTable) {
+ this.fopToTable = fopToTable;
+ }
+
+ /**
* @return the topOps
*/
public HashMap<String, Operator<? extends Serializable>> getTopOps() {
@@ -439,4 +465,22 @@
Map prunedPartitions) {
this.prunedPartitions = prunedPartitions;
}
+
+ /**
+ * Sets the lineage information.
+ *
+ * @param lInfo The lineage information.
+ */
+ public void setLineageInfo(LineageInfo lInfo) {
+ this.lInfo = lInfo;
+ }
+
+ /**
+ * Gets the associated lineage information.
+ *
+ * @return LineageInfo
+ */
+ public LineageInfo getLineageInfo() {
+ return lInfo;
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 906309)
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy)
@@ -488,6 +488,7 @@
// Get all the pre execution hooks and execute them.
for (PreExecute peh : getPreExecHooks()) {
peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
+ plan.getLineageInfo(),
UnixUserGroupInformation.readFromConf(conf,
UnixUserGroupInformation.UGI_PROPERTY_NAME));
}