commit b3c4058a9f160f650f9d14c7a9f48e4ed5e15a52
Author: Andrew Sherman
Date:   Fri Dec 8 16:40:08 2017 -0800

    HIVE-18054: Make Lineage work with concurrent queries on a Session

    A Hive Session can contain multiple concurrent SQL Operations. Lineage is
    currently tracked in SessionState and is cleared when a query completes.
    This results in Lineage for other running queries being lost. To fix this,
    move LineageState from SessionState to QueryState. In MoveTask/MoveWork use
    the LineageState from the MoveTask's QueryState rather than trying to use
    it from MoveWork.

    Add a test which runs multiple jdbc queries in a thread pool against the
    same connection and shows that Vertices are not lost from Lineage. As part
    of this test, add ReadableHook, an ExecuteWithHookContext that stores
    HookContexts in memory and makes them available for reading. Make
    LineageLogger methods static so they can be used elsewhere.

    Sometimes a running query (originating in a Driver) will instantiate
    another Driver to run or compile another query. Because these Drivers
    shared a Session, the child Driver would accumulate Lineage information
    along with that of the parent Driver. For consistency, a LineageState is
    passed to these child Drivers and stored in the new Driver's QueryState.

diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java new file mode 100644 index 0000000000000000000000000000000000000000..2dd283fd329563390eb78ef3573ac6224cf9db76 --- /dev/null +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.jdbc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext; + +/** + * An ExecuteWithHookContext that stores HookContexts in memory and makes them available for reading + */ +public class ReadableHook implements ExecuteWithHookContext { + + private static List hookList = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void run(HookContext hookContext) throws Exception { + hookList.add(hookContext); + } + + /** + * @return the stored HookContexts. + */ + public static List getHookList() { + return hookList; + } + + /** + * Clear the stored HookContexts. + */ + public static void clear() { + hookList.clear(); + } +} diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 70bd29c5178456c683652cf2377206059b735514..ffeee69f801330710481c4b0b3517fcb4e17a986 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -40,6 +40,7 @@ import java.sql.Types; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -47,6 +48,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; @@ -64,8 +66,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.LineageLogger; +import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx; import org.apache.hive.common.util.ReflectionUtil; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.service.cli.HiveSQLException; @@ -205,6 +211,9 @@ private static void startMiniHS2(HiveConf conf, boolean httpMode) throws Excepti conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); conf.setBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false); conf.setBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER, false); + // store post-exec hooks calls so we can look at them later + conf.setVar(ConfVars.POSTEXECHOOKS, ReadableHook.class.getName() + "," + + LineageLogger.class.getName()); MiniHS2.Builder builder = new MiniHS2.Builder().withConf(conf).cleanupLocalDirOnStartup(false); if (httpMode) { builder = builder.withHTTPTransport(); @@ -1503,4 +1512,109 @@ public void testFetchSize() throws Exception { stmt.close(); fsConn.close(); } + + /** + * A test that checks that Lineage is correct when a multiple concurrent + * requests 
are make on a connection + */ + @Test + public void testConcurrentLineage() throws Exception { + // setup to run concurrent operations + Statement stmt = conTestDb.createStatement(); + setSerializeInTasksInConf(stmt); + stmt.execute("drop table if exists testConcurrentLineage1"); + stmt.execute("drop table if exists testConcurrentLineage2"); + stmt.execute("create table testConcurrentLineage1 (col1 int)"); + stmt.execute("create table testConcurrentLineage2 (col2 int)"); + + // clear vertices list + ReadableHook.clear(); + + // run 5 sql inserts concurrently + int numThreads = 5; // set to 1 for single threading + int concurrentCalls = 5; + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + try { + List tasks = new ArrayList<>(); + for (int i = 0; i < concurrentCalls; i++) { + InsertCallable runner = new InsertCallable(conTestDb); + tasks.add(runner); + } + List> futures = pool.invokeAll(tasks); + for (Future future : futures) { + future.get(20, TimeUnit.SECONDS); + } + // check to see that the vertices are correct + checkVertices(); + } finally { + // clean up + stmt.execute("drop table testConcurrentLineage1"); + stmt.execute("drop table testConcurrentLineage2"); + stmt.close(); + pool.shutdownNow(); + } + } + + /** + * A Callable that does 2 inserts + */ + private class InsertCallable implements Callable { + private Connection connection; + + InsertCallable(Connection conn) { + this.connection = conn; + } + + @Override public Void call() throws Exception { + doLineageInserts(connection); + return null; + } + + private void doLineageInserts(Connection connection) throws SQLException { + Statement stmt = connection.createStatement(); + stmt.execute("insert into testConcurrentLineage1 values (1)"); + stmt.execute("insert into testConcurrentLineage2 values (2)"); + } + } + /** + * check to see that the vertices derived from the HookContexts are correct + */ + private void checkVertices() { + List> verticesLists = getVerticesFromHooks(); + + assertEquals("5 runs of 2 inserts makes 10", 10, verticesLists.size()); + for (Set vertices : verticesLists) { + assertFalse("Each insert affects a column so should be some vertices", + vertices.isEmpty()); + assertEquals("Each insert affects one column so should be one vertex", + 1, vertices.size()); + Iterator iterator = vertices.iterator(); + assertTrue(iterator.hasNext()); + LineageLogger.Vertex vertex = iterator.next(); + assertEquals(0, vertex.getId()); + assertEquals(LineageLogger.Vertex.Type.COLUMN, vertex.getType()); + String label = vertex.getLabel(); + System.out.println("vertex.getLabel() = " + label); + assertTrue("did not see one of the 2 expected column names", + label.equals("testjdbcminihs2.testconcurrentlineage1.col1") || + label.equals("testjdbcminihs2.testconcurrentlineage2.col2")); + } + } + + /** + * Use the logic in LineageLogger to get vertices from Hook Contexts + */ + private List> getVerticesFromHooks() { + List> verticesLists = new ArrayList<>(); + List hookList = ReadableHook.getHookList(); + for (HookContext hookContext : hookList) { + QueryPlan plan = hookContext.getQueryPlan(); + LineageCtx.Index index = hookContext.getIndex(); + assertNotNull(index); + List edges = LineageLogger.getEdges(plan, index); + Set vertices = LineageLogger.getVertices(edges); + verticesLists.add(vertices); + } + return verticesLists; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 
d3df015288fe1963d2b548e32db53cfc2310af21..b168906b440d14dfebab381d2109d11467437e2b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.WmContext; @@ -374,12 +375,20 @@ public Driver(HiveConf conf) { this(getNewQueryState(conf), null); } + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), null); + } + public Driver(HiveConf conf, HiveTxnManager txnMgr) { this(getNewQueryState(conf), null, null, txnMgr); } - public Driver(HiveConf conf, Context ctx) { - this(getNewQueryState(conf), null, null); + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, Context ctx, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), null, null); this.ctx = ctx; } @@ -387,6 +396,12 @@ public Driver(HiveConf conf, String userName) { this(getNewQueryState(conf), userName, null); } + // Pass lineageState when a driver instantiates another Driver to run + // or compile another query + public Driver(HiveConf conf, String userName, LineageState lineageState) { + this(getNewQueryState(conf, lineageState), userName, null); + } + public Driver(QueryState queryState, String userName) { this(queryState, userName, new HooksLoader(queryState.getConf()), null, null); } @@ -425,6 +440,20 @@ private static QueryState getNewQueryState(HiveConf conf) { } /** + * Generating the new QueryState object. Making sure, that the new queryId is generated. + * @param conf The HiveConf which should be used + * @param lineageState a LineageState to be set in the new QueryState object + * @return The new QueryState object + */ + private static QueryState getNewQueryState(HiveConf conf, LineageState lineageState) { + return new QueryState.Builder() + .withGenerateNewQueryId(true) + .withHiveConf(conf) + .withLineageState(lineageState) + .build(); + } + + /** * Compile a new query. Any currently-planned query associated with this Driver is discarded. * Do not reset id for inner queries(index, etc). Task ids are used for task identity check. 
* @@ -1336,9 +1365,6 @@ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnMa private void releaseResources() { releasePlan(); releaseDriverContext(); - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } } @Override @@ -2404,9 +2430,6 @@ private int closeInProcess(boolean destroyed) { releaseFetchTask(); releaseResStream(); releaseContext(); - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } if(destroyed) { if (!hiveLocks.isEmpty()) { try { @@ -2440,9 +2463,6 @@ public int close() { lDrvState.stateLock.unlock(); LockedDriverState.removeLockedDriverState(); } - if (SessionState.get() != null) { - SessionState.get().getLineageState().clear(); - } return 0; } @@ -2504,4 +2524,8 @@ public void resetQueryState() { releaseResources(); this.queryState = getNewQueryState(queryState.getConf()); } + + public QueryState getQueryState() { + return queryState; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryState.java ql/src/java/org/apache/hadoop/hive/ql/QueryState.java index f3a46dbcaf151706521c735654f377a2f2f76a81..4f0c165b927fa6da68509c44c7d6ee29e09b0419 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryState.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryState.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.session.LineageState; /** * The class to store query level info such as queryId. Multiple queries can run @@ -40,12 +41,17 @@ private HiveOperation commandType; /** + * Per-query Lineage state to track what happens in the query + */ + private LineageState lineageState = new LineageState(); + + /** * transaction manager used in the query. */ private HiveTxnManager txnManager; /** - * Private constructor, use QueryState.Builder instead + * Private constructor, use QueryState.Builder instead. * @param conf The query specific configuration object */ private QueryState(HiveConf conf) { @@ -79,6 +85,14 @@ public HiveConf getConf() { return queryConf; } + public LineageState getLineageState() { + return lineageState; + } + + public void setLineageState(LineageState lineageState) { + this.lineageState = lineageState; + } + public HiveTxnManager getTxnManager() { return txnManager; } @@ -95,9 +109,10 @@ public void setTxnManager(HiveTxnManager txnManager) { private boolean runAsync = false; private boolean generateNewQueryId = false; private HiveConf hiveConf = null; + private LineageState lineageState = null; /** - * Default constructor - use this builder to create a QueryState object + * Default constructor - use this builder to create a QueryState object. */ public Builder() { } @@ -149,6 +164,16 @@ public Builder withHiveConf(HiveConf hiveConf) { } /** + * add a LineageState that will be set in the built QueryState + * @param lineageState the source lineageState + * @return the builder + */ + public Builder withLineageState(LineageState lineageState) { + this.lineageState = lineageState; + return this; + } + + /** * Creates the QueryState object. 
The default values are: * - runAsync false * - confOverlay null @@ -184,7 +209,11 @@ public QueryState build() { queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, QueryPlan.makeQueryId()); } - return new QueryState(queryConf); + QueryState queryState = new QueryState(queryConf); + if (lineageState != null) { + queryState.setLineageState(lineageState); + } + return queryState; } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 55ef8de9a5c7144931d0a6ff13224765ee737fea..05041cd78c1cbf6ba9c6b75be4d2a46065ab13e5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4478,7 +4478,7 @@ private static void ensureDelete(FileSystem fs, Path path, String what) throws I } } // Don't set inputs and outputs - the locks have already been taken so it's pointless. - MoveWork mw = new MoveWork(null, null, null, null, false, SessionState.get().getLineageState()); + MoveWork mw = new MoveWork(null, null, null, null, false); mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null)); ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId); Task mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf); @@ -4909,7 +4909,7 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName()); if (crtTbl.isCTAS()) { DataContainer dc = new DataContainer(createdTable.getTTable()); - SessionState.get().getLineageState().setLineage( + queryState.getLineageState().setLineage( createdTable.getPath(), dc, createdTable.getCols() ); } @@ -5137,7 +5137,7 @@ private int createView(Hive db, CreateViewDesc crtView) throws HiveException { //set lineage info DataContainer dc = new DataContainer(tbl.getTTable()); - SessionState.get().getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols()); + queryState.getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols()); } return 0; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index f5a5e713bb0e081591a53a30caf56f97750c3f8e..8387208dd404bae0482ff8e4fb3d8248998322c5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -398,7 +398,7 @@ public int execute(DriverContext driverContext) { dc = handleStaticParts(db, table, tbd, ti); } } - if (work.getLineagState() != null && dc != null) { + if (dc != null) { // If we are doing an update or a delete the number of columns in the table will not // match the number of columns in the file sink. 
For update there will be one too many // (because of the ROW__ID), and in the case of the delete there will be just the @@ -416,7 +416,7 @@ public int execute(DriverContext driverContext) { tableCols = table.getCols(); break; } - work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols); + queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols); } releaseLocks(tbd); } @@ -552,10 +552,9 @@ private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, dc = new DataContainer(table.getTTable(), partn.getTPartition()); // Don't set lineage on delete as we don't have all the columns - if (work.getLineagState() != null && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && + if (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - work.getLineagState().setLineage(tbd.getSourcePath(), dc, + queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } LOG.info("Loading partition " + entry.getKey()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 1f0487f4f72ab18bcf876f45ad5758d83a7f001b..d75fcf76401e87bf0ae18a8dfdd50bc3bc9d0f14 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -649,4 +649,8 @@ public boolean canExecuteInParallel(){ return true; } + public QueryState getQueryState() { + return queryState; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 262225fc202d4627652acfd77350e44b0284b3da..1a542e38a3e91da2d9361903e671afcd91886cea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -245,8 +245,7 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti SessionState.get().getTxnMgr().getCurrentTxnId() ); loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, - context.sessionStateLineageState); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); return TaskFactory.get(work, context.hiveConf); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 545b7a8b7e9f1370b767fc777cb10fa59bd81917..f5125a21871d70c5d80ae2740e7efd3ced2c33f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -233,8 +233,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb) SessionState.get().getTxnMgr().getCurrentTxnId() ); MoveWork moveWork = - new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, - context.sessionStateLineageState); + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); Task loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java 
ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java index 7b617309f6b0d8a7ce0dea80ab1f790c2651b147..93f1da7c6374014e0094d871b6b380991c276837 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -88,12 +87,8 @@ public HookContext(QueryPlan queryPlan, QueryState queryState, inputs = queryPlan.getInputs(); outputs = queryPlan.getOutputs(); ugi = Utils.getUGI(); - linfo= null; - depMap = null; - if(SessionState.get() != null){ - linfo = SessionState.get().getLineageState().getLineageInfo(); - depMap = SessionState.get().getLineageState().getIndex(); - } + linfo = queryState.getLineageState().getLineageInfo(); + depMap = queryState.getLineageState().getIndex(); this.userName = userName; this.ipAddress = ipAddress; this.hiveInstanceAddress = hiveInstanceAddress; diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java index 2f764f8a29a9d41a7db013a949ffe3a8a9417d32..06eb9c82d83f71dcafe2016a23fb46431791e7f1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.hooks; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -74,7 +75,15 @@ private static final String FORMAT_VERSION = "1.0"; - final static class Edge { + /** + * An edge in lineage. + */ + @VisibleForTesting + public static final class Edge { + + /** + * The types of Edge. + */ public static enum Type { PROJECTION, PREDICATE } @@ -92,7 +101,15 @@ } } - final static class Vertex { + /** + * A vertex in lineage. + */ + @VisibleForTesting + public static final class Vertex { + + /** + * A type in lineage. + */ public static enum Type { COLUMN, TABLE } @@ -125,6 +142,21 @@ public boolean equals(Object obj) { Vertex vertex = (Vertex) obj; return label.equals(vertex.label) && type == vertex.type; } + + @VisibleForTesting + public Type getType() { + return type; + } + + @VisibleForTesting + public String getLabel() { + return label; + } + + @VisibleForTesting + public int getId() { + return id; + } } @Override @@ -203,7 +235,7 @@ public void run(HookContext hookContext) { /** * Logger an error to console if available. */ - private void log(String error) { + private static void log(String error) { LogHelper console = SessionState.getConsole(); if (console != null) { console.printError(error); @@ -214,7 +246,8 @@ private void log(String error) { * Based on the final select operator, find out all the target columns. * For each target column, find out its sources based on the dependency index. 
*/ - private List getEdges(QueryPlan plan, Index index) { + @VisibleForTesting + public static List getEdges(QueryPlan plan, Index index) { LinkedHashMap> finalSelOps = index.getFinalSelectOps(); Map vertexCache = new LinkedHashMap(); @@ -292,7 +325,7 @@ private void log(String error) { return edges; } - private void addEdge(Map vertexCache, List edges, + private static void addEdge(Map vertexCache, List edges, Set srcCols, Vertex target, String expr, Edge.Type type) { Set targets = new LinkedHashSet(); targets.add(target); @@ -304,7 +337,7 @@ private void addEdge(Map vertexCache, List edges, * If found, add the more targets to this edge's target vertex list. * Otherwise, create a new edge and add to edge list. */ - private void addEdge(Map vertexCache, List edges, + private static void addEdge(Map vertexCache, List edges, Set srcCols, Set targets, String expr, Edge.Type type) { Set sources = createSourceVertices(vertexCache, srcCols); Edge edge = findSimilarEdgeBySources(edges, sources, expr, type); @@ -319,7 +352,7 @@ private void addEdge(Map vertexCache, List edges, * Convert a list of columns to a set of vertices. * Use cached vertices if possible. */ - private Set createSourceVertices( + private static Set createSourceVertices( Map vertexCache, Collection baseCols) { Set sources = new LinkedHashSet(); if (baseCols != null && !baseCols.isEmpty()) { @@ -346,7 +379,7 @@ private void addEdge(Map vertexCache, List edges, /** * Find a vertex from a cache, or create one if not. */ - private Vertex getOrCreateVertex( + private static Vertex getOrCreateVertex( Map vertices, String label, Vertex.Type type) { Vertex vertex = vertices.get(label); if (vertex == null) { @@ -359,7 +392,7 @@ private Vertex getOrCreateVertex( /** * Find an edge that has the same type, expression, and sources. */ - private Edge findSimilarEdgeBySources( + private static Edge findSimilarEdgeBySources( List edges, Set sources, String expr, Edge.Type type) { for (Edge edge: edges) { if (edge.type == type && StringUtils.equals(edge.expr, expr) @@ -373,7 +406,7 @@ private Edge findSimilarEdgeBySources( /** * Generate normalized name for a given target column. */ - private String getTargetFieldName(int fieldIndex, + private static String getTargetFieldName(int fieldIndex, String destTableName, List colNames, List fieldSchemas) { String fieldName = fieldSchemas.get(fieldIndex).getName(); String[] parts = fieldName.split("\\."); @@ -394,7 +427,8 @@ private String getTargetFieldName(int fieldIndex, * Get all the vertices of all edges. Targets at first, * then sources. Assign id to each vertex. */ - private Set getVertices(List edges) { + @VisibleForTesting + public static Set getVertices(List edges) { Set vertices = new LinkedHashSet(); for (Edge edge: edges) { vertices.addAll(edge.targets); diff --git ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java index 68709b4d3baf15d78e60e948ccdef3df84f28cec..bf0672339c092c25f120cd7e91a63bf08b4e7d6b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.plan.PartitionDesc; - +import org.apache.hadoop.hive.ql.session.LineageState; /** * Index handler for indexes that have aggregate functions on indexed columns. 
@@ -90,7 +90,8 @@ private void createAggregationFunction(List indexTblCols, String pr Set outputs, Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) { List indexField = index.getSd().getCols(); String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); @@ -152,7 +153,7 @@ private void createAggregationFunction(List indexTblCols, String pr builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, (LinkedHashMap) partSpec, indexTableName, dbName); + command, (LinkedHashMap) partSpec, indexTableName, dbName, lineageState); return rootTask; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java index 1e577da82343a1b7361467fb662661f9c6642ec0..b6c02524e243fbed340baf8afc1daacef2cd95e8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * HiveIndexHandler defines a pluggable interface for adding new index handlers @@ -99,6 +100,9 @@ void analyzeIndexDefinition( * outputs for hooks, supplemental outputs going * along with the return value * + * @param lineageState + * tracks Lineage for the query + * * @return list of tasks to be executed in parallel for building the index * * @throws HiveException if plan generation fails @@ -108,7 +112,7 @@ void analyzeIndexDefinition( org.apache.hadoop.hive.metastore.api.Index index, List indexTblPartitions, List baseTblPartitions, org.apache.hadoop.hive.ql.metadata.Table indexTbl, - Set inputs, Set outputs) + Set inputs, Set outputs, LineageState lineageState) throws HiveException; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java index 29886ae7f97f8dae7116f4fc9a2417ab8f9dac0a..744ac29b2a4382de9f41d0eda5fe5cf399263810 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.LineageState; /** * Index handler for indexes that use tables to store indexes. 
@@ -51,7 +52,8 @@ org.apache.hadoop.hive.metastore.api.Index index, List indexTblPartitions, List baseTblPartitions, org.apache.hadoop.hive.ql.metadata.Table indexTbl, - Set inputs, Set outputs) throws HiveException { + Set inputs, Set outputs, + LineageState lineageState) throws HiveException { try { TableDesc desc = Utilities.getTableDesc(indexTbl); @@ -66,7 +68,7 @@ Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false, new PartitionDesc(desc, null), indexTbl.getTableName(), new PartitionDesc(Utilities.getTableDesc(baseTbl), null), - baseTbl.getTableName(), indexTbl.getDbName()); + baseTbl.getTableName(), indexTbl.getDbName(), lineageState); indexBuilderTasks.add(indexBuilder); } else { @@ -89,7 +91,8 @@ // for each partition, spawn a map reduce task. Task indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true, new PartitionDesc(indexPart), indexTbl.getTableName(), - new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName()); + new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName(), + lineageState); indexBuilderTasks.add(indexBuilder); } } @@ -102,15 +105,18 @@ protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, Index index, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), - partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName); + partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName, + lineageState); } protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java index 7b067a0d45e33bc3347c43b050af933c296a9227..911715949ac3e18d32370fee0c66efe686b812e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; @@ -115,7 +116,7 @@ public void generateIndexQuery(List indexes, ExprNodeDesc predicate, LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString()); HiveConf queryConf = new HiveConf(pctx.getConf(), BitmapIndexHandler.class); HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false); - Driver driver = new Driver(queryConf); + Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState()); driver.compile(qlCommand.toString(), false); 
queryContext.setIndexIntermediateFile(tmpFile); @@ -222,7 +223,8 @@ public void analyzeIndexDefinition(Table baseTable, Index index, protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { HiveConf builderConf = new HiveConf(getConf(), BitmapIndexHandler.class); HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEROWOFFSET, true); @@ -290,7 +292,7 @@ public void analyzeIndexDefinition(Table baseTable, Index index, } Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, partSpec, indexTableName, dbName); + command, partSpec, indexTableName, dbName, lineageState); return rootTask; } diff --git ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java index 504b0623142a6fa6cdb45a26b49f146e12ec2d7a..73278cda820a3d7d75f6ce3d67b05c4a0d243739 100644 --- ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; @@ -94,7 +95,8 @@ public void analyzeIndexDefinition(Table baseTable, Index index, protected Task getIndexBuilderMapRedTask(Set inputs, Set outputs, List indexField, boolean partitioned, PartitionDesc indexTblPartDesc, String indexTableName, - PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException { + PartitionDesc baseTablePartDesc, String baseTableName, String dbName, + LineageState lineageState) throws HiveException { String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField); @@ -150,7 +152,7 @@ public void analyzeIndexDefinition(Table baseTable, Index index, builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false); builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false); Task rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs, - command, partSpec, indexTableName, dbName); + command, partSpec, indexTableName, dbName, lineageState); return rootTask; } @@ -189,7 +191,7 @@ public void generateIndexQuery(List indexes, ExprNodeDesc predicate, LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString()); HiveConf queryConf = new HiveConf(pctx.getConf(), CompactIndexHandler.class); HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false); - Driver driver = new Driver(queryConf); + Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState()); driver.compile(qlCommand.toString(), false); if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java index 
d7a83f775abca39b219f71aff88173a14ffaee9f..bb42dde57df7d36a0acbe3bafe9424e712f9404f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java @@ -112,7 +112,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, LOG.info("using CombineHiveInputformat for the merge job"); GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName, ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(), - hconf, currTask); + hconf, currTask, parseCtx.getQueryState().getLineageState()); } FileSinkDesc fileSinkDesc = fsOp.getConf(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index bdaf105697fd2c2074885fa3a35548043167c7e7..a0b26785864f84eec1734c389983078f3ced33bf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1229,6 +1229,7 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, * @param mvTasks * @param conf * @param currTask + * @param lineageState * @throws SemanticException * create a Map-only merge job using CombineHiveInputFormat for all partitions with @@ -1257,10 +1258,11 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, * directories. * */ - public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, - Path finalName, DependencyCollectionTask dependencyTask, - List> mvTasks, HiveConf conf, - Task currTask) throws SemanticException { + public static void createMRWorkForMergingFiles(FileSinkOperator fsInput, + Path finalName, DependencyCollectionTask dependencyTask, + List> mvTasks, HiveConf conf, + Task currTask, LineageState lineageState) + throws SemanticException { // // 1. create the operator tree @@ -1370,8 +1372,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, if (srcMmWriteId == null) { // Only create the movework for non-MM table. No action needed for a MM table. dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(inputDirName, finalName, true, null, null, false), false, - SessionState.get().getLineageState()); + new LoadFileDesc(inputDirName, finalName, true, null, null, false), false); } // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the // MM directory, the original MoveTask still commits based on the parent. Note that this path @@ -1382,7 +1383,7 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, Task mvTask = GenMapRedUtils.findMoveTaskForFsopOutput( mvTasks, fsopPath, fsInputDesc.isMmTable()); ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work, - fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask); + fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState); // keep the dynamic partition context in conditional task resolver context ConditionalResolverMergeFilesCtx mrCtx = @@ -1730,15 +1731,16 @@ protected static boolean shouldMergeMovePaths(HiveConf conf, Path condInputPath, * * @param condInputPath A path that the ConditionalTask uses as input for its sub-tasks. * @param linkedMoveWork A MoveWork that the ConditionalTask uses to link to its sub-tasks. + * @param lineageState A LineageState used to track what changes. * @return A new MoveWork that has the Conditional input path as source and the linkedMoveWork as target. 
*/ @VisibleForTesting - protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork) { + protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork, + LineageState lineageState) { MoveWork newWork = new MoveWork(linkedMoveWork); LoadFileDesc fileDesc = null; LoadTableDesc tableDesc = null; - LineageState lineageState = SessionState.get().getLineageState(); if (linkedMoveWork.getLoadFileWork() != null) { fileDesc = new LoadFileDesc(linkedMoveWork.getLoadFileWork()); fileDesc.setSourcePath(condInputPath); @@ -1776,13 +1778,15 @@ protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMove * a MoveTask that may be linked to the conditional sub-tasks * @param dependencyTask * a dependency task that may be linked to the conditional sub-tasks + * @param lineageState + * to track activity * @return The conditional task */ @SuppressWarnings("unchecked") private static ConditionalTask createCondTask(HiveConf conf, Task currTask, MoveWork mvWork, Serializable mergeWork, Path condInputPath, Path condOutputPath, Task moveTaskToLink, - DependencyCollectionTask dependencyTask) { + DependencyCollectionTask dependencyTask, LineageState lineageState) { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("Creating conditional merge task for " + condInputPath); } @@ -1795,7 +1799,8 @@ private static ConditionalTask createCondTask(HiveConf conf, Serializable workForMoveOnlyTask = moveWork; if (shouldMergeMovePaths) { - workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork()); + workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork(), + lineageState); } // There are 3 options for this ConditionalTask: diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java index 338c1856672f09bb7da35d2336ebb5b6f3fdc5a6..f69c9a2503b006048f43c7911ad05a91aaa97754 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.session.LineageState; /** * Utility class for index support. 
@@ -221,10 +222,11 @@ private static boolean isIndexTableFresh(Hive hive, List indexes, Table s StringBuilder command, LinkedHashMap partSpec, String indexTableName, - String dbName){ + String dbName, + LineageState lineageState){ // Don't try to index optimize the query to build the index HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false); - Driver driver = new Driver(builderConf, SessionState.get().getUserName()); + Driver driver = new Driver(builderConf, SessionState.get().getUserName(), lineageState); driver.compile(command.toString(), false); Task rootTask = driver.getPlan().getRootTasks().get(0); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java index e6c07713b24df719315d804f006151106eea9aed..0d72a1e934ee6bf9bf267d1359e028afa73589ad 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,9 +84,10 @@ public ParseContext transform(ParseContext pctx) throws SemanticException { return pctx; } } - - Index index = SessionState.get() != null ? - SessionState.get().getLineageState().getIndex() : new Index(); + Index index = pctx.getQueryState().getLineageState().getIndex(); + if (index == null) { + index = new Index(); + } long sTime = System.currentTimeMillis(); // Create the lineage context diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index a09b7961c2dbc26b4d2fa912d0be7037885f63e4..971a0613824a4381a09aceb7f80f51855d9c8b18 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -158,6 +158,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.CreateOrAlterWMPoolDesc; import org.apache.hadoop.hive.ql.plan.CreateOrDropTriggerToPoolMappingDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde.serdeConstants; @@ -1485,8 +1486,8 @@ private void analyzeTruncateTable(ASTNode ast) throws SemanticException { partSpec == null ? 
new HashMap<>() : partSpec, null); ltd.setLbCtx(lbCtx); @SuppressWarnings("unchecked") - Task moveTsk = TaskFactory.get(new MoveWork( - null, null, ltd, null, false, SessionState.get().getLineageState()), conf); + Task moveTsk = + TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); truncateTask.addDependentTask(moveTsk); // Recalculate the HDFS stats if auto gather stats is set @@ -1703,8 +1704,10 @@ private void analyzeAlterIndexProps(ASTNode ast) indexTbl, db, indexTblPartitions); } + LineageState lineageState = queryState.getLineageState(); List> ret = handler.generateIndexBuildTaskList(baseTbl, - index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs()); + index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs(), + lineageState); return ret; } catch (Exception e) { throw new SemanticException(e); @@ -2146,8 +2149,8 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast, LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap<>() : partSpec, null); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get( - new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); + Task moveTsk = + TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); mergeTask.addDependentTask(moveTsk); if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { @@ -3539,7 +3542,7 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole } SessionState ss = SessionState.get(); String uName = (ss == null? null: ss.getUserName()); - Driver driver = new Driver(conf, uName); + Driver driver = new Driver(conf, uName, queryState.getLineageState()); int rc = driver.compile(cmd.toString(), false); if (rc != 0) { throw new SemanticException(ErrorMsg.NO_VALID_PARTN.getMsg()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java index 065c7e50986872cd35386feee712f3452597d643..0eacfc0f0151d151c0537e81265ef2c9d114d895 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java @@ -134,7 +134,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { runCtx = new Context(conf); // runCtx and ctx share the configuration, but not isExplainPlan() runCtx.setExplainConfig(config); - Driver driver = new Driver(conf, runCtx); + Driver driver = new Driver(conf, runCtx, queryState.getLineageState()); CommandProcessorResponse ret = driver.run(query); if(ret.getResponseCode() == 0) { // Note that we need to call getResults for simple fetch optimization. 
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index b6f1139fe1a78283277bf4d0c5224ab1d718c634..e6d4cbe4dae44dbdda5f43875dc76f360f8a6e52 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -385,7 +385,8 @@ public static void processFileSink(GenTezProcContext context, FileSinkOperator f + fileSink.getConf().getDirName() + " to " + finalName); GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName, context.dependencyTask, context.moveTask, - hconf, context.currentTask); + hconf, context.currentTask, + parseContext.getQueryState().getLineageState()); } FetchTask fetchTask = parseContext.getFetchTask(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 83d53bc157f35b4b57fc37bb24b6c400ac58d8ca..c79df56dd24031748577cef3cb4ac24edde22c7e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -391,7 +391,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Utilities.getTableDesc(table), new TreeMap<>(), replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId); loadTableWork.setStmtId(stmtId); - MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()); + MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, + null, false); Task loadTableTask = TaskFactory.get(mv, x.getConf()); copyTask.addDependentTask(loadTableTask); x.getTasks().add(copyTask); @@ -495,7 +496,7 @@ private static boolean isAcid(Long txnId) { loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( - x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), x.getConf()); + x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf()); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); x.getTasks().add(copyTask); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java index f31775ed942160da73344c4dca707da7b8c658a6..ccf1e66846f899e6ee6c0c885cea358203cd5e82 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.optimizer.IndexUtils; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import java.io.Serializable; import java.util.LinkedList; @@ -47,12 +48,14 @@ private Hive hive; private List> tasks; private Set inputs; + private LineageState lineageState; - - public IndexUpdater(List loadTableWork, Set inputs, Configuration conf) { + public IndexUpdater(List loadTableWork, Set inputs, Configuration conf, + LineageState lineageState) { this.loadTableWork = loadTableWork; this.inputs = inputs; this.conf = new HiveConf(conf, IndexUpdater.class); + this.lineageState = lineageState; this.tasks = new LinkedList>(); } @@ -133,7 +136,7 @@ private void doIndexUpdate(Index index, Map partSpec) { } private void compileRebuild(String query) { - Driver driver 
= new Driver(this.conf); + Driver driver = new Driver(this.conf, lineageState); driver.compile(query, false); tasks.addAll(driver.getPlan().getRootTasks()); inputs.addAll(driver.getPlan().getInputs()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index cc956da57567114aa29ee0552566ca62c68f6be7..e600f7a7aaddb2061ecc8f7705b75ca7aceb86fa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -302,7 +302,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { Task childTask = TaskFactory.get( new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true, - isLocal, SessionState.get().getLineageState()), conf + isLocal), conf ); if (rTask != null) { rTask.addDependentTask(childTask); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 498b6741c3f40b92ce3fb218e91e7809a17383f0..80556aec88d4c8f15ad7d00724339276c4931b47 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -313,7 +313,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern, - SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); + queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf, true)); return; } @@ -344,7 +344,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { } ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, - SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); + queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf, true)); // // for (FileStatus dir : dirsInLoadPath) { diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 28e3621d3264f4f704da0d775b396f7b7764fdb6..dcda8b3e008115f0f9b9922086bb52df8b441a1f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7336,8 +7336,8 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, private void handleLineage(LoadTableDesc ltd, Operator output) throws SemanticException { - if (ltd != null && SessionState.get() != null) { - SessionState.get().getLineageState() + if (ltd != null) { + queryState.getLineageState() .mapDirToOp(ltd.getSourcePath(), output); } else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) { @@ -7350,7 +7350,7 @@ private void handleLineage(LoadTableDesc ltd, Operator output) throw new SemanticException(e); } - SessionState.get().getLineageState() + queryState.getLineageState() .mapDirToOp(tlocation, output); } } @@ -11685,7 +11685,7 @@ void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticExce pCtx = t.transform(pCtx); } // we just use view name 
as location. - SessionState.get().getLineageState() + queryState.getLineageState() .mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp); } return; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 7b2937032ab8dd57f8923e0a9e7aab4a92de55ee..24559b649379fbe2faa7445d642a27bd850ac0ed 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde.serdeConstants; @@ -105,7 +106,8 @@ public void init(QueryState queryState, LogHelper console, Hive db) { } @SuppressWarnings({"nls", "unchecked"}) - public void compile(final ParseContext pCtx, final List> rootTasks, + public void compile(final ParseContext pCtx, + final List> rootTasks, final HashSet inputs, final HashSet outputs) throws SemanticException { Context ctx = pCtx.getContext(); @@ -218,12 +220,13 @@ public void compile(final ParseContext pCtx, final List tsk = TaskFactory - .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), + .get(new MoveWork(null, null, ltd, null, false), conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { - IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf); + IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf, + queryState.getLineageState()); try { List> indexUpdateTasks = indexUpdater .generateUpdateTasks(); @@ -248,7 +251,7 @@ public void compile(final ParseContext pCtx, final List inputs, HashSet outputs, - LineageState lineageState) { + private MoveWork(HashSet inputs, HashSet outputs) { this.inputs = inputs; this.outputs = outputs; - sessionStateLineageState = lineageState; } public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, boolean srcLocal, LineageState lineageState) { - this(inputs, outputs, lineageState); + boolean checkFileFormat, boolean srcLocal) { + this(inputs, outputs); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("Creating MoveWork " + System.identityHashCode(this) + " with " + loadTableWork + "; " + loadFileWork); @@ -92,8 +81,8 @@ public MoveWork(HashSet inputs, HashSet outputs, public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, LineageState lineageState) { - this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState); + boolean checkFileFormat) { + this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false); } public MoveWork(final MoveWork o) { @@ -104,7 +93,6 @@ public MoveWork(final MoveWork o) { srcLocal = o.isSrcLocal(); inputs = o.getInputs(); outputs = o.getOutputs(); - sessionStateLineageState = o.sessionStateLineageState; } @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -166,7 +154,4 @@ public void setSrcLocal(boolean srcLocal) { 
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
-  private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
-      LineageState lineageState) {
+  private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
     this.inputs = inputs;
     this.outputs = outputs;
-    sessionStateLineageState = lineageState;
   }
 
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, boolean srcLocal, LineageState lineageState) {
-    this(inputs, outputs, lineageState);
+      boolean checkFileFormat, boolean srcLocal) {
+    this(inputs, outputs);
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("Creating MoveWork " + System.identityHashCode(this)
           + " with " + loadTableWork + "; " + loadFileWork);
@@ -92,8 +81,8 @@ public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
 
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, LineageState lineageState) {
-    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState);
+      boolean checkFileFormat) {
+    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false);
   }
 
   public MoveWork(final MoveWork o) {
@@ -104,7 +93,6 @@ public MoveWork(final MoveWork o) {
     srcLocal = o.isSrcLocal();
     inputs = o.getInputs();
     outputs = o.getOutputs();
-    sessionStateLineageState = o.sessionStateLineageState;
   }
 
   @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -166,7 +154,4 @@ public void setSrcLocal(boolean srcLocal) {
     this.srcLocal = srcLocal;
   }
 
-  public LineageState getLineagState() {
-    return sessionStateLineageState;
-  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index 056d6141d6239816699ed5f730cbd14e48d8d9bb..82eeb35fb15153bf8d1dea4a39068d528a05f4f3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -60,7 +60,7 @@
   /**
    * Constructor.
    */
-  LineageState() {
+  public LineageState() {
     dirToFop = new HashMap<>();
     linfo = new LineageInfo();
     index = new Index();
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index bb6ddc6fa4667ac0e30994d0f9ee8b969542383c..d03f5e3144be62b81426f2856d692de20a735f90 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -232,11 +232,6 @@
    */
   private Map hdfsEncryptionShims = Maps.newHashMap();
 
-  /**
-   * Lineage state.
-   */
-  LineageState ls;
-
   private final String userName;
 
   /**
@@ -294,15 +289,6 @@
   private List cleanupItems = new LinkedList();
 
-  /**
-   * Get the lineage state stored in this session.
-   *
-   * @return LineageState
-   */
-  public LineageState getLineageState() {
-    return ls;
-  }
-
   public HiveConf getConf() {
     return sessionConf;
   }
@@ -387,7 +373,6 @@ public SessionState(HiveConf conf, String userName) {
       LOG.debug("SessionState user: " + userName);
     }
     isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
-    ls = new LineageState();
     resourceMaps = new ResourceMaps();
     // Must be deterministic order map for consistent q-test output across Java versions
     overriddenConfigurations = new LinkedHashMap();
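With the field and accessor gone from SessionState and the LineageState constructor made public, a LineageState is owned by whichever QueryState is running the query and is handed to collaborators explicitly; the fragment at the top of this section that builds a nested Driver with new Driver(this.conf, lineageState) does exactly that. A small sketch of the hand-off (class and method names are hypothetical):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.LineageState;

public class ChildDriverSketch {
  /**
   * Compile a follow-up statement with a nested Driver that shares the parent
   * query's LineageState instead of reaching into SessionState.
   */
  static Driver nestedDriver(HiveConf conf, LineageState parentLineage) {
    // Driver(HiveConf, LineageState) is the constructor used in the hunk at the top of this section
    return new Driver(conf, parentLineage);
  }
}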
diff --git ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
index 340689255c738ea497bcd269463b8b8bc38cf34c..3c007a762cffcfdecc88ec8c4a4f4c9d2db88731 100644
--- ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
@@ -35,6 +35,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -135,9 +136,10 @@ public void testMovePathsThatCanBeMerged() {
   public void testMergePathWithInvalidMoveWorkThrowsException() {
     final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
     final MoveWork mockWork = mock(MoveWork.class);
+    final LineageState lineageState = new LineageState();
     when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
 
-    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
   }
 
   @Test
@@ -146,12 +148,13 @@ public void testMergePathValidMoveWorkReturnsNewMoveWork() {
     final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
     final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
     final MoveWork mockWork = mock(MoveWork.class);
+    final LineageState lineageState = new LineageState();
     MoveWork newWork;
 
     // test using loadFileWork
     when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(
         condOutputPath, targetMoveWorkPath, false, "", "", false));
-    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
     assertNotNull(newWork);
     assertNotEquals(newWork, mockWork);
     assertEquals(condInputPath, newWork.getLoadFileWork().getSourcePath());
@@ -162,7 +165,7 @@ public void testMergePathValidMoveWorkReturnsNewMoveWork() {
     reset(mockWork);
     when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc(
         condOutputPath, tableDesc, null, null));
-    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
     assertNotNull(newWork);
     assertNotEquals(newWork, mockWork);
     assertEquals(condInputPath, newWork.getLoadTableWork().getSourcePath());
@@ -181,7 +184,8 @@ public void testConditionalMoveTaskIsOptimized() throws SemanticException {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -221,7 +225,8 @@ public void testConditionalMoveTaskIsNotOptimized() throws SemanticException {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -255,7 +260,8 @@ public void testConditionalMoveOnHdfsIsNotOptimized() throws SemanticException {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
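The updated tests above construct a LineageState directly (possible now that its constructor is public) and pass it to the GenMapRedUtils entry points whose signatures gained a LineageState parameter. A condensed sketch of the new mergeMovePaths call, written as if it lived in the same package as the test (the wrapper class is illustrative; only the GenMapRedUtils call reflects the patched signature):

package org.apache.hadoop.hive.ql.optimizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.session.LineageState;

public class MergeMovePathsSketch {
  /** Merge a conditional input path into an existing MoveWork, threading lineage explicitly. */
  static MoveWork mergeWithLineage(Path condInputPath, MoveWork work, LineageState lineageState) {
    // the LineageState argument is new; callers previously relied on the session's instance
    return GenMapRedUtils.mergeMovePaths(condInputPath, work, lineageState);
  }
}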