diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4e7c80f184..44752f6253 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -2159,7 +2159,7 @@ private TaskRunner launchTask(Task tsk, String queryId, cxt.launching(tskRun); // Launch Task - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) { // Launch it in the parallel mode, as a separate thread only for MR tasks if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in parallel"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java index 2b2c004fea..e3b0d1eaea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -330,7 +329,7 @@ private void unpackStructObject(ObjectInspector oi, Object o, String fName, private List constructColumnStatsFromPackedRows( Hive db) throws HiveException, MetaException, IOException { - String currentDb = SessionState.get().getCurrentDatabase(); + String currentDb = work.getCurrentDatabaseName(); String tableName = work.getColStats().getTableName(); String partName = null; List colName = work.getColStats().getColName(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java index 82fbf28a0b..48a9c9a5a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +77,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext private ColumnStatistics constructColumnStatsFromInput() throws SemanticException, MetaException { - String dbName = SessionState.get().getCurrentDatabase(); + String dbName = work.getCurrentDatabaseName(); ColumnStatsDesc desc = work.getColStats(); String tableName = desc.getTableName(); String partName = work.getPartName(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java index 52cb445754..a51e69db3f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java @@ -21,9 +21,7 @@ import java.io.Serializable; import java.util.List; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; -import 
org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ConditionalResolver; import org.apache.hadoop.hive.ql.plan.ConditionalWork; @@ -61,6 +59,11 @@ public boolean isMapRedTask() { } @Override + public boolean canExecuteInParallel() { + return isMapRedTask(); + } + + @Override public boolean hasReduce() { for (Task task : listTasks) { if (task.hasReduce()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index 2683f294f6..2e06ad070f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.plan.CopyWork; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index acc23901d3..34d900a65d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4916,4 +4916,13 @@ public static boolean doesTableNeedLocation(Table tbl) { } return retval; } + + /* + uses the authorizer from SessionState will need some more work to get this to run in parallel, + however this should not be a bottle neck so might not need to parallelize this. + */ + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java index 6fffab0751..ec5ebb07b1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java @@ -202,4 +202,9 @@ public String getName() { colList.add(tmpFieldSchema); return colList; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java index 2dc681ede1..a9ed97a70a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java @@ -1247,4 +1247,9 @@ public String getName() { colList.add(tmpFieldSchema); return colList; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index e708d58345..bde052b9d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -193,4 +193,8 @@ public void clearFetch() throws HiveException { } } + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java index 0f990e68f0..bb0fff4fb5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java @@ -357,4 +357,12 @@ public StageType getType() { public String getName() { return 
"FUNCTION"; } + + /** + * this needs access to session state resource downloads which in turn uses references to Registry objects. + */ + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index cde2805142..5509bc05d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -18,22 +18,12 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -68,11 +58,19 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + /** * MoveTask implementation. **/ @@ -449,7 +447,7 @@ public int execute(DriverContext driverContext) { dpCtx.getNumDPCols(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), + work.getCurrentTransactionId(), hasFollowingStatsTask(), work.getLoadTableWork().getWriteType()); // publish DP columns to its subscribers @@ -497,10 +495,10 @@ public int execute(DriverContext driverContext) { dc = new DataContainer(table.getTTable(), partn.getTPartition()); // Don't set lineage on delete as we don't have all the columns - if (SessionState.get() != null && + if (work.getLineagState() != null && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, + work.getLineagState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } LOG.info("\tLoading partition " + entry.getKey()); @@ -531,7 +529,7 @@ public int execute(DriverContext driverContext) { } } } - if (SessionState.get() != null && dc != null) { + if (work.getLineagState() != null && dc != null) { // If we are doing an update or a delete the number of columns in the table will not // match the number of columns in the file sink. For update there will be one too many // (because of the ROW__ID), and in the case of the delete there will be just the @@ -542,14 +540,14 @@ public int execute(DriverContext driverContext) { case UPDATE: // Pass an empty list as no columns will be written to the file. 
// TODO I should be able to make this work for update - tableCols = new ArrayList(); + tableCols = new ArrayList<>(); break; default: tableCols = table.getCols(); break; } - SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols); + work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols); } releaseLocks(tbd); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index e1bd2918ca..18b64b5680 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -28,7 +28,6 @@ import java.util.List; import org.apache.hadoop.hive.common.metrics.common.Metrics; -import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -630,5 +629,7 @@ public boolean equals(Object obj) { return toString().equals(String.valueOf(obj)); } - + public boolean canExecuteInParallel(){ + return true; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index 1bd4db7805..41a1ef11e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -49,7 +48,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.ResourceType; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.StreamPrinter; /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 3ebd3cc7d8..178de1d7e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -322,4 +322,9 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception public StageType getType() { return StageType.REPL_DUMP; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index cd31b173a3..0ff6f819a6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -19,7 +19,6 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -39,7 +38,6 @@ Licensed to the Apache Software Foundation (ASF) under one import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; -import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import java.io.Serializable; @@ -69,7 +67,8 @@ public String getName() { protected int execute(DriverContext driverContext) { try { int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - Context context = new Context(conf, getHive()); + Context context = new Context(conf, getHive(), work.sessionStateLineageState, + work.currentTransactionId); TaskTracker loadTaskTracker = new TaskTracker(maxTasks); /* for now for simplicity we are doing just one directory ( one database ), come back to use diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index f51afe18a1..18a5dda10f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.session.LineageState; import java.io.IOException; import java.io.Serializable; @@ -35,16 +36,27 @@ Licensed to the Apache Software Foundation (ASF) under one private int loadTaskRunCount = 0; private DatabaseEvent.State state = null; + /* + these are sessionState objects that are copied over to work to allow for parallel execution. + based on the current use case the methods are selectively synchronized, which might need to be + taken care when using other methods. 
+ */ + final LineageState sessionStateLineageState; + public final long currentTransactionId; + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn) throws IOException { + String tableNameToLoadIn, LineageState lineageState, long currentTransactionId) + throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; + sessionStateLineageState = lineageState; + this.currentTransactionId = currentTransactionId; this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } - public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern) - throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null); + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, + LineageState lineageState, long currentTransactionId) throws IOException { + this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, currentTransactionId); } public BootstrapEventsIterator iterator() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index c944a13c67..2d826ba06d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -227,7 +227,8 @@ private TaskTracker forNewTable() throws Exception { event.replicationSpec().isReplace() ); loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, + context.sessionStateLineageState, context.currentTransactionId); return TaskFactory.get(work, context.hiveConf); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index a1187c4460..248ff1cc47 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -226,7 +226,8 @@ private String location(ImportTableDesc tblDesc, Database parentDb) tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace() ); MoveWork moveWork = - new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, + context.sessionStateLineageState, context.currentTransactionId); Task loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java index 2a7cca1459..89f309069e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.metadata.Hive; +import 
org.apache.hadoop.hive.ql.session.LineageState; public class Context { public final HiveConf hiveConf; @@ -28,10 +29,22 @@ Licensed to the Apache Software Foundation (ASF) under one public final Warehouse warehouse; public final PathUtils utils; - public Context(HiveConf hiveConf, Hive hiveDb) throws MetaException { + /* + these are sessionState objects that are copied over to work to allow for parallel execution. + based on the current use case the methods are selectively synchronized, which might need to be + taken care when using other methods. + */ + public final LineageState sessionStateLineageState; + public final long currentTransactionId; + + + public Context(HiveConf hiveConf, Hive hiveDb, + LineageState lineageState, long currentTransactionId) throws MetaException { this.hiveConf = hiveConf; this.hiveDb = hiveDb; this.warehouse = new Warehouse(hiveConf); this.utils = new PathUtils(hiveConf); + sessionStateLineageState = lineageState; + this.currentTransactionId = currentTransactionId; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java index 73054361f8..05b7d715dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java @@ -430,7 +430,7 @@ public String toString() { /** * This class tracks the predicate information for an operator. */ - public static class Predicate { + public static class Predicate implements Serializable { /** * Expression string for the predicate. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index da99c23997..0e86aac281 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.FileInputFormat; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index da153e36d2..496ede2c46 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1310,7 +1310,9 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // 2. 
Constructing a conditional task consisting of a move task and a map reduce task // MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); + new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, + null), false, SessionState.get().getLineageState(), + SessionState.get().getTxnMgr().getCurrentTxnId()); MapWork cplan; Serializable work; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java index 3c20532892..5557138db9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.lineage; +import java.io.Serializable; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -47,7 +48,7 @@ */ public class LineageCtx implements NodeProcessorCtx { - public static class Index { + public static class Index implements Serializable { /** * The map contains an index from the (operator, columnInfo) to the diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index a054abb127..69ad0f2616 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask; -import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -592,7 +591,7 @@ private void analyzeAlterTableUpdateStats(ASTNode ast, String tblName, Map() : partSpec); + partSpec == null ? new HashMap<>() : partSpec); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + Task moveTsk = TaskFactory + .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState(), + SessionState.get().getTxnMgr().getCurrentTxnId()), conf); truncateTask.addDependentTask(moveTsk); @@ -1707,7 +1708,9 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast, LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? 
new HashMap() : partSpec); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + Task moveTsk = TaskFactory + .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState(), + SessionState.get().getTxnMgr().getCurrentTxnId()), conf); mergeTask.addDependentTask(moveTsk); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 7f3460f5b2..c33e8d3dc4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -18,19 +18,6 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - import org.antlr.runtime.tree.Tree; import org.apache.commons.lang.ObjectUtils; import org.apache.hadoop.fs.FileStatus; @@ -70,6 +57,19 @@ import org.apache.hadoop.mapred.OutputFormat; import org.slf4j.Logger; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + /** * ImportSemanticAnalyzer. * @@ -345,10 +345,11 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath); Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf()); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, - Utilities.getTableDesc(table), new TreeMap(), + Utilities.getTableDesc(table), new TreeMap<>(), replace); Task loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(), - x.getOutputs(), loadTableWork, null, false), x.getConf()); + x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState(), + SessionState.get().getTxnMgr().getCurrentTxnId()), x.getConf()); copyTask.addDependentTask(loadTableTask); x.getTasks().add(copyTask); return loadTableTask; @@ -413,7 +414,9 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, partSpec.getPartSpec(), replicationSpec.isReplace()); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( - x.getInputs(), x.getOutputs(), loadTableWork, null, false), + x.getInputs(), x.getOutputs(), loadTableWork, null, false, + SessionState.get().getLineageState(), + SessionState.get().getTxnMgr().getCurrentTxnId()), x.getConf()); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index fa79700df7..4ab4022643 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -20,8 +20,6 @@ import org.apache.hadoop.hive.conf.HiveConf.StrictChecks; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; - import java.io.IOException; import 
java.io.Serializable; import java.net.URI; @@ -54,10 +52,10 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.StatsWork; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.InputFormat; import com.google.common.collect.Lists; -import org.apache.orc.impl.OrcAcidUtils; /** * LoadSemanticAnalyzer. @@ -285,8 +283,11 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { loadTableWork.setInheritTableSpecs(false); } - Task childTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, true, isLocal), conf); + Task childTask = TaskFactory.get( + new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true, + isLocal, SessionState.get().getLineageState(), + SessionState.get().getTxnMgr().getCurrentTxnId()), conf + ); if (rTask != null) { rTask.addDependentTask(childTask); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 7794d3e3ad..450818a619 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -17,7 +17,6 @@ Licensed to the Apache Software Foundation (ASF) under one */ package org.apache.hadoop.hive.ql.parse; -import io.netty.util.internal.StringUtil; import org.antlr.runtime.tree.Tree; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; @@ -46,6 +45,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.session.SessionState; import java.io.FileNotFoundException; import java.io.Serializable; @@ -286,7 +286,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { ReplLoadWork replLoadWork = - new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern); + new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern, + SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); return; } @@ -316,7 +317,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { + " does not correspond to REPL LOAD expecting to load to a singular destination point."); } - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern); + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, + SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); // // for (FileStatus dir : dirsInLoadPath) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 08a8f00e06..0a49f8a689 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -18,20 +18,8 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import 
java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.Stack; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -42,7 +30,6 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -54,7 +41,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; -import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext; import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; @@ -76,9 +62,16 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; /** * TaskCompiler is a the base class for classes that compile @@ -214,7 +207,8 @@ public void compile(final ParseContext pCtx, final List tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); + Task tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState(), + SessionState.get().getTxnMgr().getCurrentTxnId()), conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { @@ -272,7 +266,8 @@ public void compile(final ParseContext pCtx, final List tsk : leafTasks) { tsk.addDependentTask(cStatsTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java index 8db28895d0..5f9041e449 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; -import java.util.List; import java.util.Map; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -38,12 +37,14 @@ private ColumnStatsDesc colStats; private String partName; private Map mapProp; + private String currentDatabaseName; public ColumnStatsUpdateWork(ColumnStatsDesc colStats, String partName, - Map mapProp) { + Map mapProp, String currentDatabaseName) { this.partName = partName; this.colStats = colStats; this.mapProp = mapProp; + this.currentDatabaseName = currentDatabaseName; } @Override @@ -64,4 +65,11 @@ public String getPartName() { return mapProp; } + public String 
getCurrentDatabaseName() { + return currentDatabaseName; + } + + public void setCurrentDatabaseName(String currentDatabaseName) { + this.currentDatabaseName = currentDatabaseName; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java index 76811b1a93..842fd1a411 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java @@ -34,15 +34,17 @@ private static final long serialVersionUID = 1L; private FetchWork fWork; private ColumnStatsDesc colStats; + private String currentDatabaseName; private static final int LIMIT = -1; public ColumnStatsWork() { } - public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats) { + public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats, String currentDatabaseName) { this.fWork = work; this.setColStats(colStats); + this.currentDatabaseName = currentDatabaseName; } @Override @@ -85,4 +87,11 @@ public static int getLimit() { return LIMIT; } + public String getCurrentDatabaseName() { + return currentDatabaseName; + } + + public void setCurrentDatabaseName(String currentDatabaseName) { + this.currentDatabaseName = currentDatabaseName; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 8ce211fd5f..a903a86101 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.plan.Explain.Level; - +import org.apache.hadoop.hive.ql.session.LineageState; /** * MoveWork. @@ -38,6 +38,13 @@ private LoadTableDesc loadTableWork; private LoadFileDesc loadFileWork; private LoadMultiFilesDesc loadMultiFilesWork; + /* + these are sessionState objects that are copied over to work to allow for parallel execution. + based on the current use case the methods are selectively synchronized, which might need to be + taken care when using other methods. + */ + private final LineageState sessionStateLineageState; + private final long currentTransactionId; private boolean checkFileFormat; private boolean srcLocal; @@ -57,17 +64,26 @@ protected List movedParts; public MoveWork() { + sessionStateLineageState = null; + /* + fail fast. 
+ */ + currentTransactionId = Long.MIN_VALUE; } - public MoveWork(HashSet inputs, HashSet outputs) { + private MoveWork(HashSet inputs, HashSet outputs, + LineageState lineageState, long currentTransactionId) { this.inputs = inputs; this.outputs = outputs; + sessionStateLineageState = lineageState; + this.currentTransactionId = currentTransactionId; } public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, boolean srcLocal) { - this(inputs, outputs); + boolean checkFileFormat, boolean srcLocal, LineageState lineageState, + long currentTransactionId) { + this(inputs, outputs, lineageState, currentTransactionId); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; @@ -76,8 +92,8 @@ public MoveWork(HashSet inputs, HashSet outputs, public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat) { - this(inputs, outputs); + boolean checkFileFormat, LineageState lineageState, long currentTransactionId) { + this(inputs, outputs, lineageState, currentTransactionId); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; @@ -91,6 +107,8 @@ public MoveWork(final MoveWork o) { srcLocal = o.isSrcLocal(); inputs = o.getInputs(); outputs = o.getOutputs(); + sessionStateLineageState = o.sessionStateLineageState; + currentTransactionId =o.currentTransactionId; } @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -152,4 +170,11 @@ public void setSrcLocal(boolean srcLocal) { this.srcLocal = srcLocal; } + public LineageState getLineagState() { + return sessionStateLineageState; + } + + public long getCurrentTransactionId(){ + return currentTransactionId; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java index e2f2a68ff9..056d6141d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java @@ -21,11 +21,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.io.Serializable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.exec.ColumnInfo; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer; @@ -36,7 +36,7 @@ * lineage information for the post execution hooks. * */ -public class LineageState { +public class LineageState implements Serializable { /** * Mapping from the directory name to FileSinkOperator (may not be FileSinkOperator for views). This @@ -44,7 +44,7 @@ * time and is then later used to created the mapping from * movetask to the set of filesink operators. */ - private final Map dirToFop; + private final Map dirToFop; /** * The lineage context index for this query. @@ -60,8 +60,8 @@ /** * Constructor. */ - public LineageState() { - dirToFop = new HashMap(); + LineageState() { + dirToFop = new HashMap<>(); linfo = new LineageInfo(); index = new Index(); } @@ -72,8 +72,8 @@ public LineageState() { * @param dir The directory name. * @param fop The sink operator. 
*/ - public void mapDirToOp(Path dir, Operator fop) { - dirToFop.put(dir, fop); + public synchronized void mapDirToOp(Path dir, Operator fop) { + dirToFop.put(dir.toUri().toString(), fop); } /** @@ -83,10 +83,10 @@ public void mapDirToOp(Path dir, Operator fop) { * @param newPath conditional input path * @param oldPath path of the old linked MoveWork */ - public void updateDirToOpMap(Path newPath, Path oldPath) { - Operator op = dirToFop.get(oldPath); + public synchronized void updateDirToOpMap(Path newPath, Path oldPath) { + Operator op = dirToFop.get(oldPath.toUri().toString()); if (op != null) { - dirToFop.put(newPath, op); + dirToFop.put(newPath.toUri().toString(), op); } } @@ -97,10 +97,10 @@ public void updateDirToOpMap(Path newPath, Path oldPath) { * @param dc The associated data container. * @param cols The list of columns. */ - public void setLineage(Path dir, DataContainer dc, + public synchronized void setLineage(Path dir, DataContainer dc, List cols) { // First lookup the file sink operator from the load work. - Operator op = dirToFop.get(dir); + Operator op = dirToFop.get(dir.toUri().toString()); // Go over the associated fields and look up the dependencies // by position in the row schema of the filesink operator. @@ -136,7 +136,7 @@ public Index getIndex() { /** * Clear all lineage states */ - public void clear() { + public synchronized void clear() { dirToFop.clear(); linfo.clear(); index.clear();
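
Note on the parallel-launch gate (Driver.java, Task.java and the per-task overrides): the patch replaces the tsk.isMapRedTask() check in Driver.launchTask with a new Task.canExecuteInParallel() hook that defaults to true, with session-bound tasks (DDLTask, ExplainTask, FetchTask, FunctionTask, the REPL tasks, and ConditionalTask delegating to isMapRedTask()) opting out. The following is a minimal, self-contained sketch of that pattern, not the actual Hive classes; ParallelLauncher, SessionBoundTask and MapRedLikeTask are invented names for illustration only.

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    abstract class Task implements Runnable {
      /** Default mirrors the patch: a task may run in parallel unless it opts out. */
      public boolean canExecuteInParallel() {
        return true;
      }
    }

    /** Stand-in for tasks such as DDLTask or FunctionTask that depend on SessionState. */
    class SessionBoundTask extends Task {
      @Override
      public boolean canExecuteInParallel() {
        return false;                       // must stay on the session (Driver) thread
      }
      @Override
      public void run() {
        System.out.println("session-bound task on " + Thread.currentThread().getName());
      }
    }

    class MapRedLikeTask extends Task {
      @Override
      public void run() {
        System.out.println("parallel task on " + Thread.currentThread().getName());
      }
    }

    public class ParallelLauncher {
      public static void main(String[] args) {
        boolean execParallel = true;        // stand-in for hive.exec.parallel
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Task> tasks = List.of(new MapRedLikeTask(), new SessionBoundTask());
        for (Task t : tasks) {
          if (execParallel && t.canExecuteInParallel()) {
            pool.submit(t);                 // launched on its own thread, as in launchTask
          } else {
            t.run();                        // executed inline on the caller thread
          }
        }
        pool.shutdown();
      }
    }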
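Note on MoveWork carrying session-derived values: the semantic analyzers now pass the LineageState and the current transaction id into MoveWork at compile time, so MoveTask reads work.getLineagState() and work.getCurrentTransactionId() instead of calling SessionState.get() while executing. The sketch below illustrates the idea only; SimpleSession, MoveWorkLike and CompileTimeCapture are invented stand-ins, and the sketch deliberately leaves the session unset on the worker thread to show why relying on a thread-local there is fragile (the real TaskRunner setup differs).

    import java.io.Serializable;

    /** Invented stand-in for a thread-local session. */
    class SimpleSession {
      private static final ThreadLocal<SimpleSession> CURRENT = new ThreadLocal<>();
      final long currentTxnId = 42L;
      static void start() { CURRENT.set(new SimpleSession()); }
      static SimpleSession get() { return CURRENT.get(); }   // null on other threads
    }

    /** Work object that captures session-derived values when the plan is compiled. */
    class MoveWorkLike implements Serializable {
      private final long currentTransactionId;
      MoveWorkLike(long currentTransactionId) {
        this.currentTransactionId = currentTransactionId;
      }
      long getCurrentTransactionId() { return currentTransactionId; }
    }

    public class CompileTimeCapture {
      public static void main(String[] args) throws Exception {
        SimpleSession.start();
        // Compilation runs on the session thread, so reading the session here is safe.
        MoveWorkLike work = new MoveWorkLike(SimpleSession.get().currentTxnId);

        Thread worker = new Thread(() -> {
          // SimpleSession.get() would return null here; the work object still has the value.
          System.out.println("txn id from work: " + work.getCurrentTransactionId());
        });
        worker.start();
        worker.join();
      }
    }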
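Note on the LineageState changes: the class becomes Serializable, its dirToFop map is keyed by the output path's URI string rather than by Path, and the mutating methods are synchronized so that several move tasks can publish lineage concurrently; the patch's own comment warns that only the currently used methods are synchronized. The sketch below shows the shape of that change with plain JDK types instead of Hadoop Path and Operator; the guess that the String key exists to keep the class easily serializable and key equality simple is an inference, not something the diff states.

    import java.io.Serializable;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    public class LineageStateSketch implements Serializable {
      // Keyed by the URI string of the output directory instead of the path object itself.
      // String values stand in for the FileSinkOperator references held by the real class.
      private final Map<String, String> dirToOp = new HashMap<>();

      /** Records which sink operator produced a given output directory. */
      public synchronized void mapDirToOp(URI dir, String operatorName) {
        dirToOp.put(dir.toString(), operatorName);
      }

      /** Re-points lineage when a conditional task swaps the input path of a MoveWork. */
      public synchronized void updateDirToOpMap(URI newPath, URI oldPath) {
        String op = dirToOp.get(oldPath.toString());
        if (op != null) {
          dirToOp.put(newPath.toString(), op);
        }
      }

      public synchronized void clear() {
        dirToOp.clear();
      }

      public static void main(String[] args) {
        LineageStateSketch state = new LineageStateSketch();
        state.mapDirToOp(URI.create("hdfs://nn/tmp/out1"), "FS_3");
        state.updateDirToOpMap(URI.create("hdfs://nn/tmp/out2"), URI.create("hdfs://nn/tmp/out1"));
        state.clear();
      }
    }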
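Note on the column-stats work objects: ColumnStatsWork and ColumnStatsUpdateWork now carry a currentDatabaseName resolved by the semantic analyzer, so ColumnStatsTask and ColumnStatsUpdateTask no longer call SessionState.get().getCurrentDatabase() at execute time. A minimal sketch of a task qualifying an unqualified table name from the work object; ColumnStatsWorkSketch, StatsTaskSketch and the qualifiedName helper are illustrative names only.

    class ColumnStatsWorkSketch {
      private final String currentDatabaseName;
      private final String tableName;          // may or may not be db-qualified
      ColumnStatsWorkSketch(String currentDatabaseName, String tableName) {
        this.currentDatabaseName = currentDatabaseName;
        this.tableName = tableName;
      }
      String getCurrentDatabaseName() { return currentDatabaseName; }
      String getTableName() { return tableName; }
    }

    public class StatsTaskSketch {
      /** Qualifies the table with the database captured at compile time, not the session's. */
      static String qualifiedName(ColumnStatsWorkSketch work) {
        String table = work.getTableName();
        return table.contains(".") ? table : work.getCurrentDatabaseName() + "." + table;
      }

      public static void main(String[] args) {
        ColumnStatsWorkSketch work = new ColumnStatsWorkSketch("sales_db", "orders");
        System.out.println(qualifiedName(work));   // sales_db.orders
      }
    }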
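Note on the REPL LOAD path: the same values are threaded one level further there. ReplicationSemanticAnalyzer reads the lineage state and transaction id from SessionState while compiling, stores them on ReplLoadWork, ReplLoadTask copies them into the bootstrap load Context, and LoadTable/LoadPartitions pass them into each MoveWork they create. A compressed sketch of that hand-off chain; every class name below is a simplified stand-in, not the real Hive type.

    import java.io.Serializable;

    class LineageStateStub implements Serializable { }

    /** Stand-in for ReplLoadWork: captures session values when the plan is compiled. */
    class ReplLoadWorkSketch implements Serializable {
      final LineageStateStub lineageState;
      final long currentTransactionId;
      ReplLoadWorkSketch(LineageStateStub lineageState, long currentTransactionId) {
        this.lineageState = lineageState;
        this.currentTransactionId = currentTransactionId;
      }
    }

    /** Stand-in for the bootstrap load Context built inside ReplLoadTask.execute(). */
    class LoadContextSketch {
      final LineageStateStub sessionStateLineageState;
      final long currentTransactionId;
      LoadContextSketch(ReplLoadWorkSketch work) {
        this.sessionStateLineageState = work.lineageState;
        this.currentTransactionId = work.currentTransactionId;
      }
    }

    /** Stand-in for a MoveWork as created by LoadTable/LoadPartitions. */
    class MoveWorkSketch implements Serializable {
      final LineageStateStub lineageState;
      final long currentTransactionId;
      MoveWorkSketch(LoadContextSketch context) {
        this.lineageState = context.sessionStateLineageState;
        this.currentTransactionId = context.currentTransactionId;
      }
    }

    public class ReplLoadHandOff {
      public static void main(String[] args) {
        // Compile time: values come from the session once, on the session thread.
        ReplLoadWorkSketch work = new ReplLoadWorkSketch(new LineageStateStub(), 7L);
        // Execute time: every MoveWork created for the bootstrap load reuses them.
        MoveWorkSketch moveWork = new MoveWorkSketch(new LoadContextSketch(work));
        System.out.println("txn id carried into MoveWork: " + moveWork.currentTransactionId);
      }
    }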