commit 19c8c2919ae4cefc904a3980ef31a617bec3f54b Author: Andrew Sherman Date: Fri Nov 17 15:26:13 2017 -0800 HIVE-18054: Make Lineage work with concurrent queries on a Session A Hive Session can contain multiple concurrent sql Operations. Lineage is currently tracked in SessionState and is cleared when a query completes. This results in Lineage for other running queries being lost. To fix this, move LineageState from SessionState to QueryState. In MoveTask/MoveWork use the LineageState from the MoveTask's QueryState rather than trying to use it from MoveWork. Add a test which runs multiple jdbc queries in a thread pool against the same connection and show that Vertices are not lost from Lineage. As part of this test, add ReadableHook, an ExecuteWithHookContext that stores HookContexts in memory and makes them available for reading. Make LineageLogger methods static so they can be used elsewhere. Sometimes a running query (originating in a Driver) will instantiate another Driver to run or compile another query. Because these Drivers shared a Session, the child Driver would accumulate Lineage information along with that of the parent Driver. For consistency a LineageState is passed to these child Drivers and stored in the new Driver's QueryState. diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java new file mode 100644 index 0000000000000000000000000000000000000000..2dd283fd329563390eb78ef3573ac6224cf9db76 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.jdbc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext; + +/** + * An ExecuteWithHookContext that stores HookContexts in memory and makes them available for reading + */ +public class ReadableHook implements ExecuteWithHookContext { + + private static List hookList = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void run(HookContext hookContext) throws Exception { + hookList.add(hookContext); + } + + /** + * @return the stored HookContexts. + */ + public static List getHookList() { + return hookList; + } + + /** + * Clear the stored HookContexts. + */ + public static void clear() { + hookList.clear(); + } +} diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 70bd29c5178456c683652cf2377206059b735514..ffeee69f801330710481c4b0b3517fcb4e17a986 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -40,6 +40,7 @@ import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -47,6 +48,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; @@ -64,8 +66,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.LineageLogger; +import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx; import org.apache.hive.common.util.ReflectionUtil; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.service.cli.HiveSQLException; @@ -205,6 +211,9 @@ private static void startMiniHS2(HiveConf conf, boolean httpMode) throws Excepti conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false); conf.setBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER, false); + // store post-exec hooks calls so we can look at them later + conf.setVar(ConfVars.POSTEXECHOOKS, ReadableHook.class.getName() + "," + + LineageLogger.class.getName()); MiniHS2.Builder builder = new MiniHS2.Builder().withConf(conf).cleanupLocalDirOnStartup(false); if (httpMode) { builder = builder.withHTTPTransport(); @@ -1503,4 +1512,109 @@ public void testFetchSize() throws Exception { stmt.close(); fsConn.close(); } + + /** + * A test that checks that Lineage is correct when a multiple concurrent + * requests 
are make on a connection + */ + @Test + public void testConcurrentLineage() throws Exception { + // setup to run concurrent operations + Statement stmt = conTestDb.createStatement(); + setSerializeInTasksInConf(stmt); + stmt.execute("drop table if exists testConcurrentLineage1"); + stmt.execute("drop table if exists testConcurrentLineage2"); + stmt.execute("create table testConcurrentLineage1 (col1 int)"); + stmt.execute("create table testConcurrentLineage2 (col2 int)"); + + // clear vertices list + ReadableHook.clear(); + + // run 5 sql inserts concurrently + int numThreads = 5; // set to 1 for single threading + int concurrentCalls = 5; + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + try { + List tasks = new ArrayList<>(); + for (int i = 0; i < concurrentCalls; i++) { + InsertCallable runner = new InsertCallable(conTestDb); + tasks.add(runner); + } + List> futures = pool.invokeAll(tasks); + for (Future future : futures) { + future.get(20, TimeUnit.SECONDS); + } + // check to see that the vertices are correct + checkVertices(); + } finally { + // clean up + stmt.execute("drop table testConcurrentLineage1"); + stmt.execute("drop table testConcurrentLineage2"); + stmt.close(); + pool.shutdownNow(); + } + } + + /** + * A Callable that does 2 inserts + */ + private class InsertCallable implements Callable { + private Connection connection; + + InsertCallable(Connection conn) { + this.connection = conn; + } + + @Override public Void call() throws Exception { + doLineageInserts(connection); + return null; + } + + private void doLineageInserts(Connection connection) throws SQLException { + Statement stmt = connection.createStatement(); + stmt.execute("insert into testConcurrentLineage1 values (1)"); + stmt.execute("insert into testConcurrentLineage2 values (2)"); + } + } + /** + * check to see that the vertices derived from the HookContexts are correct + */ + private void checkVertices() { + List> verticesLists = getVerticesFromHooks(); + + assertEquals("5 runs of 2 inserts makes 10", 10, verticesLists.size()); + for (Set vertices : verticesLists) { + assertFalse("Each insert affects a column so should be some vertices", + vertices.isEmpty()); + assertEquals("Each insert affects one column so should be one vertex", + 1, vertices.size()); + Iterator iterator = vertices.iterator(); + assertTrue(iterator.hasNext()); + LineageLogger.Vertex vertex = iterator.next(); + assertEquals(0, vertex.getId()); + assertEquals(LineageLogger.Vertex.Type.COLUMN, vertex.getType()); + String label = vertex.getLabel(); + System.out.println("vertex.getLabel() = " + label); + assertTrue("did not see one of the 2 expected column names", + label.equals("testjdbcminihs2.testconcurrentlineage1.col1") || + label.equals("testjdbcminihs2.testconcurrentlineage2.col2")); + } + } + + /** + * Use the logic in LineageLogger to get vertices from Hook Contexts + */ + private List> getVerticesFromHooks() { + List> verticesLists = new ArrayList<>(); + List hookList = ReadableHook.getHookList(); + for (HookContext hookContext : hookList) { + QueryPlan plan = hookContext.getQueryPlan(); + LineageCtx.Index index = hookContext.getIndex(); + assertNotNull(index); + List edges = LineageLogger.getEdges(plan, index); + Set vertices = LineageLogger.getVertices(edges); + verticesLists.add(vertices); + } + return verticesLists; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 
389a1a6c0beed7cafc2f271632d26f07ad4229a3..7e13b210fb3695500322bb23dd1f5a2eb35466c8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.TriggerContext; @@ -373,12 +374,20 @@ public Driver(HiveConf conf) { this(getNewQueryState(conf), null); } + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), null); + } + public Driver(HiveConf conf, HiveTxnManager txnMgr) { this(getNewQueryState(conf), null, null, txnMgr); } - public Driver(HiveConf conf, Context ctx) { - this(getNewQueryState(conf), null, null); + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, Context ctx, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), null, null); this.ctx = ctx; } @@ -386,6 +395,12 @@ public Driver(HiveConf conf, String userName) { this(getNewQueryState(conf), userName, null); } + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, String userName, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), userName, null); + } + public Driver(QueryState queryState, String userName) { this(queryState, userName, new HooksLoader(queryState.getConf()), null, null); } @@ -424,6 +439,20 @@ private static QueryState getNewQueryState(HiveConf conf) { } /** + * Generating the new QueryState object. Making sure, that the new queryId is generated. + * @param conf The HiveConf which should be used + * @param lineageState a LineageState to be set in the new QueryState object + * @return The new QueryState object + */ + private static QueryState getNewQueryState(HiveConf conf, LineageState lineageState) { + return new QueryState.Builder() + .withGenerateNewQueryId(true) + .withHiveConf(conf) + .withLineageState(lineageState) + .build(); + } + + /** * Compile a new query. Any currently-planned query associated with this Driver is discarded. * Do not reset id for inner queries(index, etc). Task ids are used for task identity check. 
* @@ -1335,9 +1364,6 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa private void releaseResources() { releasePlan(); releaseDriverContext(); - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } } @Override @@ -2403,9 +2429,6 @@ private int closeInProcess(boolean destroyed) { releaseFetchTask(); releaseResStream(); releaseContext(); - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } if(destroyed) { if (!hiveLocks.isEmpty()) { try { @@ -2439,9 +2462,6 @@ public int close() { lDrvState.stateLock.unlock(); LockedDriverState.removeLockedDriverState(); } - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } return 0; } @@ -2503,4 +2523,8 @@ public void resetQueryState() { releaseResources(); this.queryState = getNewQueryState(queryState.getConf()); } + + public QueryState getQueryState() { + return queryState; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryState.java ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index 7d5aa8b179e536e25c41a8946e667f8dd5669e0f..945325950914219f77db7436d3ece987dffe39e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.session.LineageState; /** * The class to store query level info such as queryId. Multiple queries can run @@ -40,6 +41,11 @@ private HiveOperation commandType; /** + * Per-query Lineage state to track what happens in the query + */ + private LineageState lineageState = new LineageState(); + + /** * transaction manager used in the query. */ private HiveTxnManager txnManager; @@ -78,6 +84,14 @@ public void setCommandType(HiveOperation commandType) { public HiveConf getConf() { return queryConf; } + + public LineageState getLineageState() { + return lineageState; + } + + public void setLineageState(LineageState lineageState) { + this.lineageState = lineageState; + } public HiveTxnManager getTxnManager() { return txnManager; @@ -95,6 +109,7 @@ public void setTxnManager(HiveTxnManager txnManager) { private boolean runAsync = false; private boolean generateNewQueryId = false; private HiveConf hiveConf = null; + private LineageState lineageState = null; /** * Default constructor - use this builder to create a QueryState object @@ -149,6 +164,16 @@ public Builder withHiveConf(HiveConf hiveConf) { } /** + * add a LineageState that will be set in the built QueryState + * @param lineageState the source lineageState + * @return the builder + */ + public Builder withLineageState(LineageState lineageState) { + this.lineageState = lineageState; + return this; + } + + /** * Creates the QueryState object. 
The default values are: * - runAsync false * - confOverlay null @@ -184,7 +209,11 @@ public QueryState build() { queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, QueryPlan.makeQueryId()); } - return new QueryState(queryConf); + QueryState queryState = new QueryState(queryConf); + if (lineageState != null) { + queryState.setLineageState(lineageState); + } + return queryState; } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 9184844f7e5d2f9a06bb6f04e4ce477940d6471e..1a78ba2fce220dd67bbdf638630a108207db99ab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4469,7 +4469,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I } } // Don't set inputs and outputs - the locks have already been taken so it's pointless. - MoveWork mw = new MoveWork(null, null, null, null, false, SessionState.get().getLineageState()); + MoveWork mw = new MoveWork(null, null, null, null, false); mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null)); ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId); Task mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf); @@ -4900,7 +4900,7 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName()); if (crtTbl.isCTAS()) { DataContainer dc = new DataContainer(createdTable.getTTable()); - SessionState.get().getLineageState().setLineage( + queryState.getLineageState().setLineage( createdTable.getPath(), dc, createdTable.getCols() ); } @@ -5132,7 +5132,7 @@ private int createView(Hive db, CreateViewDesc crtView) throws HiveException { //set lineage info DataContainer dc = new DataContainer(tbl.getTTable()); - SessionState.get().getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols()); + queryState.getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols()); } return 0; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 6d1377370cc8589b336d3fe928f1bdf42dde1e87..6918e41af43e2edb23524f7d54236448c630b675 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -398,7 +398,7 @@ public int execute(DriverContext driverContext) { dc = handleStaticParts(db, table, tbd, ti); } } - if (work.getLineagState() != null && dc != null) { + if (dc != null) { // If we are doing an update or a delete the number of columns in the table will not // match the number of columns in the file sink. 
For update there will be one too many // (because of the ROW__ID), and in the case of the delete there will be just the @@ -416,7 +416,7 @@ public int execute(DriverContext driverContext) { tableCols = table.getCols(); break; } - work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols); + queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols); } releaseLocks(tbd); } @@ -552,10 +552,9 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, dc = new DataContainer(table.getTTable(), partn.getTPartition()); // Don't set lineage on delete as we don't have all the columns - if (work.getLineagState() != null && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && + if (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - work.getLineagState().setLineage(tbd.getSourcePath(), dc, + queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } LOG.info("Loading partition " + entry.getKey()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 1f0487f4f72ab18bcf876f45ad5758d83a7f001b..d75fcf76401e87bf0ae18a8dfdd50bc3bc9d0f14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -649,4 +649,8 @@ public boolean canExecuteInParallel(){ return true; } + public QueryState getQueryState() { + return queryState; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 262225fc202d4627652acfd77350e44b0284b3da..1a542e38a3e91da2d9361903e671afcd91886cea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -245,8 +245,7 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti SessionState.get().getTxnMgr().getCurrentTxnId() ); loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, - context.sessionStateLineageState); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); return TaskFactory.get(work, context.hiveConf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 545b7a8b7e9f1370b767fc777cb10fa59bd81917..f5125a21871d70c5d80ae2740e7efd3ced2c33f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -233,8 +233,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb) SessionState.get().getTxnMgr().getCurrentTxnId() ); MoveWork moveWork = - new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, - context.sessionStateLineageState); + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); Task loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java 
ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java index 7b617309f6b0d8a7ce0dea80ab1f790c2651b147..61aa3c560809858901b66738d3ffbca403943ec5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java @@ -88,12 +88,8 @@ public HookContext(QueryPlan queryPlan, QueryState queryState, inputs = queryPlan.getInputs(); outputs = queryPlan.getOutputs(); ugi = Utils.getUGI(); - linfo= null; - depMap = null; - if(SessionState.get() != null){ - linfo = SessionState.get().getLineageState().getLineageInfo(); - depMap = SessionState.get().getLineageState().getIndex(); - } + linfo = queryState.getLineageState().getLineageInfo(); + depMap = queryState.getLineageState().getIndex(); this.userName = userName; this.ipAddress = ipAddress; this.hiveInstanceAddress = hiveInstanceAddress; diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java index 2f764f8a29a9d41a7db013a949ffe3a8a9417d32..6dc6fca76de7cd272a559dec23d04179b627a1cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.hooks; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -74,7 +75,8 @@ private static final String FORMAT_VERSION = "1.0"; - final static class Edge { + @VisibleForTesting + public final static class Edge { public static enum Type { PROJECTION, PREDICATE } @@ -92,7 +94,8 @@ } } - final static class Vertex { + @VisibleForTesting + public final static class Vertex { public static enum Type { COLUMN, TABLE } @@ -125,6 +128,21 @@ public boolean equals(Object obj) { Vertex vertex = (Vertex) obj; return label.equals(vertex.label) && type == vertex.type; } + + @VisibleForTesting + public Type getType() { + return type; + } + + @VisibleForTesting + public String getLabel() { + return label; + } + + @VisibleForTesting + public int getId() { + return id; + } } @Override @@ -203,7 +221,7 @@ public void run(HookContext hookContext) { /** * Logger an error to console if available. */ - private void log(String error) { + private static void log(String error) { LogHelper console = SessionState.getConsole(); if (console != null) { console.printError(error); @@ -214,7 +232,8 @@ private void log(String error) { * Based on the final select operator, find out all the target columns. * For each target column, find out its sources based on the dependency index. */ - private List getEdges(QueryPlan plan, Index index) { + @VisibleForTesting + public static List getEdges(QueryPlan plan, Index index) { LinkedHashMap> finalSelOps = index.getFinalSelectOps(); Map vertexCache = new LinkedHashMap(); @@ -292,7 +311,7 @@ private void log(String error) { return edges; } - private void addEdge(Map vertexCache, List edges, + private static void addEdge(Map vertexCache, List edges, Set srcCols, Vertex target, String expr, Edge.Type type) { Set targets = new LinkedHashSet(); targets.add(target); @@ -304,7 +323,7 @@ private void addEdge(Map vertexCache, List edges, * If found, add the more targets to this edge's target vertex list. * Otherwise, create a new edge and add to edge list. 
*/ - private void addEdge(Map vertexCache, List edges, + private static void addEdge(Map vertexCache, List edges, Set srcCols, Set targets, String expr, Edge.Type type) { Set sources = createSourceVertices(vertexCache, srcCols); Edge edge = findSimilarEdgeBySources(edges, sources, expr, type); @@ -319,7 +338,7 @@ private void addEdge(Map vertexCache, List edges, * Convert a list of columns to a set of vertices. * Use cached vertices if possible. */ - private Set createSourceVertices( + private static Set createSourceVertices( Map vertexCache, Collection baseCols) { Set sources = new LinkedHashSet(); if (baseCols != null && !baseCols.isEmpty()) { @@ -346,7 +365,7 @@ private void addEdge(Map vertexCache, List edges, /** * Find a vertex from a cache, or create one if not. */ - private Vertex getOrCreateVertex( + private static Vertex getOrCreateVertex( Map vertices, String label, Vertex.Type type) { Vertex vertex = vertices.get(label); if (vertex == null) { @@ -359,7 +378,7 @@ private Vertex getOrCreateVertex( /** * Find an edge that has the same type, expression, and sources. */ - private Edge findSimilarEdgeBySources( + private static Edge findSimilarEdgeBySources( List edges, Set sources, String expr, Edge.Type type) { for (Edge edge: edges) { if (edge.type == type && StringUtils.equals(edge.expr, expr) @@ -373,7 +392,7 @@ private Edge findSimilarEdgeBySources( /** * Generate normalized name for a given target column. */ - private String getTargetFieldName(int fieldIndex, + private static String getTargetFieldName(int fieldIndex, String destTableName, List colNames, List fieldSchemas) { String fieldName = fieldSchemas.get(fieldIndex).getName(); String[] parts = fieldName.split("\\."); @@ -394,7 +413,8 @@ private String getTargetFieldName(int fieldIndex, * Get all the vertices of all edges. Targets at first, * then sources. Assign id to each vertex. */ - private Set getVertices(List edges) { + @VisibleForTesting + public static Set getVertices(List edges) { Set vertices = new LinkedHashSet(); for (Edge edge: edges) { vertices.addAll(edge.targets); diff --git ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java index 68709b4d3baf15d78e60e948ccdef3df84f28cec..bf0672339c092c25f120cd7e91a63bf08b4e7d6b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.plan.PartitionDesc; - +import org.apache.hadoop.hive.ql.session.LineageState; /** * Index handler for indexes that have aggregate functions on indexed columns. 
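The index handler changes that start here are among several places where a running query instantiates a second Driver, so they now receive the parent query's LineageState explicitly instead of reaching into SessionState. A minimal sketch of that wiring, using the Driver constructor and QueryState.Builder.withLineageState added earlier in this patch (the helper method and its arguments are illustrative, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.QueryState;
    import org.apache.hadoop.hive.ql.session.LineageState;

    /** Illustrative only: compile an inner query while sharing the parent's LineageState. */
    static Driver newChildDriver(HiveConf conf, String userName, QueryState parentQueryState) {
      LineageState sharedLineage = parentQueryState.getLineageState();
      // The child query gets its own queryId, but lineage recorded while it compiles and runs
      // accumulates in the parent's LineageState rather than in a session-wide object.
      QueryState childState = new QueryState.Builder()
          .withGenerateNewQueryId(true)
          .withHiveConf(conf)
          .withLineageState(sharedLineage)
          .build();
      return new Driver(childState, userName);
      // Equivalent shortcut added by this patch: new Driver(conf, userName, sharedLineage)
    }
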
@@ -90,7 +90,8 @@ private void createAggregationFunction(List indexTblCols, String pr Set outputs, Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) { List indexField = index.getSd().getCols(); String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); @@ -152,7 +153,7 @@ private void createAggregationFunction(List indexTblCols, String pr builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, (LinkedHashMap) partSpec, indexTableName, dbName); + command, (LinkedHashMap) partSpec, indexTableName, dbName, lineageState); return rootTask; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java index 1e577da82343a1b7361467fb662661f9c6642ec0..b6c02524e243fbed340baf8afc1daacef2cd95e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * HiveIndexHandler defines a pluggable interface for adding new index handlers @@ -99,6 +100,9 @@ void analyzeIndexDefinition( * outputs for hooks, supplemental outputs going * along with the return value * + * @param lineageState + * tracks Lineage for the query + * * @return list of tasks to be executed in parallel for building the index * * @throws HiveException if plan generation fails @@ -108,7 +112,7 @@ void analyzeIndexDefinition( org.apache.hadoop.hive.metastore.api.Index index, List indexTblPartitions, List baseTblPartitions, org.apache.hadoop.hive.ql.metadata.Table indexTbl, - Set inputs, Set outputs) + Set inputs, Set outputs, LineageState lineageState) throws HiveException; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java index 29886ae7f97f8dae7116f4fc9a2417ab8f9dac0a..744ac29b2a4382de9f41d0eda5fe5cf399263810 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * Index handler for indexes that use tables to store indexes. 
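On the execution side, MoveWork no longer carries a LineageState at all; MoveTask (and any other task) reaches it through the QueryState it was compiled under, via the new Task.getQueryState() accessor shown earlier in the patch. A hedged sketch of that pattern, where the helper name and parameters are illustrative:

    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.ql.QueryState;
    import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;

    /** Illustrative only: record load lineage against the task's per-query state. */
    static void recordLoadLineage(QueryState queryState, Path sourcePath,
        DataContainer dc, List<FieldSchema> tableCols) {
      // Before this patch: work.getLineagState().setLineage(...), guarded by a null check.
      // The LineageState now always exists on the QueryState, so the guard goes away.
      queryState.getLineageState().setLineage(sourcePath, dc, tableCols);
    }
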
@@ -51,7 +52,8 @@ org.apache.hadoop.hive.metastore.api.Index index, List indexTblPartitions, List baseTblPartitions, org.apache.hadoop.hive.ql.metadata.Table indexTbl, - Set inputs, Set outputs) throws HiveException { + Set inputs, Set outputs, + LineageState lineageState) throws HiveException { try { TableDesc desc = Utilities.getTableDesc(indexTbl); @@ -66,7 +68,7 @@ Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false, new PartitionDesc(desc, null), indexTbl.getTableName(), new PartitionDesc(Utilities.getTableDesc(baseTbl), null), - baseTbl.getTableName(), indexTbl.getDbName()); + baseTbl.getTableName(), indexTbl.getDbName(), lineageState); indexBuilderTasks.add(indexBuilder); } else { @@ -89,7 +91,8 @@ // for each partition, spawn a map reduce task. Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true, new PartitionDesc(indexPart), indexTbl.getTableName(), - new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName()); + new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName(), + lineageState); indexBuilderTasks.add(indexBuilder); } } @@ -102,15 +105,18 @@ protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), - partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName); + partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName, + lineageState); } protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java index 7b067a0d45e33bc3347c43b050af933c296a9227..911715949ac3e18d32370fee0c66efe686b812e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; @@ -115,7 +116,7 @@ public void generateIndexQuery(List indexes, ExprNodeDesc predicate, LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString()); HiveConf queryConf = new HiveConf(pctx.getConf(), BitmapIndexHandler.class); HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false); - Driver driver = new Driver(queryConf); + Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState()); driver.compile(qlCommand.toString(), false); 
queryContext.setIndexIntermediateFile(tmpFile); @@ -222,7 +223,8 @@ public void analyzeIndexDefinition(Table baseTable, Index index, protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { HiveConf builderConf = new HiveConf(getConf(), BitmapIndexHandler.class); HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEROWOFFSET, true); @@ -290,7 +292,7 @@ public void analyzeIndexDefinition(Table baseTable, Index index, } Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, partSpec, indexTableName, dbName); + command, partSpec, indexTableName, dbName, lineageState); return rootTask; } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 504b0623142a6fa6cdb45a26b49f146e12ec2d7a..73278cda820a3d7d75f6ce3d67b05c4a0d243739 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; @@ -94,7 +95,8 @@ public void analyzeIndexDefinition(Table baseTable, Index index, protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); @@ -150,7 +152,7 @@ public void analyzeIndexDefinition(Table baseTable, Index index, builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, partSpec, indexTableName, dbName); + command, partSpec, indexTableName, dbName, lineageState); return rootTask; } @@ -189,7 +191,7 @@ public void generateIndexQuery(List indexes, ExprNodeDesc predicate, LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString()); HiveConf queryConf = new HiveConf(pctx.getConf(), CompactIndexHandler.class); HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false); - Driver driver = new Driver(queryConf); + Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState()); driver.compile(qlCommand.toString(), false); if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java index 
d7a83f775abca39b219f71aff88173a14ffaee9f..f6c65037047c838d8da7243daae9e22dcfb911e2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java @@ -112,7 +112,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, LOG.info("using CombineHiveInputformat for the merge job"); GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName, ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(), - hconf, currTask); + hconf, currTask, ctx.getLineageState()); } FileSinkDesc fileSinkDesc = fsOp.getConf(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java index 4387c4297fee48d4c03e95d5a2fcb822ab480eeb..d0c6ea0bea784d241403c92064c489d0e611c31e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * Processor Context for creating map reduce task. Walk the tree in a DFS manner @@ -168,6 +169,11 @@ public void addListTopOperators(TableScanOperator topOperator) { */ private Set outputs; + /** + * To track lineage for the query + */ + private LineageState lineageState; + public GenMRProcContext() { } @@ -190,15 +196,16 @@ public GenMRProcContext() { * the set of input tables/partitions generated by the walk * @param outputs * the set of destinations generated by the walk + * @param lineageState + * to track activity in queries */ - public GenMRProcContext( - HiveConf conf, + public GenMRProcContext(HiveConf conf, HashMap, Task> opTaskMap, ParseContext parseCtx, List> mvTask, List> rootTasks, LinkedHashMap, GenMapRedCtx> mapCurrCtx, - Set inputs, Set outputs) { + Set inputs, Set outputs, LineageState lineageState) { this.conf = conf; this.opTaskMap = opTaskMap; this.mvTask = mvTask; @@ -207,6 +214,7 @@ public GenMRProcContext( this.mapCurrCtx = mapCurrCtx; this.inputs = inputs; this.outputs = outputs; + this.lineageState = lineageState; currTask = null; currTopOp = null; currUnionOp = null; @@ -453,4 +461,8 @@ public void setLinkedFileDescTasks( Map> linkedFileDescTasks) { this.linkedFileDescTasks = linkedFileDescTasks; } + + public LineageState getLineageState() { + return lineageState; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index be1d4b8cee8310e48fed8db7daa287f15ce465b1..acc7ec14b923fe576354f6cf8c4f3ba72b2d9893 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1229,6 +1229,7 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, * @param mvTasks * @param conf * @param currTask + * @param lineageState * @throws SemanticException * create a Map-only merge job using CombineHiveInputFormat for all partitions with @@ -1257,10 +1258,10 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, * directories. 
* */ - public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, - Path finalName, DependencyCollectionTask dependencyTask, - List> mvTasks, HiveConf conf, - Task currTask) throws SemanticException { + public static void createMRWorkForMergingFiles(FileSinkOperator fsInput, + Path finalName, DependencyCollectionTask dependencyTask, + List> mvTasks, HiveConf conf, + Task currTask, LineageState lineageState) throws SemanticException { // // 1. create the operator tree @@ -1370,8 +1371,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, if (srcMmWriteId == null) { // Only create the movework for non-MM table. No action needed for a MM table. dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(inputDirName, finalName, true, null, null, false), false, - SessionState.get().getLineageState()); + new LoadFileDesc(inputDirName, finalName, true, null, null, false), false); } // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the // MM directory, the original MoveTask still commits based on the parent. Note that this path @@ -1382,7 +1382,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, Task mvTask = GenMapRedUtils.findMoveTaskForFsopOutput( mvTasks, fsopPath, fsInputDesc.isMmTable()); ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work, - fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask); + fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState); // keep the dynamic partition context in conditional task resolver context ConditionalResolverMergeFilesCtx mrCtx = @@ -1730,15 +1730,16 @@ protected static boolean shouldMergeMovePaths(HiveConf conf, Path condInputPath, * * @param condInputPath A path that the ConditionalTask uses as input for its sub-tasks. * @param linkedMoveWork A MoveWork that the ConditionalTask uses to link to its sub-tasks. + * @param lineageState A LineageState used to track what changes. * @return A new MoveWork that has the Conditional input path as source and the linkedMoveWork as target. 
*/ @VisibleForTesting - protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork) { + protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork, + LineageState lineageState) { MoveWork newWork = new MoveWork(linkedMoveWork); LoadFileDesc fileDesc = null; LoadTableDesc tableDesc = null; - LineageState lineageState = SessionState.get().getLineageState(); if (linkedMoveWork.getLoadFileWork() != null) { fileDesc = new LoadFileDesc(linkedMoveWork.getLoadFileWork()); fileDesc.setSourcePath(condInputPath); @@ -1776,13 +1777,15 @@ protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMove * a MoveTask that may be linked to the conditional sub-tasks * @param dependencyTask * a dependency task that may be linked to the conditional sub-tasks + * @param lineageState + * to track activity * @return The conditional task */ @SuppressWarnings("unchecked") private static ConditionalTask createCondTask(HiveConf conf, Task currTask, MoveWork mvWork, Serializable mergeWork, Path condInputPath, Path condOutputPath, Task moveTaskToLink, - DependencyCollectionTask dependencyTask) { + DependencyCollectionTask dependencyTask, LineageState lineageState) { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("Creating conditional merge task for " + condInputPath); } @@ -1795,7 +1798,8 @@ private static ConditionalTask createCondTask(HiveConf conf, Serializable workForMoveOnlyTask = moveWork; if (shouldMergeMovePaths) { - workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork()); + workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork(), + lineageState); } // There are 3 options for this ConditionalTask: diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java index 338c1856672f09bb7da35d2336ebb5b6f3fdc5a6..f69c9a2503b006048f43c7911ad05a91aaa97754 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.LineageState; /** * Utility class for index support. 
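For consumers, HookContext now exposes lineage gathered on the QueryState, and the LineageLogger helpers made public and static earlier in this patch can turn it into edges and vertices outside the logger itself, exactly as the new TestJdbcWithMiniHS2 test does. A small sketch of such a post-execution hook (the class name and the println are illustrative):

    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.hive.ql.QueryPlan;
    import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
    import org.apache.hadoop.hive.ql.hooks.HookContext;
    import org.apache.hadoop.hive.ql.hooks.LineageLogger;
    import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;

    /** Illustrative post-exec hook that counts lineage vertices for each finished query. */
    public class VertexCountingHook implements ExecuteWithHookContext {
      @Override
      public void run(HookContext hookContext) throws Exception {
        QueryPlan plan = hookContext.getQueryPlan();
        LineageCtx.Index index = hookContext.getIndex(); // populated from the QueryState's LineageState
        List<LineageLogger.Edge> edges = LineageLogger.getEdges(plan, index);
        Set<LineageLogger.Vertex> vertices = LineageLogger.getVertices(edges);
        System.out.println(plan.getQueryId() + " produced " + vertices.size() + " lineage vertices");
      }
    }
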
@@ -221,10 +222,11 @@ private static boolean isIndexTableFresh(Hive hive, List indexes, Table s StringBuilder command, LinkedHashMap partSpec, String indexTableName, - String dbName){ + String dbName, + LineageState lineageState){ // Don't try to index optimize the query to build the index HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false); - Driver driver = new Driver(builderConf, SessionState.get().getUserName()); + Driver driver = new Driver(builderConf, SessionState.get().getUserName(), lineageState); driver.compile(command.toString(), false); Task rootTask = driver.getPlan().getRootTasks().get(0); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java index e6c07713b24df719315d804f006151106eea9aed..aeb12301113f49ba4b1381a6f954390b40b21bb9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java @@ -85,9 +85,10 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } } - - Index index = SessionState.get() != null ? - SessionState.get().getLineageState().getIndex() : new Index(); + Index index = pctx.getQueryState().getLineageState().getIndex(); + if (index == null) { + index = new Index(); + } long sTime = System.currentTimeMillis(); // Create the lineage context diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index e5e1b532797a106ea46b70c368e61f6d3a9ce5d0..1892dcebca4c02edde27ec635ff1c56c607e6f23 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -158,6 +158,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.CreateOrAlterWMPoolDesc; import org.apache.hadoop.hive.ql.plan.CreateOrDropTriggerToPoolMappingDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde.serdeConstants; @@ -1478,7 +1479,7 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { ltd.setLbCtx(lbCtx); @SuppressWarnings("unchecked") Task moveTsk = TaskFactory.get(new MoveWork( - null, null, ltd, null, false, SessionState.get().getLineageState()), conf); + null, null, ltd, null, false), conf); truncateTask.addDependentTask(moveTsk); // Recalculate the HDFS stats if auto gather stats is set @@ -1695,8 +1696,10 @@ private void analyzeAlterIndexProps(ASTNode ast) indexTbl, db, indexTblPartitions); } + LineageState lineageState = queryState.getLineageState(); List> ret = handler.generateIndexBuildTaskList(baseTbl, - index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs()); + index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs(), + lineageState); return ret; } catch (Exception e) { throw new SemanticException(e); @@ -2139,7 +2142,7 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast, partSpec == null ? 
new HashMap<>() : partSpec, null); ltd.setLbCtx(lbCtx); Task moveTsk = TaskFactory.get( - new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); + new MoveWork(null, null, ltd, null, false), conf); mergeTask.addDependentTask(moveTsk); if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { @@ -3531,7 +3534,7 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole } SessionState ss = SessionState.get(); String uName = (ss == null? null: ss.getUserName()); - Driver driver = new Driver(conf, uName); + Driver driver = new Driver(conf, uName, queryState.getLineageState()); int rc = driver.compile(cmd.toString(), false); if (rc != 0) { throw new SemanticException(ErrorMsg.NO_VALID_PARTN.getMsg()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java index 065c7e50986872cd35386feee712f3452597d643..0eacfc0f0151d151c0537e81265ef2c9d114d895 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java @@ -134,7 +134,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { runCtx = new Context(conf); // runCtx and ctx share the configuration, but not isExplainPlan() runCtx.setExplainConfig(config); - Driver driver = new Driver(conf, runCtx); + Driver driver = new Driver(conf, runCtx, queryState.getLineageState()); CommandProcessorResponse ret = driver.run(query); if(ret.getResponseCode() == 0) { // Note that we need to call getResults for simple fetch optimization. diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index 0c160acf46eb1eb07c5f04091099c1024e166638..01d156174138916f6c93d8b8929f82414061f23b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.TezEdgeProperty; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.plan.UnionWork; +import org.apache.hadoop.hive.ql.session.LineageState; /** * GenTezProcContext. GenTezProcContext maintains information @@ -162,10 +163,13 @@ // track of which small tables haven't been processed yet. 
public Map> mapJoinToUnprocessedSmallTableReduceSinks; + // track query activity + public LineageState lineageState; + @SuppressWarnings("unchecked") - public GenTezProcContext(HiveConf conf, ParseContext parseContext, - List> moveTask, List> rootTasks, - Set inputs, Set outputs) { + public GenTezProcContext(HiveConf conf, ParseContext parseContext, List> moveTask, + List> rootTasks, Set inputs, + Set outputs, LineageState lineageState) { this.conf = conf; this.parseContext = parseContext; @@ -175,6 +179,7 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext, this.outputs = outputs; this.currentTask = (TezTask) TaskFactory.get( new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID), conf), conf); + this.lineageState = lineageState; this.leafOperatorToFollowingWork = new LinkedHashMap, BaseWork>(); this.linkOpWithWorkMap = new LinkedHashMap, Map>(); this.linkWorkWithReduceSinkMap = new LinkedHashMap>(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index b6f1139fe1a78283277bf4d0c5224ab1d718c634..86c6c16651ae5117c046dfcc1a08fdad73c74819 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -385,7 +385,7 @@ public static void processFileSink(GenTezProcContext context, FileSinkOperator f + fileSink.getConf().getDirName() + " to " + finalName); GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName, context.dependencyTask, context.moveTask, - hconf, context.currentTask); + hconf, context.currentTask, context.lineageState); } FetchTask fetchTask = parseContext.getFetchTask(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index a1b6cda3e80adca3eba2550c636a77c7ead21c7a..37a17d46b8bafbac5c6565d283a124632efbfba7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -392,7 +392,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Utilities.getTableDesc(table), new TreeMap<>(), replace ? 
        LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId);
    loadTableWork.setStmtId(stmtId);
-    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
+    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork,
+        null, false);
    Task loadTableTask = TaskFactory.get(mv, x.getConf());
    copyTask.addDependentTask(loadTableTask);
    x.getTasks().add(copyTask);
@@ -496,7 +497,7 @@ private static boolean isAcid(Long txnId) {
    loadTableWork.setStmtId(stmtId);
    loadTableWork.setInheritTableSpecs(false);
    Task loadPartTask = TaskFactory.get(new MoveWork(
-        x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), x.getConf());
+        x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
    copyTask.addDependentTask(loadPartTask);
    addPartTask.addDependentTask(loadPartTask);
    x.getTasks().add(copyTask);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
index f31775ed942160da73344c4dca707da7b8c658a6..342e1484610c1a407810608a1e7828f7675d8cb9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 import java.io.Serializable;
 import java.util.LinkedList;
@@ -47,12 +48,14 @@ private Hive hive;
   private List> tasks;
   private Set inputs;
+  private LineageState lineageState;
-
-  public IndexUpdater(List loadTableWork, Set inputs, Configuration conf) {
+  public IndexUpdater(List loadTableWork, Set inputs, Configuration conf,
+      LineageState lineageState) {
     this.loadTableWork = loadTableWork;
     this.inputs = inputs;
     this.conf = new HiveConf(conf, IndexUpdater.class);
+    this.lineageState = lineageState;
     this.tasks = new LinkedList>();
   }
@@ -133,7 +136,7 @@ private void doIndexUpdate(Index index, Map partSpec) {
   }
 
   private void compileRebuild(String query) {
-    Driver driver = new Driver(this.conf);
+    Driver driver = new Driver(this.conf, lineageState);
     driver.compile(query, false);
     tasks.addAll(driver.getPlan().getRootTasks());
     inputs.addAll(driver.getPlan().getInputs());
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index cc956da57567114aa29ee0552566ca62c68f6be7..e600f7a7aaddb2061ecc8f7705b75ca7aceb86fa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -302,7 +302,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
     Task childTask = TaskFactory.get(
         new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true,
-            isLocal, SessionState.get().getLineageState()), conf
+            isLocal), conf
     );
     if (rTask != null) {
       rTask.addDependentTask(childTask);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index d7a56e5846d5754dec5070d8c44443543a3695e4..723359c8155a6aa2ade13dd12659d0a2794586ca 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
 public class MapReduceCompiler extends TaskCompiler {
@@ -274,7 +275,8 @@ protected void optimizeTaskPlan(List> rootTasks,
 
   @Override
   protected void generateTaskTree(List> rootTasks, ParseContext pCtx,
-      List> mvTask, Set inputs, Set outputs) throws SemanticException {
+      List> mvTask, Set inputs, Set outputs,
+      LineageState lineageState) throws SemanticException {
 
     // generate map reduce plans
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
@@ -284,7 +286,7 @@ protected void generateTaskTree(List> rootTasks, Pa
         new LinkedHashMap, Task>(),
         tempParseContext, mvTask, rootTasks,
         new LinkedHashMap, GenMapRedCtx>(),
-        inputs, outputs);
+        inputs, outputs, lineageState);
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 498b6741c3f40b92ce3fb218e91e7809a17383f0..80556aec88d4c8f15ad7d00724339276c4931b47 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -313,7 +313,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern,
-          SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+          queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
       rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
       return;
     }
@@ -344,7 +344,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     }
     ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-        SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+        queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
     rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
 //
 //    for (FileStatus dir : dirsInLoadPath) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b323edeb743add0164732b2a403ac4c4f49456da..213f69f372671eb3fab787b00548f620422ff848 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7313,8 +7313,8 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
 
   private void handleLineage(LoadTableDesc ltd, Operator output)
       throws SemanticException {
-    if (ltd != null && SessionState.get() != null) {
-      SessionState.get().getLineageState()
+    if (ltd != null) {
+      queryState.getLineageState()
           .mapDirToOp(ltd.getSourcePath(), output);
     } else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {
@@ -7327,7 +7327,7 @@ private void handleLineage(LoadTableDesc ltd, Operator output)
         throw new SemanticException(e);
       }
-      SessionState.get().getLineageState()
+      queryState.getLineageState()
           .mapDirToOp(tlocation, output);
     }
   }
@@ -11662,7 +11662,7 @@ void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticExce
         pCtx = t.transform(pCtx);
       }
       // we just use view name as location.
-      SessionState.get().getLineageState()
+      queryState.getLineageState()
           .mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);
     }
     return;
@@ -11707,7 +11707,7 @@ void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticExce
     if (!ctx.getExplainLogical()) {
       TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
       compiler.init(queryState, console, db);
-      compiler.compile(pCtx, rootTasks, inputs, outputs);
+      compiler.compile(pCtx, rootTasks, inputs, outputs, queryState.getLineageState());
       fetchTask = pCtx.getFetchTask();
     }
 
     //find all Acid FileSinkOperatorS
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 7b2937032ab8dd57f8923e0a9e7aab4a92de55ee..aec13ff6c251d43d6d5c3576762cb4fb54c58979 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -106,7 +107,8 @@ public void init(QueryState queryState, LogHelper console, Hive db) {
 
   @SuppressWarnings({"nls", "unchecked"})
   public void compile(final ParseContext pCtx, final List> rootTasks,
-      final HashSet inputs, final HashSet outputs) throws SemanticException {
+      final HashSet inputs, final HashSet outputs,
+      LineageState lineageState) throws SemanticException {
 
     Context ctx = pCtx.getContext();
     GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx();
@@ -218,12 +220,13 @@ public void compile(final ParseContext pCtx, final List tsk = TaskFactory
-          .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()),
+          .get(new MoveWork(null, null, ltd, null, false),
               conf);
       mvTask.add(tsk);
       // Check to see if we are stale'ing any indexes and auto-update them if we want
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
-        IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
+        IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf,
+            queryState.getLineageState());
         try {
           List> indexUpdateTasks = indexUpdater
               .generateUpdateTasks();
@@ -248,12 +251,12 @@ public void compile(final ParseContext pCtx, final List rootTask : rootTasks) {
@@ -602,7 +605,8 @@ protected abstract void optimizeTaskPlan(List> root
    * Called to generate the taks tree from the parse context/operator tree
    */
   protected abstract void generateTaskTree(List> rootTasks, ParseContext pCtx,
-      List> mvTask, Set inputs, Set outputs) throws SemanticException;
+      List> mvTask, Set inputs, Set outputs,
+      LineageState lineageState) throws SemanticException;
 
   /**
    * Create a clone of the parse context
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index be33f380030ea8b416a4549c3947d767bba66356..e1581653f1349bbf7ae0012e7b50c3e16c6ce7c1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
@@ -491,7 +492,8 @@ private void runDynamicPartitionPruning(OptimizeTezProcContext procCtx, Set> rootTasks, ParseContext pCtx,
-      List> mvTask, Set inputs, Set outputs)
+      List> mvTask, Set inputs, Set outputs,
+      LineageState lineageState)
       throws SemanticException {
     PerfLogger perfLogger = SessionState.getPerfLogger();
@@ -501,7 +503,7 @@ protected void generateTaskTree(List> rootTasks, Pa
     GenTezWork genTezWork = new GenTezWork(utils);
 
     GenTezProcContext procCtx = new GenTezProcContext(
-        conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
+        conf, tempParseContext, mvTask, rootTasks, inputs, outputs, queryState.getLineageState());
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 4d2bcfa285dc08811106f3c234346efff22afd99..f999187fd92353e9b3d1d9bbfefa1221dc9ab67d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -148,6 +149,9 @@
   // The set of TableScanOperators for pruning OP trees
   public final Set> clonedPruningTableScanSet;
 
+  // to track query activity
+  public LineageState lineageState;
+
   @SuppressWarnings("unchecked")
   public GenSparkProcContext(HiveConf conf,
@@ -156,7 +160,8 @@ public GenSparkProcContext(HiveConf conf,
       List> rootTasks,
       Set inputs,
       Set outputs,
-      Map topOps) {
+      Map topOps,
+      LineageState lineageState) {
     this.conf = conf;
     this.parseContext = parseContext;
     this.moveTask = moveTask;
@@ -164,6 +169,7 @@ public GenSparkProcContext(HiveConf conf,
     this.inputs = inputs;
     this.outputs = outputs;
     this.topOps = topOps;
+    this.lineageState = lineageState;
     this.currentTask = SparkUtilities.createSparkTask(conf);
     this.rootTasks.add(currentTask);
     this.leafOpToFollowingWorkInfo =
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 604c8aee151a45cf942852a3644b5e79f779f353..5e81bfe04c875f2e7fe5eb46ef4d312cf13cc690 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -385,7 +385,7 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
         LOG.info("using CombineHiveInputformat for the merge job");
         GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
             context.dependencyTask, context.moveTask,
-            hconf, context.currentTask);
+            hconf, context.currentTask, context.lineageState);
       }
 
     FetchTask fetchTask = parseContext.getFetchTask();
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 965044d9253585eeaeef50d7fe4fc4d818042df8..37c6e6ae4fe5c212f27895b88789eba8f988e721 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
@@ -336,7 +337,8 @@ private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws Seman
    */
   @Override
   protected void generateTaskTree(List> rootTasks, ParseContext pCtx,
-      List> mvTask, Set inputs, Set outputs)
+      List> mvTask, Set inputs, Set outputs,
+      LineageState lineageState)
       throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
@@ -345,7 +347,8 @@ protected void generateTaskTree(List> rootTasks, Pa
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
 
     GenSparkProcContext procCtx = new GenSparkProcContext(
-        conf, tempParseContext, mvTask, rootTasks, inputs, outputs, pCtx.getTopOps());
+        conf, tempParseContext, mvTask, rootTasks, inputs, outputs, pCtx.getTopOps(),
+        queryState.getLineageState());
 
     // -------------------------------- First Pass ---------------------------------- //
     // Identify SparkPartitionPruningSinkOperators, and break OP tree if necessary
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 28a33740b30b7be0057ce91de55a0407dd2f2cbf..49fe54015d8c7c15cfd1d2ba1409399c2a7867f7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import org.apache.hadoop.hive.ql.session.LineageState;
 
 /**
  * MoveWork.
@@ -39,13 +38,6 @@ private LoadTableDesc loadTableWork;
   private LoadFileDesc loadFileWork;
   private LoadMultiFilesDesc loadMultiFilesWork;
-  /*
-  these are sessionState objects that are copied over to work to allow for parallel execution.
-  based on the current use case the methods are selectively synchronized, which might need to be
-  taken care when using other methods.
-  */
-  private final LineageState sessionStateLineageState;
-
   private boolean checkFileFormat;
   private boolean srcLocal;
@@ -65,21 +57,18 @@ private boolean isNoop;
 
   public MoveWork() {
-    sessionStateLineageState = null;
   }
 
-  private MoveWork(HashSet inputs, HashSet outputs,
-      LineageState lineageState) {
+  private MoveWork(HashSet inputs, HashSet outputs) {
     this.inputs = inputs;
     this.outputs = outputs;
-    sessionStateLineageState = lineageState;
   }
 
   public MoveWork(HashSet inputs, HashSet outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, boolean srcLocal, LineageState lineageState) {
-    this(inputs, outputs, lineageState);
+      boolean checkFileFormat, boolean srcLocal) {
+    this(inputs, outputs);
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("Creating MoveWork " + System.identityHashCode(this)
           + " with " + loadTableWork + "; " + loadFileWork);
@@ -92,8 +81,8 @@ public MoveWork(HashSet inputs, HashSet outputs,
 
   public MoveWork(HashSet inputs, HashSet outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, LineageState lineageState) {
-    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState);
+      boolean checkFileFormat) {
+    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false);
   }
 
   public MoveWork(final MoveWork o) {
@@ -104,7 +93,6 @@ public MoveWork(final MoveWork o) {
     srcLocal = o.isSrcLocal();
     inputs = o.getInputs();
     outputs = o.getOutputs();
-    sessionStateLineageState = o.sessionStateLineageState;
   }
 
   @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -166,7 +154,4 @@ public void setSrcLocal(boolean srcLocal) {
     this.srcLocal = srcLocal;
   }
 
-  public LineageState getLineagState() {
-    return sessionStateLineageState;
-  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index 056d6141d6239816699ed5f730cbd14e48d8d9bb..82eeb35fb15153bf8d1dea4a39068d528a05f4f3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -60,7 +60,7 @@
   /**
    * Constructor.
    */
-  LineageState() {
+  public LineageState() {
     dirToFop = new HashMap<>();
     linfo = new LineageInfo();
     index = new Index();
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index bb6ddc6fa4667ac0e30994d0f9ee8b969542383c..d03f5e3144be62b81426f2856d692de20a735f90 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -232,11 +232,6 @@
    */
   private Map hdfsEncryptionShims = Maps.newHashMap();
 
-  /**
-   * Lineage state.
-   */
-  LineageState ls;
-
   private final String userName;
 
   /**
@@ -294,15 +289,6 @@ private List cleanupItems = new LinkedList();
 
-  /**
-   * Get the lineage state stored in this session.
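Taken together, the MoveWork, LineageState and SessionState hunks move lineage tracking off the session object. A minimal sketch of the resulting wiring follows; QueryStateSketch and ChildDriverSketch are illustrative stand-ins and not classes from this patch, while the public LineageState constructor and the Driver(conf, lineageState) style of hand-off are taken directly from the diff.

import org.apache.hadoop.hive.ql.session.LineageState;

// Stand-in for QueryState: each query owns its own LineageState.
class QueryStateSketch {
  // Legal only because this patch makes the LineageState constructor public.
  private final LineageState lineageState = new LineageState();

  LineageState getLineageState() {
    return lineageState;
  }
}

// Stand-in for a child Driver (e.g. the one IndexUpdater.compileRebuild() creates):
// it receives the parent's LineageState instead of reading a session-wide one.
class ChildDriverSketch {
  private final LineageState lineageState;

  ChildDriverSketch(LineageState parentLineageState) {
    this.lineageState = parentLineageState;
  }

  LineageState getLineageState() {
    return lineageState;
  }
}

class LineageWiringSketch {
  public static void main(String[] args) {
    QueryStateSketch queryState = new QueryStateSketch();
    // Mirrors new Driver(this.conf, lineageState) in the IndexUpdater hunk above:
    // the child accumulates lineage into the parent query's state, not the session's.
    ChildDriverSketch child = new ChildDriverSketch(queryState.getLineageState());
    System.out.println(child.getLineageState() == queryState.getLineageState()); // prints true
  }
}

The same hand-off explains why the semantic analyzers above now call queryState.getLineageState() where they previously called SessionState.get().getLineageState().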
-   *
-   * @return LineageState
-   */
-  public LineageState getLineageState() {
-    return ls;
-  }
-
   public HiveConf getConf() {
     return sessionConf;
   }
@@ -387,7 +373,6 @@ public SessionState(HiveConf conf, String userName) {
       LOG.debug("SessionState user: " + userName);
     }
     isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
-    ls = new LineageState();
     resourceMaps = new ResourceMaps();
     // Must be deterministic order map for consistent q-test output across Java versions
     overriddenConfigurations = new LinkedHashMap();
diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
index 340689255c738ea497bcd269463b8b8bc38cf34c..3c007a762cffcfdecc88ec8c4a4f4c9d2db88731 100644
--- ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
@@ -35,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -135,9 +136,10 @@ public void testMovePathsThatCanBeMerged() {
   public void testMergePathWithInvalidMoveWorkThrowsException() {
     final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
     final MoveWork mockWork = mock(MoveWork.class);
+    final LineageState lineageState = new LineageState();
 
     when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
-    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
   }
 
   @Test
@@ -146,12 +148,13 @@ public void testMergePathValidMoveWorkReturnsNewMoveWork() {
     final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
     final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
     final MoveWork mockWork = mock(MoveWork.class);
+    final LineageState lineageState = new LineageState();
     MoveWork newWork;
 
     // test using loadFileWork
     when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(
         condOutputPath, targetMoveWorkPath, false, "", "", false));
-    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
     assertNotNull(newWork);
     assertNotEquals(newWork, mockWork);
     assertEquals(condInputPath, newWork.getLoadFileWork().getSourcePath());
@@ -162,7 +165,7 @@ public void testMergePathValidMoveWorkReturnsNewMoveWork() {
     reset(mockWork);
     when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc(
         condOutputPath, tableDesc, null, null));
-    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
     assertNotNull(newWork);
     assertNotEquals(newWork, mockWork);
     assertEquals(condInputPath, newWork.getLoadTableWork().getSourcePath());
@@ -181,7 +184,8 @@ public void testConditionalMoveTaskIsOptimized() throws SemanticException {
     Task moveTask = createMoveTask(finalDirName, tableLocation);
     List> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -221,7 +225,8 @@ public void testConditionalMoveTaskIsNotOptimized() throws SemanticException {
     Task moveTask = createMoveTask(finalDirName, tableLocation);
     List> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -255,7 +260,8 @@ public void testConditionalMoveOnHdfsIsNotOptimized() throws SemanticException {
     Task moveTask = createMoveTask(finalDirName, tableLocation);
     List> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task mergeOnlyTask = conditionalTask.getListTasks().get(1);
diff --git ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
index 2c28c398ca49ba661df460c9f3e6d578c785d3ce..73e536465ea31e3c3dde09dca5287f025442bdfe 100644
--- ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
+++ ql/src/test/org/apache/hadoop/hive/ql/parse/TestGenTezWork.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
 import org.junit.Before;
@@ -82,7 +83,8 @@ public void setUp() throws Exception {
         Collections.EMPTY_LIST,
         new ArrayList>(),
         Collections.EMPTY_SET,
-        Collections.EMPTY_SET);
+        Collections.EMPTY_SET,
+        new LineageState());
 
     proc = new GenTezWork(new GenTezUtils() {
       @Override
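The updated tests above all follow the same pattern: where a test previously leaned on session-wide lineage state, it now constructs a LineageState directly and passes it as the new trailing argument. A hedged sketch of that shape, under the assumption of a hypothetical test-style class (the class name and printout below are illustrative and not part of the patch; only the public LineageState constructor and the parameter it feeds come from the diff):

import org.apache.hadoop.hive.ql.session.LineageState;

// Illustrative only; not a test from this patch.
class LineageStatePerTestSketch {
  public static void main(String[] args) {
    // Legal now that the LineageState constructor is public.
    LineageState lineageState = new LineageState();

    // In the updated tests this instance is the extra parameter handed to
    // GenMapRedUtils.createMRWorkForMergingFiles(...), GenMapRedUtils.mergeMovePaths(...)
    // and the GenTezProcContext / GenSparkProcContext constructors shown above,
    // replacing the lineage state formerly fetched from SessionState.
    System.out.println("per-test LineageState created: " + (lineageState != null));
  }
}

The design point the tests exercise is that lineage bookkeeping is now an explicit dependency of task generation rather than ambient session state, so each concurrent query (or each unit test) can supply its own instance.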