commit b3fd23efb65eaf9c711386e1b131a943178832ef
Author: Andrew Sherman
Date:   Tue Nov 14 17:12:53 2017 -0800

    HIVE-18054: Make Lineage work with concurrent queries on a Session

    A Hive Session can contain multiple concurrent SQL Operations. Lineage is
    currently tracked in SessionState and is cleared when a query completes.
    This results in Lineage for other running queries being lost. To fix this,
    move LineageState from SessionState to QueryState. In MoveTask/MoveWork use
    the LineageState from the MoveTask's QueryState rather than trying to use it
    from MoveWork. Add a test which runs multiple JDBC queries in a thread pool
    against the same connection and shows that Vertices are not lost from
    Lineage. As part of this test, add ReadableHook, an ExecuteWithHookContext
    that stores HookContexts in memory and makes them available for reading.
    Make LineageLogger methods static so they can be used elsewhere. Pass
    LineageState through to the sub-driver.

diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index f5ed735c1ec14dfee338e56020fa2629b168389d..31d99052511c6f485d9f3d21be9727bbfaa4a8b9 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -40,6 +40,7 @@
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -47,6 +48,7 @@
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
@@ -64,8 +66,13 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.LineageLogger;
+import org.apache.hadoop.hive.ql.hooks.ReadableHook;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;
 import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.datanucleus.ClassLoaderResolver;
@@ -76,6 +83,8 @@
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 public class TestJdbcWithMiniHS2 {
   private static MiniHS2 miniHS2 = null;
@@ -204,6 +213,9 @@ private static void startMiniHS2(HiveConf conf, boolean httpMode) throws Excepti
   conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
   conf.setBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false);
   conf.setBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER, false);
+  // store post-exec hook calls so we can look at them later
+  conf.setVar(ConfVars.POSTEXECHOOKS, ReadableHook.class.getName() + "," +
+      LineageLogger.class.getName());
   MiniHS2.Builder builder = new MiniHS2.Builder().withConf(conf).cleanupLocalDirOnStartup(false);
   if (httpMode) {
     builder = builder.withHTTPTransport();
   }
@@ -1490,4 +1502,109 @@ public void testFetchSize() throws Exception {
   stmt.close();
   fsConn.close();
 }
+
+  /**
+   * A test that checks that Lineage is correct when multiple concurrent
+   * requests are made on a connection.
+   */
+  @Test
+  public void testConcurrentLineage() throws Exception {
+    // setup to run concurrent operations
+    Statement stmt = conTestDb.createStatement();
+    setSerializeInTasksInConf(stmt);
+    stmt.execute("drop table if exists testConcurrentLineage1");
+    stmt.execute("drop table if exists testConcurrentLineage2");
+    stmt.execute("create table testConcurrentLineage1 (col1 int)");
+    stmt.execute("create table testConcurrentLineage2 (col2 int)");
+
+    // clear the stored HookContexts
+    ReadableHook.clear();
+
+    // run 5 insert tasks concurrently, each task doing 2 SQL inserts
+    int numThreads = 5; // set to 1 for single threading
+    int concurrentCalls = 5;
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+    try {
+      List<Callable<Void>> tasks = new ArrayList<>();
+      for (int i = 0; i < concurrentCalls; i++) {
+        InsertCallable runner = new InsertCallable(conTestDb);
+        tasks.add(runner);
+      }
+      List<Future<Void>> futures = pool.invokeAll(tasks);
+      for (Future<Void> future : futures) {
+        future.get(20, TimeUnit.SECONDS);
+      }
+      // check to see that the vertices are correct
+      checkVertices();
+    } finally {
+      // clean up
+      stmt.execute("drop table testConcurrentLineage1");
+      stmt.execute("drop table testConcurrentLineage2");
+      stmt.close();
+      pool.shutdownNow();
+    }
+  }
+
+  /**
+   * A Callable that does 2 inserts.
+   */
+  private class InsertCallable implements Callable<Void> {
+    private Connection connection;
+
+    InsertCallable(Connection conn) {
+      this.connection = conn;
+    }
+
+    @Override public Void call() throws Exception {
+      doLineageInserts(connection);
+      return null;
+    }
+
+    private void doLineageInserts(Connection connection) throws SQLException {
+      Statement stmt = connection.createStatement();
+      stmt.execute("insert into testConcurrentLineage1 values (1)");
+      stmt.execute("insert into testConcurrentLineage2 values (2)");
+    }
+  }
+  /**
+   * Check that the vertices derived from the HookContexts are correct.
+   */
+  private void checkVertices() {
+    List<Set<LineageLogger.Vertex>> verticesLists = getVerticesFromHooks();
+
+    assertEquals("5 runs of 2 inserts makes 10", 10, verticesLists.size());
+    for (Set<LineageLogger.Vertex> vertices : verticesLists) {
+      assertFalse("Each insert affects a column so there should be some vertices",
+          vertices.isEmpty());
+      assertEquals("Each insert affects one column so there should be one vertex",
+          1, vertices.size());
+      Iterator<LineageLogger.Vertex> iterator = vertices.iterator();
+      assertTrue(iterator.hasNext());
+      LineageLogger.Vertex vertex = iterator.next();
+      assertEquals(0, vertex.getId());
+      assertEquals(LineageLogger.Vertex.Type.COLUMN, vertex.getType());
+      String label = vertex.getLabel();
+      System.out.println("vertex.getLabel() = " + label);
+      assertTrue("did not see one of the 2 expected column names",
+          label.equals("testjdbcminihs2.testconcurrentlineage1.col1") ||
+          label.equals("testjdbcminihs2.testconcurrentlineage2.col2"));
+    }
+  }
+
+  /**
+   * Use the logic in LineageLogger to get vertices from the HookContexts.
+   */
+  private List<Set<LineageLogger.Vertex>> getVerticesFromHooks() {
+    List<Set<LineageLogger.Vertex>> verticesLists = new ArrayList<>();
+    List<HookContext> hookList = ReadableHook.getHookList();
+    for (HookContext hookContext : hookList) {
+      QueryPlan plan = hookContext.getQueryPlan();
+      LineageCtx.Index index = hookContext.getIndex();
+      assertNotNull(index);
+      List<LineageLogger.Edge> edges = LineageLogger.getEdges(plan, index);
+      Set<LineageLogger.Vertex> vertices = LineageLogger.getVertices(edges);
+      verticesLists.add(vertices);
+    }
+    return verticesLists;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index
af9f193dc94e2e05caa88d965a34f4483c9d7069..3480efdc575c09007f5d4e3a35a3b2577636f0cd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1335,9 +1335,6 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa private void releaseResources() { releasePlan(); releaseDriverContext(); - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } } @Override @@ -2413,9 +2410,6 @@ private int closeInProcess(boolean destroyed) { releaseFetchTask(); releaseResStream(); releaseContext(); - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } if(destroyed) { if (!hiveLocks.isEmpty()) { try { @@ -2450,9 +2444,6 @@ public int close() { lDrvState.stateLock.unlock(); LockedDriverState.removeLockedDriverState(); } - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } return 0; } @@ -2515,4 +2506,8 @@ public void resetQueryState() { releaseResources(); this.queryState = getNewQueryState(queryState.getConf()); } + + public QueryState getQueryState() { + return queryState; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryState.java ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index 7d5aa8b179e536e25c41a8946e667f8dd5669e0f..921efcd2ed6282b4134bacf7d60d3f1e79052749 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.session.LineageState; /** * The class to store query level info such as queryId. Multiple queries can run @@ -38,6 +39,10 @@ * type of the command. */ private HiveOperation commandType; + /** + * Per-query Lineage state to track what happens in the query + */ + LineageState lineageState = new LineageState(); /** * transaction manager used in the query. @@ -78,6 +83,14 @@ public void setCommandType(HiveOperation commandType) { public HiveConf getConf() { return queryConf; } + + public LineageState getLineageState() { + return lineageState; + } + + public void setLineageState(LineageState lineageState) { + this.lineageState = lineageState; + } public HiveTxnManager getTxnManager() { return txnManager; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 0a34633fa44f6913c2af3703ae3dfeff5419170a..96ebf0ae48a7f8069f3651a5f01fc0d431f1e936 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -23,12 +23,9 @@ import java.util.concurrent.ExecutionException; -import com.google.common.util.concurrent.FutureCallback; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.io.BufferedWriter; import java.io.DataOutputStream; @@ -4386,7 +4383,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I } } // Don't set inputs and outputs - the locks have already been taken so it's pointless. 
- MoveWork mw = new MoveWork(null, null, null, null, false, SessionState.get().getLineageState()); + MoveWork mw = new MoveWork(null, null, null, null, false); mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null)); ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId); Task mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf); @@ -4817,7 +4814,7 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName()); if (crtTbl.isCTAS()) { DataContainer dc = new DataContainer(createdTable.getTTable()); - SessionState.get().getLineageState().setLineage( + queryState.getLineageState().setLineage( createdTable.getPath(), dc, createdTable.getCols() ); } @@ -5107,7 +5104,7 @@ private int createView(Hive db, CreateViewDesc crtView) throws HiveException { //set lineage info DataContainer dc = new DataContainer(tbl.getTTable()); - SessionState.get().getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols()); + queryState.getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols()); } return 0; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index e2f8c1f8012ad25114e279747e821b291c7f4ca6..59db86894cd89e851b314729c27c868dd508c2ce 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -398,7 +398,7 @@ public int execute(DriverContext driverContext) { dc = handleStaticParts(db, table, tbd, ti); } } - if (work.getLineagState() != null && dc != null) { + if (dc != null) { // If we are doing an update or a delete the number of columns in the table will not // match the number of columns in the file sink. 
For update there will be one too many // (because of the ROW__ID), and in the case of the delete there will be just the @@ -416,7 +416,7 @@ public int execute(DriverContext driverContext) { tableCols = table.getCols(); break; } - work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols); + queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols); } releaseLocks(tbd); } @@ -552,10 +552,9 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, dc = new DataContainer(table.getTTable(), partn.getTPartition()); // Don't set lineage on delete as we don't have all the columns - if (work.getLineagState() != null && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && + if (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - work.getLineagState().setLineage(tbd.getSourcePath(), dc, + queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } LOG.info("Loading partition " + entry.getKey()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 1f0487f4f72ab18bcf876f45ad5758d83a7f001b..d75fcf76401e87bf0ae18a8dfdd50bc3bc9d0f14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -649,4 +649,8 @@ public boolean canExecuteInParallel(){ return true; } + public QueryState getQueryState() { + return queryState; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 262225fc202d4627652acfd77350e44b0284b3da..1a542e38a3e91da2d9361903e671afcd91886cea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -245,8 +245,7 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti SessionState.get().getTxnMgr().getCurrentTxnId() ); loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, - context.sessionStateLineageState); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); return TaskFactory.get(work, context.hiveConf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index bb1f4e50509e57a9d0b9e6793c1fc08baa4d2981..0c0d60e3929b4d2ca534d5448a714d6ffa3eee0b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -232,8 +232,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb) SessionState.get().getTxnMgr().getCurrentTxnId() ); MoveWork moveWork = - new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, - context.sessionStateLineageState); + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); Task loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java 
ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java index 7b617309f6b0d8a7ce0dea80ab1f790c2651b147..fd6cf3a871d16565176772cca63eb8e1197426b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java @@ -88,12 +88,9 @@ public HookContext(QueryPlan queryPlan, QueryState queryState, inputs = queryPlan.getInputs(); outputs = queryPlan.getOutputs(); ugi = Utils.getUGI(); - linfo= null; depMap = null; - if(SessionState.get() != null){ - linfo = SessionState.get().getLineageState().getLineageInfo(); - depMap = SessionState.get().getLineageState().getIndex(); - } + linfo = queryState.getLineageState().getLineageInfo(); + depMap = queryState.getLineageState().getIndex(); this.userName = userName; this.ipAddress = ipAddress; this.hiveInstanceAddress = hiveInstanceAddress; diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java index 2f764f8a29a9d41a7db013a949ffe3a8a9417d32..a725b68312958bc584c393f4259f0b49d9f0b7f6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java @@ -74,7 +74,7 @@ private static final String FORMAT_VERSION = "1.0"; - final static class Edge { + public final static class Edge { public static enum Type { PROJECTION, PREDICATE } @@ -92,7 +92,7 @@ } } - final static class Vertex { + public final static class Vertex { public static enum Type { COLUMN, TABLE } @@ -125,6 +125,18 @@ public boolean equals(Object obj) { Vertex vertex = (Vertex) obj; return label.equals(vertex.label) && type == vertex.type; } + + public Type getType() { + return type; + } + + public String getLabel() { + return label; + } + + public int getId() { + return id; + } } @Override @@ -203,7 +215,7 @@ public void run(HookContext hookContext) { /** * Logger an error to console if available. */ - private void log(String error) { + private static void log(String error) { LogHelper console = SessionState.getConsole(); if (console != null) { console.printError(error); @@ -214,7 +226,7 @@ private void log(String error) { * Based on the final select operator, find out all the target columns. * For each target column, find out its sources based on the dependency index. */ - private List getEdges(QueryPlan plan, Index index) { + public static List getEdges(QueryPlan plan, Index index) { LinkedHashMap> finalSelOps = index.getFinalSelectOps(); Map vertexCache = new LinkedHashMap(); @@ -292,7 +304,7 @@ private void log(String error) { return edges; } - private void addEdge(Map vertexCache, List edges, + private static void addEdge(Map vertexCache, List edges, Set srcCols, Vertex target, String expr, Edge.Type type) { Set targets = new LinkedHashSet(); targets.add(target); @@ -304,7 +316,7 @@ private void addEdge(Map vertexCache, List edges, * If found, add the more targets to this edge's target vertex list. * Otherwise, create a new edge and add to edge list. */ - private void addEdge(Map vertexCache, List edges, + private static void addEdge(Map vertexCache, List edges, Set srcCols, Set targets, String expr, Edge.Type type) { Set sources = createSourceVertices(vertexCache, srcCols); Edge edge = findSimilarEdgeBySources(edges, sources, expr, type); @@ -319,7 +331,7 @@ private void addEdge(Map vertexCache, List edges, * Convert a list of columns to a set of vertices. * Use cached vertices if possible. 
*/ - private Set createSourceVertices( + private static Set createSourceVertices( Map vertexCache, Collection baseCols) { Set sources = new LinkedHashSet(); if (baseCols != null && !baseCols.isEmpty()) { @@ -346,7 +358,7 @@ private void addEdge(Map vertexCache, List edges, /** * Find a vertex from a cache, or create one if not. */ - private Vertex getOrCreateVertex( + private static Vertex getOrCreateVertex( Map vertices, String label, Vertex.Type type) { Vertex vertex = vertices.get(label); if (vertex == null) { @@ -359,7 +371,7 @@ private Vertex getOrCreateVertex( /** * Find an edge that has the same type, expression, and sources. */ - private Edge findSimilarEdgeBySources( + private static Edge findSimilarEdgeBySources( List edges, Set sources, String expr, Edge.Type type) { for (Edge edge: edges) { if (edge.type == type && StringUtils.equals(edge.expr, expr) @@ -373,7 +385,7 @@ private Edge findSimilarEdgeBySources( /** * Generate normalized name for a given target column. */ - private String getTargetFieldName(int fieldIndex, + private static String getTargetFieldName(int fieldIndex, String destTableName, List colNames, List fieldSchemas) { String fieldName = fieldSchemas.get(fieldIndex).getName(); String[] parts = fieldName.split("\\."); @@ -394,7 +406,7 @@ private String getTargetFieldName(int fieldIndex, * Get all the vertices of all edges. Targets at first, * then sources. Assign id to each vertex. */ - private Set getVertices(List edges) { + public static Set getVertices(List edges) { Set vertices = new LinkedHashSet(); for (Edge edge: edges) { vertices.addAll(edge.targets); diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java index 3e74396f385751054468f21d0d061f721fda09f3..3c0778518cd40a6a05c68fc84419967545a2622c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.tools.HiveSchemaHelper; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadableHook.java ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadableHook.java new file mode 100644 index 0000000000000000000000000000000000000000..c680c4fc8b9e88115ca44381d1454fde2d2de067 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadableHook.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.hooks; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * An ExecuteWithHookContext that stores HookContexts in memory and makes them available for reading + */ +public class ReadableHook implements ExecuteWithHookContext { + + private static List hookList = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void run(HookContext hookContext) throws Exception { + hookList.add(hookContext); + } + + /** + * @return the stored HookContexts. + */ + public static List getHookList() { + return hookList; + } + + /** + * Clear the stored HookContexts. + */ + public static void clear() { + hookList.clear(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java index 68709b4d3baf15d78e60e948ccdef3df84f28cec..f0b9c137514f26cd49c369f54b7b2cd068bf7ab6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.plan.PartitionDesc; - +import org.apache.hadoop.hive.ql.session.LineageState; /** * Index handler for indexes that have aggregate functions on indexed columns. @@ -86,11 +86,10 @@ private void createAggregationFunction(List indexTblCols, String pr } @Override - protected Task getIndexBuilderMapRedTask(Set inputs, - Set outputs, - Index index, boolean partitioned, - PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) { + protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, + Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) { List indexField = index.getSd().getCols(); String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); @@ -152,7 +151,7 @@ private void createAggregationFunction(List indexTblCols, String pr builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, (LinkedHashMap) partSpec, indexTableName, dbName); + command, (LinkedHashMap) partSpec, indexTableName, dbName, lineageState); return rootTask; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java index 1e577da82343a1b7361467fb662661f9c6642ec0..5563c54c1cae457a1073ccce59f9212b3cabdd1e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java @@ -29,8 +29,10 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; +import 
org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * HiveIndexHandler defines a pluggable interface for adding new index handlers @@ -99,16 +101,14 @@ void analyzeIndexDefinition( * outputs for hooks, supplemental outputs going * along with the return value * + * @param lineageState * @return list of tasks to be executed in parallel for building the index * * @throws HiveException if plan generation fails */ - List> generateIndexBuildTaskList( - org.apache.hadoop.hive.ql.metadata.Table baseTbl, - org.apache.hadoop.hive.metastore.api.Index index, - List indexTblPartitions, List baseTblPartitions, - org.apache.hadoop.hive.ql.metadata.Table indexTbl, - Set inputs, Set outputs) + List> generateIndexBuildTaskList(Table baseTbl, + Index index, List indexTblPartitions, List baseTblPartitions, Table indexTbl, + Set inputs, Set outputs, LineageState lineageState) throws HiveException; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java index 29886ae7f97f8dae7116f4fc9a2417ab8f9dac0a..bbdac31609ccacea65ca1bfa44ff7d5e7db178f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java @@ -35,9 +35,11 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * Index handler for indexes that use tables to store indexes. @@ -46,12 +48,9 @@ protected Configuration configuration; @Override - public List> generateIndexBuildTaskList( - org.apache.hadoop.hive.ql.metadata.Table baseTbl, - org.apache.hadoop.hive.metastore.api.Index index, - List indexTblPartitions, List baseTblPartitions, - org.apache.hadoop.hive.ql.metadata.Table indexTbl, - Set inputs, Set outputs) throws HiveException { + public List> generateIndexBuildTaskList(Table baseTbl, + Index index, List indexTblPartitions, List baseTblPartitions, Table indexTbl, + Set inputs, Set outputs, LineageState lineageState) throws HiveException { try { TableDesc desc = Utilities.getTableDesc(indexTbl); @@ -66,7 +65,7 @@ Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false, new PartitionDesc(desc, null), indexTbl.getTableName(), new PartitionDesc(Utilities.getTableDesc(baseTbl), null), - baseTbl.getTableName(), indexTbl.getDbName()); + baseTbl.getTableName(), indexTbl.getDbName(), lineageState); indexBuilderTasks.add(indexBuilder); } else { @@ -89,7 +88,8 @@ // for each partition, spawn a map reduce task. 
Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true, new PartitionDesc(indexPart), indexTbl.getTableName(), - new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName()); + new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName(), + lineageState); indexBuilderTasks.add(indexBuilder); } } @@ -100,17 +100,18 @@ } protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, - Index index, boolean partitioned, - PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), - partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName); + partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName, + lineageState); } protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, - List indexField, boolean partitioned, - PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java index 7b067a0d45e33bc3347c43b050af933c296a9227..69f8018b9681df2ecf323785d556530ee3b7cd64 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; +import org.apache.hadoop.hive.ql.session.LineageState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -115,7 +116,7 @@ public void generateIndexQuery(List indexes, ExprNodeDesc predicate, LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString()); HiveConf queryConf = new HiveConf(pctx.getConf(), BitmapIndexHandler.class); HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false); - Driver driver = new Driver(queryConf); + Driver driver = new Driver(queryConf); // FIXME sub-driver needs lineage? 
driver.compile(qlCommand.toString(), false); queryContext.setIndexIntermediateFile(tmpFile); @@ -220,9 +221,9 @@ public void analyzeIndexDefinition(Table baseTable, Index index, @Override protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, - List indexField, boolean partitioned, - PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { HiveConf builderConf = new HiveConf(getConf(), BitmapIndexHandler.class); HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEROWOFFSET, true); @@ -290,7 +291,7 @@ public void analyzeIndexDefinition(Table baseTable, Index index, } Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, partSpec, indexTableName, dbName); + command, partSpec, indexTableName, dbName, lineageState); return rootTask; } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 504b0623142a6fa6cdb45a26b49f146e12ec2d7a..2e57609f4a4782c9976b26924ac8e6d9feeeabc7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; +import org.apache.hadoop.hive.ql.session.LineageState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.JavaUtils; @@ -92,9 +93,9 @@ public void analyzeIndexDefinition(Table baseTable, Index index, @Override protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, - List indexField, boolean partitioned, - PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); @@ -150,7 +151,7 @@ public void analyzeIndexDefinition(Table baseTable, Index index, builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, partSpec, indexTableName, dbName); + command, partSpec, indexTableName, dbName, lineageState); return rootTask; } @@ -189,7 +190,7 @@ public void generateIndexQuery(List indexes, ExprNodeDesc predicate, LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString()); HiveConf queryConf = new HiveConf(pctx.getConf(), CompactIndexHandler.class); HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false); - Driver driver = new Driver(queryConf); + Driver driver = new Driver(queryConf); // FIXME sub-driver needs lineage? 
driver.compile(qlCommand.toString(), false); if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 67739a1db9fc52a67f4f5ea7dba80fe0e95750c8..b6a973e7565d60cf3408b9a2c29c06e40cb9b36e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1370,8 +1370,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, if (srcMmWriteId == null) { // Only create the movework for non-MM table. No action needed for a MM table. dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(inputDirName, finalName, true, null, null, false), false, - SessionState.get().getLineageState()); + new LoadFileDesc(inputDirName, finalName, true, null, null, false), false); } // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the // MM directory, the original MoveTask still commits based on the parent. Note that this path @@ -1711,15 +1710,16 @@ protected static boolean shouldMergeMovePaths(HiveConf conf, Path condInputPath, * * @param condInputPath A path that the ConditionalTask uses as input for its sub-tasks. * @param linkedMoveWork A MoveWork that the ConditionalTask uses to link to its sub-tasks. + * @param lineageState A LineageState used to track what changes. * @return A new MoveWork that has the Conditional input path as source and the linkedMoveWork as target. */ @VisibleForTesting - protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork) { + protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork, + LineageState lineageState) { MoveWork newWork = new MoveWork(linkedMoveWork); LoadFileDesc fileDesc = null; LoadTableDesc tableDesc = null; - LineageState lineageState = SessionState.get().getLineageState(); if (linkedMoveWork.getLoadFileWork() != null) { fileDesc = new LoadFileDesc(linkedMoveWork.getLoadFileWork()); fileDesc.setSourcePath(condInputPath); @@ -1776,7 +1776,8 @@ private static ConditionalTask createCondTask(HiveConf conf, Serializable workForMoveOnlyTask = moveWork; if (shouldMergeMovePaths) { - workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork()); + workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork(), + currTask.getQueryState().getLineageState()); } // There are 3 options for this ConditionalTask: diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java index 338c1856672f09bb7da35d2336ebb5b6f3fdc5a6..604aa379f5b325a785781cb665a8eede55fa0cb8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; +import org.apache.hadoop.hive.ql.session.LineageState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -214,17 +215,13 @@ private static boolean isIndexTableFresh(Hive hive, List indexes, Table s return hive.getIndexes(table.getTTable().getDbName(), table.getTTable().getTableName(), max); } - public static Task createRootTask( - HiveConf builderConf, - Set inputs, - Set outputs, - StringBuilder command, - LinkedHashMap partSpec, - String indexTableName, - String dbName){ + 
public static Task createRootTask(HiveConf builderConf, Set inputs, + Set outputs, StringBuilder command, LinkedHashMap partSpec, + String indexTableName, String dbName, LineageState lineageState){ // Don't try to index optimize the query to build the index HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false); Driver driver = new Driver(builderConf, SessionState.get().getUserName()); + driver.getQueryState().setLineageState(lineageState); // FIXME change Driver contructor?? driver.compile(command.toString(), false); Task rootTask = driver.getPlan().getRootTasks().get(0); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java index e6c07713b24df719315d804f006151106eea9aed..aeb12301113f49ba4b1381a6f954390b40b21bb9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java @@ -85,9 +85,10 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } } - - Index index = SessionState.get() != null ? - SessionState.get().getLineageState().getIndex() : new Index(); + Index index = pctx.getQueryState().getLineageState().getIndex(); + if (index == null) { + index = new Index(); + } long sTime = System.currentTimeMillis(); // Create the lineage context diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 579f2df280fcb7e267bacab81253324698aaf34f..c8d10c1168c785d7609342d2b818de49fec4ef2d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -25,6 +25,7 @@ import org.antlr.runtime.tree.Tree; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.session.LineageState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileSystem; @@ -1320,7 +1321,7 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { ltd.setLbCtx(lbCtx); @SuppressWarnings("unchecked") Task moveTsk = TaskFactory.get(new MoveWork( - null, null, ltd, null, false, SessionState.get().getLineageState()), conf); + null, null, ltd, null, false), conf); truncateTask.addDependentTask(moveTsk); // Recalculate the HDFS stats if auto gather stats is set @@ -1536,9 +1537,10 @@ private void analyzeAlterIndexProps(ASTNode ast) baseTblPartitions = preparePartitions(baseTbl, partSpec, indexTbl, db, indexTblPartitions); } - + LineageState lineageState = queryState.getLineageState(); List> ret = handler.generateIndexBuildTaskList(baseTbl, - index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs()); + index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs(), + lineageState); return ret; } catch (Exception e) { throw new SemanticException(e); @@ -1981,7 +1983,7 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast, partSpec == null ? 
new HashMap<>() : partSpec, null); ltd.setLbCtx(lbCtx); Task moveTsk = TaskFactory.get( - new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); + new MoveWork(null, null, ltd, null, false), conf); mergeTask.addDependentTask(moveTsk); if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index cd75130d7c5f0b402f1b4331c57edc611eb4b2ed..a912debb674567671bd6158db9ce6eee0c003568 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; @@ -393,7 +392,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId); loadTableWork.setTxnId(txnId); loadTableWork.setStmtId(stmtId); - MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()); + MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, + null, false); Task loadTableTask = TaskFactory.get(mv, x.getConf()); copyTask.addDependentTask(loadTableTask); x.getTasks().add(copyTask); @@ -494,7 +494,7 @@ private static boolean isAcid(Long txnId) { loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( - x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), x.getConf()); + x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf()); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); x.getTasks().add(copyTask); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 238fbd60572ee5f7f8f6c4d5b2abce8f66c7e495..8b208cd824bd46e37eb179715ebb650b59d7e881 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -298,7 +298,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { Task childTask = TaskFactory.get( new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true, - isLocal, SessionState.get().getLineageState()), conf + isLocal), conf ); if (rTask != null) { rTask.addDependentTask(childTask); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 498b6741c3f40b92ce3fb218e91e7809a17383f0..80556aec88d4c8f15ad7d00724339276c4931b47 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -313,7 +313,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { ReplLoadWork replLoadWork = new ReplLoadWork(conf, 
loadPath.toString(), dbNameOrPattern, tblNameOrPattern, - SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); + queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf, true)); return; } @@ -344,7 +344,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); + queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf, true)); // // for (FileStatus dir : dirsInLoadPath) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1de3dd7230c5c3708fd7dd6baf9c6913e91cab3c..72b1046cfc73b3b31b704ade14341fd0585609f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7312,8 +7312,8 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, private void handleLineage(LoadTableDesc ltd, Operator output) throws SemanticException { - if (ltd != null && SessionState.get() != null) { - SessionState.get().getLineageState() + if (ltd != null) { + queryState.getLineageState() .mapDirToOp(ltd.getSourcePath(), output); } else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) { @@ -7326,7 +7326,7 @@ private void handleLineage(LoadTableDesc ltd, Operator output) throw new SemanticException(e); } - SessionState.get().getLineageState() + queryState.getLineageState() .mapDirToOp(tlocation, output); } } @@ -11659,7 +11659,7 @@ void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticExce pCtx = t.transform(pCtx); } // we just use view name as location. 
- SessionState.get().getLineageState() + queryState.getLineageState() .mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp); } return; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 7b2937032ab8dd57f8923e0a9e7aab4a92de55ee..e807e3566b43cfa0aa0a233d94f008cc4bdfbfb2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -218,7 +218,7 @@ public void compile(final ParseContext pCtx, final List tsk = TaskFactory - .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), + .get(new MoveWork(null, null, ltd, null, false), conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want @@ -248,7 +248,7 @@ public void compile(final ParseContext pCtx, final List inputs, HashSet outputs, - LineageState lineageState) { + private MoveWork(HashSet inputs, HashSet outputs) { this.inputs = inputs; this.outputs = outputs; - sessionStateLineageState = lineageState; } public MoveWork(HashSet inputs, HashSet outputs, - final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, boolean srcLocal, LineageState lineageState) { - this(inputs, outputs, lineageState); + final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, boolean checkFileFormat, + boolean srcLocal) { + this(inputs, outputs); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("Creating MoveWork " + System.identityHashCode(this) + " with " + loadTableWork + "; " + loadFileWork); @@ -91,9 +80,8 @@ public MoveWork(HashSet inputs, HashSet outputs, } public MoveWork(HashSet inputs, HashSet outputs, - final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, LineageState lineageState) { - this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState); + final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, boolean checkFileFormat) { + this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false); } public MoveWork(final MoveWork o) { @@ -104,7 +92,6 @@ public MoveWork(final MoveWork o) { srcLocal = o.isSrcLocal(); inputs = o.getInputs(); outputs = o.getOutputs(); - sessionStateLineageState = o.sessionStateLineageState; } @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -166,7 +153,4 @@ public void setSrcLocal(boolean srcLocal) { this.srcLocal = srcLocal; } - public LineageState getLineagState() { - return sessionStateLineageState; - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java index 056d6141d6239816699ed5f730cbd14e48d8d9bb..7fc5ddb36944fb855aefb2fa1a7f9ee8ef27cf1e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java @@ -60,7 +60,7 @@ /** * Constructor. 
*/ - LineageState() { + public LineageState() { dirToFop = new HashMap<>(); linfo = new LineageInfo(); index = new Index(); @@ -133,6 +133,10 @@ public Index getIndex() { return index; } + public Map getDirToFop() { // FIXME remove if unused + return dirToFop; + } + /** * Clear all lineage states */ diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index bb6ddc6fa4667ac0e30994d0f9ee8b969542383c..d03f5e3144be62b81426f2856d692de20a735f90 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -232,11 +232,6 @@ */ private Map hdfsEncryptionShims = Maps.newHashMap(); - /** - * Lineage state. - */ - LineageState ls; - private final String userName; /** @@ -294,15 +289,6 @@ private List cleanupItems = new LinkedList(); - /** - * Get the lineage state stored in this session. - * - * @return LineageState - */ - public LineageState getLineageState() { - return ls; - } - public HiveConf getConf() { return sessionConf; } @@ -387,7 +373,6 @@ public SessionState(HiveConf conf, String userName) { LOG.debug("SessionState user: " + userName); } isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT); - ls = new LineageState(); resourceMaps = new ResourceMaps(); // Must be deterministic order map for consistent q-test output across Java versions overriddenConfigurations = new LinkedHashMap(); diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java index 340689255c738ea497bcd269463b8b8bc38cf34c..84e7e19cd2e996d607e71bbee848bb42639bb890 100644 --- ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java @@ -35,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.Before; import org.junit.BeforeClass; @@ -135,9 +136,10 @@ public void testMovePathsThatCanBeMerged() { public void testMergePathWithInvalidMoveWorkThrowsException() { final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000"); final MoveWork mockWork = mock(MoveWork.class); + final LineageState lineageState = new LineageState(); when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc()); - GenMapRedUtils.mergeMovePaths(condInputPath, mockWork); + GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState); } @Test @@ -146,12 +148,13 @@ public void testMergePathValidMoveWorkReturnsNewMoveWork() { final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002"); final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003"); final MoveWork mockWork = mock(MoveWork.class); + final LineageState lineageState = new LineageState(); MoveWork newWork; // test using loadFileWork when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc( condOutputPath, targetMoveWorkPath, false, "", "", false)); - newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork); + newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState); 
assertNotNull(newWork); assertNotEquals(newWork, mockWork); assertEquals(condInputPath, newWork.getLoadFileWork().getSourcePath()); @@ -162,7 +165,7 @@ public void testMergePathValidMoveWorkReturnsNewMoveWork() { reset(mockWork); when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc( condOutputPath, tableDesc, null, null)); - newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork); + newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState); assertNotNull(newWork); assertNotEquals(newWork, mockWork); assertEquals(condInputPath, newWork.getLoadTableWork().getSourcePath());
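For reviewers who want to poke at the new plumbing outside of the test above, here is a minimal sketch (not part of the patch) of a consumer of the per-query lineage: once hive.exec.post.hooks includes ReadableHook, each captured HookContext carries the lineage index of its own query, and the newly public static LineageLogger helpers turn that index into edges and vertices. Class and method names are taken from the diff above; the wrapper class name LineageSmokeCheck is made up for illustration, and registering the hook is assumed to follow the test's startMiniHS2() setup.

import java.util.List;
import java.util.Set;

import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.LineageLogger;
import org.apache.hadoop.hive.ql.hooks.ReadableHook;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;

public class LineageSmokeCheck {

  /**
   * Print the column/table vertices recorded for every query captured since the last
   * ReadableHook.clear(). Relies only on accessors added or made public by this patch.
   */
  public static void printCapturedLineage() {
    for (HookContext hookContext : ReadableHook.getHookList()) {
      QueryPlan plan = hookContext.getQueryPlan();
      // per-query lineage index, no longer pulled from the shared SessionState
      LineageCtx.Index index = hookContext.getIndex();
      List<LineageLogger.Edge> edges = LineageLogger.getEdges(plan, index);
      Set<LineageLogger.Vertex> vertices = LineageLogger.getVertices(edges);
      for (LineageLogger.Vertex vertex : vertices) {
        System.out.println(plan.getQueryId() + " -> " + vertex.getLabel());
      }
    }
  }
}

Because the index now lives on the QueryState rather than the session, dumps taken for concurrently running queries on the same connection no longer clobber one another.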