diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cf3f50ba64..6863eeacbf 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -429,6 +429,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. " + "For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/ is created, " + "with ${hive.scratch.dir.permission}."), + PRINT_TASK_GRAPH("hive.print.task.graph", false, "This can be enabled to print the task graph " + + "generated for the query. This should not be enabled by default as it will slow down the " + + "execution of queries and should only be used for debug/tracing purposes. The image file will " + + "be generated under /tmp/{current-query-id}."), REPLDIR("hive.repl.rootdir","/user/hive/repl/", "HDFS root dir for all replication dumps."), REPLCMENABLED("hive.repl.cm.enabled", false, diff --git a/ql/pom.xml b/ql/pom.xml index 16b88d687c..f3f1d82e54 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -33,7 +33,13 @@ - + + org.graphstream + gs-core + 1.3 + + + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4e7c80f184..a9467e88f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -39,7 +39,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidReadTxnList; @@ -90,6 +89,7 @@ import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; +import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl; @@ -100,7 +100,6 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; -import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -108,22 +107,21 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.security.authorization.AuthorizationUtils; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; -import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext; import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.util.TasksGraph; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; - import org.apache.hive.common.util.ShutdownHookManager; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1805,9 +1803,11 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { ctx.setHDFSCleanup(true); this.driverCxt = driverCxt; // for canceling the query (should be bound to session?) - SessionState.get().setMapRedStats(new LinkedHashMap()); - SessionState.get().setStackTraces(new HashMap>>()); - SessionState.get().setLocalMapRedErrors(new HashMap>()); + SessionState.get().setMapRedStats(new LinkedHashMap<>()); + SessionState.get().setStackTraces(new HashMap<>()); + SessionState.get().setLocalMapRedErrors(new HashMap<>()); + + new TasksGraph(conf, queryId, plan.getRootTasks()).prettyPrint(); // Add root Tasks to runnable for (Task tsk : plan.getRootTasks()) { @@ -2159,7 +2159,7 @@ private TaskRunner launchTask(Task tsk, String queryId, cxt.launching(tskRun); // Launch Task - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) { // Launch it in the parallel mode, as a separate thread only for MR tasks if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in parallel"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index f43992c85d..583d3d3893 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -160,7 +160,7 @@ public synchronized void shutdown() { public static boolean isLaunchable(Task tsk) { // A launchable task is one that hasn't been queued, hasn't been // initialized, and is runnable. 
- return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable(); + return tsk.isNotInitialized() && tsk.isRunnable(); } public synchronized boolean addToRunnable(Task tsk) throws HiveException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java index 2b2c004fea..e3b0d1eaea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -330,7 +329,7 @@ private void unpackStructObject(ObjectInspector oi, Object o, String fName, private List constructColumnStatsFromPackedRows( Hive db) throws HiveException, MetaException, IOException { - String currentDb = SessionState.get().getCurrentDatabase(); + String currentDb = work.getCurrentDatabaseName(); String tableName = work.getColStats().getTableName(); String partName = null; List colName = work.getColStats().getColName(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java index 82fbf28a0b..48a9c9a5a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +77,7 @@ public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext private ColumnStatistics constructColumnStatsFromInput() throws SemanticException, MetaException { - String dbName = SessionState.get().getCurrentDatabase(); + String dbName = work.getCurrentDatabaseName(); ColumnStatsDesc desc = work.getColStats(); String tableName = desc.getTableName(); String partName = work.getPartName(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java index 52cb445754..a51e69db3f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java @@ -21,9 +21,7 @@ import java.io.Serializable; import java.util.List; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ConditionalResolver; import org.apache.hadoop.hive.ql.plan.ConditionalWork; @@ -61,6 +59,11 @@ public boolean isMapRedTask() { } @Override + public boolean canExecuteInParallel() { + return isMapRedTask(); + } + + @Override public boolean hasReduce() { for (Task task : listTasks) { if (task.hasReduce()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index 2683f294f6..2e06ad070f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.plan.CopyWork; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 646bb23f2e..70d18cd830 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4931,4 +4931,13 @@ public static boolean doesTableNeedLocation(Table tbl) { } return retval; } + + /* + Uses the authorizer from SessionState; it will need some more work to run in parallel. + However, this should not be a bottleneck, so it might not need to be parallelized. + */ + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java index 6fffab0751..ec5ebb07b1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java @@ -202,4 +202,9 @@ public String getName() { colList.add(tmpFieldSchema); return colList; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java index 2dc681ede1..a9ed97a70a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java @@ -1247,4 +1247,9 @@ public String getName() { colList.add(tmpFieldSchema); return colList; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index e708d58345..bde052b9d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -193,4 +193,8 @@ public void clearFetch() throws HiveException { } } + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java index 0f990e68f0..bb0fff4fb5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java @@ -357,4 +357,12 @@ public StageType getType() { public String getName() { return "FUNCTION"; } + + /** + * This needs access to session state resource downloads, which in turn use references to Registry objects.
+ */ + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index cde2805142..9600980d83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -18,22 +18,12 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -68,11 +58,19 @@ import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + /** * MoveTask implementation. **/ @@ -449,7 +447,7 @@ public int execute(DriverContext driverContext) { dpCtx.getNumDPCols(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), + work.getLoadTableWork().getCurrentTransactionId(), hasFollowingStatsTask(), work.getLoadTableWork().getWriteType()); // publish DP columns to its subscribers @@ -497,10 +495,10 @@ public int execute(DriverContext driverContext) { dc = new DataContainer(table.getTTable(), partn.getTPartition()); // Don't set lineage on delete as we don't have all the columns - if (SessionState.get() != null && + if (work.getLineagState() != null && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, + work.getLineagState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } LOG.info("\tLoading partition " + entry.getKey()); @@ -531,7 +529,7 @@ public int execute(DriverContext driverContext) { } } } - if (SessionState.get() != null && dc != null) { + if (work.getLineagState() != null && dc != null) { // If we are doing an update or a delete the number of columns in the table will not // match the number of columns in the file sink. For update there will be one too many // (because of the ROW__ID), and in the case of the delete there will be just the @@ -542,14 +540,14 @@ public int execute(DriverContext driverContext) { case UPDATE: // Pass an empty list as no columns will be written to the file. 
// TODO I should be able to make this work for update - tableCols = new ArrayList(); + tableCols = new ArrayList<>(); break; default: tableCols = table.getCols(); break; } - SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols); + work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols); } releaseLocks(tbd); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java index 249ffb35ec..a9f9fe2b32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hive.ql.lib.Node; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,7 +34,7 @@ public class NodeUtils { - public static void iterateTask(Collection> tasks, Class clazz, Function function) { + public static void iterateTask(Collection> tasks, Class clazz, Function function) { // Does a breadth first traversal of the tasks Set visited = new HashSet(); while (!tasks.isEmpty()) { @@ -42,7 +43,7 @@ return; } - private static Collection> iterateTask(Collection> tasks, + private static Collection> iterateTask(Collection> tasks, Class clazz, Function function, Set visited) { @@ -92,7 +93,7 @@ return childListNodes; } - public static interface Function { + public interface Function { void apply(T argument); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index e1bd2918ca..c881199a61 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -18,17 +18,7 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; - import org.apache.hadoop.hive.common.metrics.common.Metrics; -import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -47,6 +37,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; + /** * Task implementation. 
**/ @@ -114,6 +113,10 @@ public String getStatusMessage() { DYNAMIC_PARTITIONS, // list of dynamic partitions } public enum TaskState { + // Task state is unknown + UNKNOWN, + // Task is just created + CREATED, // Task data structures have been initialized INITIALIZED, // Task has been queued for execution by the driver @@ -121,11 +124,7 @@ public String getStatusMessage() { // Task is currently running RUNNING, // Task has completed - FINISHED, - // Task is just created - CREATED, - // Task state is unkown - UNKNOWN + FINISHED } // Bean methods @@ -366,38 +365,44 @@ public void removeDependentTask(Task dependent) { } } } - public void setStarted() { + + public synchronized void setStarted() { setState(TaskState.RUNNING); } - public boolean started() { + public synchronized boolean started() { return taskState == TaskState.RUNNING; } - public boolean done() { + public synchronized boolean done() { return taskState == TaskState.FINISHED; } - public void setDone() { + public synchronized void setDone() { setState(TaskState.FINISHED); } - public void setQueued() { + public synchronized void setQueued() { setState(TaskState.QUEUED); } - public boolean getQueued() { + public synchronized boolean getQueued() { return taskState == TaskState.QUEUED; } - public void setInitialized() { + public synchronized void setInitialized() { setState(TaskState.INITIALIZED); } - public boolean getInitialized() { + public synchronized boolean getInitialized() { return taskState == TaskState.INITIALIZED; } + public synchronized boolean isNotInitialized() { + return taskState.ordinal() < TaskState.INITIALIZED.ordinal(); + } + + public boolean isRunnable() { boolean isrunnable = true; if (parentTasks != null) { @@ -630,5 +635,7 @@ public boolean equals(Object obj) { return toString().equals(String.valueOf(obj)); } - + public boolean canExecuteInParallel() { + return true; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index 1bd4db7805..41a1ef11e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -49,7 +48,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.ResourceType; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.StreamPrinter; /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 7703f31a37..ac33f2421f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -322,4 +322,9 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception public StageType getType() { return StageType.REPL_DUMP; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index cd31b173a3..e85d5508b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -19,7 +19,6 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -39,12 +38,13 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; -import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.util.TasksGraph; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase; @@ -69,7 +69,8 @@ public String getName() { protected int execute(DriverContext driverContext) { try { int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - Context context = new Context(conf, getHive()); + Context context = new Context(conf, getHive(), work.sessionStateLineageState, + work.currentTransactionId); TaskTracker loadTaskTracker = new TaskTracker(maxTasks); /* for now for simplicity we are doing just one directory ( one database ), come back to use @@ -186,6 +187,13 @@ a database ( directory ) work.updateDbEventState(null); } this.childTasks = scope.rootTasks; + /* + Since there can be multiple rounds of this run, all of which will be tied to the same + query id -- generated in the compile phase -- an additional UUID is appended to the end to print each run + in a separate file. + */ + String queryId = this.getQueryPlan().getQueryId() + UUID.randomUUID().toString(); + new TasksGraph(context.hiveConf, queryId, scope.rootTasks).prettyPrint(); LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); } catch (Exception e) { LOG.error("failed replication", e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index f51afe18a1..18a5dda10f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.session.LineageState; import java.io.IOException; import java.io.Serializable; @@ -35,16 +36,27 @@ Licensed to the Apache Software Foundation (ASF) under one private int loadTaskRunCount = 0; private DatabaseEvent.State state = null; + /* + these are sessionState objects that are copied over to work to allow for parallel execution.
+ based on the current use case the methods are selectively synchronized, which might need to be + taken care when using other methods. + */ + final LineageState sessionStateLineageState; + public final long currentTransactionId; + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn) throws IOException { + String tableNameToLoadIn, LineageState lineageState, long currentTransactionId) + throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; + sessionStateLineageState = lineageState; + this.currentTransactionId = currentTransactionId; this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } - public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern) - throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null); + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, + LineageState lineageState, long currentTransactionId) throws IOException { + this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, currentTransactionId); } public BootstrapEventsIterator iterator() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index c944a13c67..85d17eb6fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -224,10 +224,10 @@ private TaskTracker forNewTable() throws Exception { Path tmpPath) { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - event.replicationSpec().isReplace() - ); + event.replicationSpec().isReplace()); loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, + context.sessionStateLineageState); return TaskFactory.get(work, context.hiveConf); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index a1187c4460..b49062117e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -223,10 +223,10 @@ private String location(ImportTableDesc tblDesc, Database parentDb) ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace() - ); + tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace()); MoveWork moveWork = - new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, + context.sessionStateLineageState); Task loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java index 
2a7cca1459..89f309069e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.session.LineageState; public class Context { public final HiveConf hiveConf; @@ -28,10 +29,22 @@ Licensed to the Apache Software Foundation (ASF) under one public final Warehouse warehouse; public final PathUtils utils; - public Context(HiveConf hiveConf, Hive hiveDb) throws MetaException { + /* + these are sessionState objects that are copied over to work to allow for parallel execution. + based on the current use case the methods are selectively synchronized, which might need to be + taken care when using other methods. + */ + public final LineageState sessionStateLineageState; + public final long currentTransactionId; + + + public Context(HiveConf hiveConf, Hive hiveDb, + LineageState lineageState, long currentTransactionId) throws MetaException { this.hiveConf = hiveConf; this.hiveDb = hiveDb; this.warehouse = new Warehouse(hiveConf); this.utils = new PathUtils(hiveConf); + sessionStateLineageState = lineageState; + this.currentTransactionId = currentTransactionId; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java index 73054361f8..05b7d715dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java @@ -430,7 +430,7 @@ public String toString() { /** * This class tracks the predicate information for an operator. */ - public static class Predicate { + public static class Predicate implements Serializable { /** * Expression string for the predicate. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index da99c23997..0e86aac281 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.FileInputFormat; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index da153e36d2..25cca7b4be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -1222,7 +1223,6 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, /** * @param fsInput The FileSink operator. - * @param ctx The MR processing context. * @param finalName the final destination path the merge job should output. * @param dependencyTask * @param mvTasks @@ -1309,8 +1309,10 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, // // 2. Constructing a conditional task consisting of a move task and a map reduce task // + HiveTxnManager txnMgr = SessionState.get().getTxnMgr(); MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); + new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, + null), false, SessionState.get().getLineageState()); MapWork cplan; Serializable work; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java index 3c20532892..5557138db9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.lineage; +import java.io.Serializable; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -47,7 +48,7 @@ */ public class LineageCtx implements NodeProcessorCtx { - public static class Index { + public static class Index implements Serializable { /** * The map contains an index from the (operator, columnInfo) to the diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 230ca47e4a..386dd9a986 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -591,7 +591,7 @@ private void analyzeAlterTableUpdateStats(ASTNode ast, String tblName, Map() : partSpec); + partSpec == null ? 
new HashMap<>() : partSpec); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + Task moveTsk = TaskFactory + .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); truncateTask.addDependentTask(moveTsk); @@ -1706,7 +1707,8 @@ private void analyzeAlterTablePartMergeFiles(ASTNode ast, LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap<>() : partSpec); ltd.setLbCtx(lbCtx); - Task moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + Task moveTsk = TaskFactory + .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); mergeTask.addDependentTask(moveTsk); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index aa4c660c26..7185be28c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -18,19 +18,6 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - import org.antlr.runtime.tree.Tree; import org.apache.commons.lang.ObjectUtils; import org.apache.hadoop.fs.FileStatus; @@ -71,6 +58,19 @@ import org.apache.hadoop.mapred.OutputFormat; import org.slf4j.Logger; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + /** * ImportSemanticAnalyzer. 
* @@ -350,10 +350,11 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath); Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf()); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, - Utilities.getTableDesc(table), new TreeMap(), + Utilities.getTableDesc(table), new TreeMap<>(), replace); Task loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(), - x.getOutputs(), loadTableWork, null, false), x.getConf()); + x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), + x.getConf()); copyTask.addDependentTask(loadTableTask); x.getTasks().add(copyTask); return loadTableTask; @@ -418,7 +419,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, partSpec.getPartSpec(), replicationSpec.isReplace()); loadTableWork.setInheritTableSpecs(false); Task loadPartTask = TaskFactory.get(new MoveWork( - x.getInputs(), x.getOutputs(), loadTableWork, null, false), + x.getInputs(), x.getOutputs(), loadTableWork, null, false, + SessionState.get().getLineageState()), x.getConf()); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index fa79700df7..8879b80909 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -20,8 +20,6 @@ import org.apache.hadoop.hive.conf.HiveConf.StrictChecks; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; - import java.io.IOException; import java.io.Serializable; import java.net.URI; @@ -54,10 +52,10 @@ import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.StatsWork; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.InputFormat; import com.google.common.collect.Lists; -import org.apache.orc.impl.OrcAcidUtils; /** * LoadSemanticAnalyzer. 
@@ -285,8 +283,10 @@ public void analyzeInternal(ASTNode ast) throws SemanticException { loadTableWork.setInheritTableSpecs(false); } - Task childTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, true, isLocal), conf); + Task childTask = TaskFactory.get( + new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true, + isLocal, SessionState.get().getLineageState()), conf + ); if (rTask != null) { rTask.addDependentTask(childTask); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 7794d3e3ad..450818a619 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -17,7 +17,6 @@ Licensed to the Apache Software Foundation (ASF) under one */ package org.apache.hadoop.hive.ql.parse; -import io.netty.util.internal.StringUtil; import org.antlr.runtime.tree.Tree; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; @@ -46,6 +45,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.session.SessionState; import java.io.FileNotFoundException; import java.io.Serializable; @@ -286,7 +286,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { ReplLoadWork replLoadWork = - new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern); + new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern, + SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); return; } @@ -316,7 +317,8 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { + " does not correspond to REPL LOAD expecting to load to a singular destination point."); } - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern); + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, + SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); // // for (FileStatus dir : dirsInLoadPath) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 6f379da1e3..59399291b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6898,7 +6898,10 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp); + Long currentTransactionId = acidOp == Operation.NOT_ACID ? null : + SessionState.get().getTxnMgr().getCurrentTxnId(); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, + currentTransactionId); // For Acid table, Insert Overwrite shouldn't replace the table content. 
We keep the old // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), @@ -7013,7 +7016,10 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp); + Long currentTransactionId = (acidOp == Operation.NOT_ACID) ? null : + SessionState.get().getTxnMgr().getCurrentTxnId(); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, + currentTransactionId); // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 08a8f00e06..f0089fc11e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -18,20 +18,8 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.Stack; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -42,7 +30,6 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -54,7 +41,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; -import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext; import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; @@ -76,9 +62,16 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; /** * TaskCompiler is a the base class for classes that compile @@ -107,7 +100,7 @@ public void compile(final ParseContext pCtx, final List> mvTask = new ArrayList>(); + List> mvTask = new ArrayList<>(); List loadTableWork = pCtx.getLoadTableWork(); List loadFileWork = pCtx.getLoadFileWork(); @@ -214,7 +207,9 @@ public void 
compile(final ParseContext pCtx, final List tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); + Task tsk = TaskFactory + .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), + conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { @@ -258,9 +253,7 @@ public void compile(final ParseContext pCtx, final List> leafTasks = new LinkedHashSet>(); + Set> leafTasks = new LinkedHashSet<>(); getLeafTasks(rootTasks, leafTasks); if (isCStats) { genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0); @@ -384,7 +379,7 @@ private void patchUpAfterCTASorMaterializedView(final List> leaves = - new LinkedHashSet>(); + new LinkedHashSet<>(); getLeafTasks(rootTasks, leaves); assert (leaves.size() > 0); for (Task task : leaves) { @@ -411,19 +406,14 @@ private void patchUpAfterCTASorMaterializedView(final List loadFileWork, Set> leafTasks, int outerQueryLimit, int numBitVector) { - ColumnStatsTask cStatsTask = null; - ColumnStatsWork cStatsWork = null; - FetchWork fetch = null; + ColumnStatsTask cStatsTask; + ColumnStatsWork cStatsWork; + FetchWork fetch; String tableName = analyzeRewrite.getTableName(); List colName = analyzeRewrite.getColName(); List colType = analyzeRewrite.getColType(); @@ -450,7 +440,7 @@ protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite, ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, colName, colType, isTblLevel, numBitVector); - cStatsWork = new ColumnStatsWork(fetch, cStatsDesc); + cStatsWork = new ColumnStatsWork(fetch, cStatsDesc, SessionState.get().getCurrentDatabase()); cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf); for (Task tsk : leafTasks) { tsk.addDependentTask(cStatsTask); @@ -461,7 +451,7 @@ protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite, /** * Find all leaf tasks of the list of root tasks. 
*/ - protected void getLeafTasks(List> rootTasks, + private void getLeafTasks(List> rootTasks, Set> leaves) { for (Task root : rootTasks) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java index 8db28895d0..5f9041e449 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; -import java.util.List; import java.util.Map; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -38,12 +37,14 @@ private ColumnStatsDesc colStats; private String partName; private Map mapProp; + private String currentDatabaseName; public ColumnStatsUpdateWork(ColumnStatsDesc colStats, String partName, - Map mapProp) { + Map mapProp, String currentDatabaseName) { this.partName = partName; this.colStats = colStats; this.mapProp = mapProp; + this.currentDatabaseName = currentDatabaseName; } @Override @@ -64,4 +65,11 @@ public String getPartName() { return mapProp; } + public String getCurrentDatabaseName() { + return currentDatabaseName; + } + + public void setCurrentDatabaseName(String currentDatabaseName) { + this.currentDatabaseName = currentDatabaseName; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java index 76811b1a93..842fd1a411 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java @@ -34,15 +34,17 @@ private static final long serialVersionUID = 1L; private FetchWork fWork; private ColumnStatsDesc colStats; + private String currentDatabaseName; private static final int LIMIT = -1; public ColumnStatsWork() { } - public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats) { + public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats, String currentDatabaseName) { this.fWork = work; this.setColStats(colStats); + this.currentDatabaseName = currentDatabaseName; } @Override @@ -85,4 +87,11 @@ public static int getLimit() { return LIMIT; } + public String getCurrentDatabaseName() { + return currentDatabaseName; + } + + public void setCurrentDatabaseName(String currentDatabaseName) { + this.currentDatabaseName = currentDatabaseName; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index aa778504ba..c08498dcc7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -41,6 +41,10 @@ // Need to remember whether this is an acid compliant operation, and if so whether it is an // insert, update, or delete. 
private AcidUtils.Operation writeType; + /* + if the writeType above is NOT_ACID then the currentTransactionId will be null + */ + private final Long currentTransactionId; // TODO: the below seems like they should just be combined into partitionDesc private org.apache.hadoop.hive.ql.plan.TableDesc table; @@ -54,16 +58,18 @@ public LoadTableDesc(final LoadTableDesc o) { this.lbCtx = o.lbCtx; this.inheritTableSpecs = o.inheritTableSpecs; this.writeType = o.writeType; + this.currentTransactionId = o.currentTransactionId; this.table = o.table; this.partitionSpec = o.partitionSpec; } public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, + final TableDesc table, final Map partitionSpec, final boolean replace, - final AcidUtils.Operation writeType) { + final AcidUtils.Operation writeType, Long currentTransactionId) { super(sourcePath); + this.currentTransactionId = currentTransactionId; init(table, partitionSpec, replace, writeType); } @@ -75,17 +81,18 @@ public LoadTableDesc(final Path sourcePath, * @param replace */ public LoadTableDesc(final Path sourcePath, - final TableDesc table, - final Map partitionSpec, - final boolean replace) { - this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID); + final TableDesc table, + final Map partitionSpec, + final boolean replace) { + this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, + null); } public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, + final TableDesc table, final Map partitionSpec, - final AcidUtils.Operation writeType) { - this(sourcePath, table, partitionSpec, true, writeType); + final AcidUtils.Operation writeType, Long currentTransactionId) { + this(sourcePath, table, partitionSpec, true, writeType, currentTransactionId); } /** @@ -95,21 +102,22 @@ public LoadTableDesc(final Path sourcePath, * @param partitionSpec */ public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, - final Map partitionSpec) { - this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID); + final TableDesc table, + final Map partitionSpec) { + this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, null); } public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, + final TableDesc table, final DynamicPartitionCtx dpCtx, - final AcidUtils.Operation writeType) { + final AcidUtils.Operation writeType, Long currentTransactionId) { super(sourcePath); this.dpCtx = dpCtx; + this.currentTransactionId = currentTransactionId; if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) { init(table, dpCtx.getPartSpec(), true, writeType); } else { - init(table, new LinkedHashMap(), true, writeType); + init(table, new LinkedHashMap<>(), true, writeType); } } @@ -184,4 +192,8 @@ public void setLbCtx(ListBucketingCtx lbCtx) { public AcidUtils.Operation getWriteType() { return writeType; } + + public long getCurrentTransactionId() { + return writeType == AcidUtils.Operation.NOT_ACID ? 
0L : currentTransactionId; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 8ce211fd5f..00c0ce373d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.plan.Explain.Level; - +import org.apache.hadoop.hive.ql.session.LineageState; /** * MoveWork. @@ -38,6 +38,12 @@ private LoadTableDesc loadTableWork; private LoadFileDesc loadFileWork; private LoadMultiFilesDesc loadMultiFilesWork; + /* + these are sessionState objects that are copied over to work to allow for parallel execution. + based on the current use case the methods are selectively synchronized, which might need to be + taken care when using other methods. + */ + private final LineageState sessionStateLineageState; private boolean checkFileFormat; private boolean srcLocal; @@ -57,17 +63,20 @@ protected List movedParts; public MoveWork() { + sessionStateLineageState = null; } - public MoveWork(HashSet inputs, HashSet outputs) { + private MoveWork(HashSet inputs, HashSet outputs, + LineageState lineageState) { this.inputs = inputs; this.outputs = outputs; + sessionStateLineageState = lineageState; } public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, boolean srcLocal) { - this(inputs, outputs); + boolean checkFileFormat, boolean srcLocal, LineageState lineageState) { + this(inputs, outputs, lineageState); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; @@ -76,8 +85,8 @@ public MoveWork(HashSet inputs, HashSet outputs, public MoveWork(HashSet inputs, HashSet outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat) { - this(inputs, outputs); + boolean checkFileFormat, LineageState lineageState) { + this(inputs, outputs, lineageState); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; @@ -91,6 +100,7 @@ public MoveWork(final MoveWork o) { srcLocal = o.isSrcLocal(); inputs = o.getInputs(); outputs = o.getOutputs(); + sessionStateLineageState = o.sessionStateLineageState; } @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -152,4 +162,7 @@ public void setSrcLocal(boolean srcLocal) { this.srcLocal = srcLocal; } + public LineageState getLineagState() { + return sessionStateLineageState; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java index e2f2a68ff9..056d6141d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java @@ -21,11 +21,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.io.Serializable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.exec.ColumnInfo; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer; @@ -36,7 +36,7 @@ * lineage 
information for the post execution hooks. * */ -public class LineageState { +public class LineageState implements Serializable { /** * Mapping from the directory name to FileSinkOperator (may not be FileSinkOperator for views). This @@ -44,7 +44,7 @@ * time and is then later used to created the mapping from * movetask to the set of filesink operators. */ - private final Map dirToFop; + private final Map dirToFop; /** * The lineage context index for this query. @@ -60,8 +60,8 @@ /** * Constructor. */ - public LineageState() { - dirToFop = new HashMap(); + LineageState() { + dirToFop = new HashMap<>(); linfo = new LineageInfo(); index = new Index(); } @@ -72,8 +72,8 @@ public LineageState() { * @param dir The directory name. * @param fop The sink operator. */ - public void mapDirToOp(Path dir, Operator fop) { - dirToFop.put(dir, fop); + public synchronized void mapDirToOp(Path dir, Operator fop) { + dirToFop.put(dir.toUri().toString(), fop); } /** @@ -83,10 +83,10 @@ public void mapDirToOp(Path dir, Operator fop) { * @param newPath conditional input path * @param oldPath path of the old linked MoveWork */ - public void updateDirToOpMap(Path newPath, Path oldPath) { - Operator op = dirToFop.get(oldPath); + public synchronized void updateDirToOpMap(Path newPath, Path oldPath) { + Operator op = dirToFop.get(oldPath.toUri().toString()); if (op != null) { - dirToFop.put(newPath, op); + dirToFop.put(newPath.toUri().toString(), op); } } @@ -97,10 +97,10 @@ public void updateDirToOpMap(Path newPath, Path oldPath) { * @param dc The associated data container. * @param cols The list of columns. */ - public void setLineage(Path dir, DataContainer dc, + public synchronized void setLineage(Path dir, DataContainer dc, List cols) { // First lookup the file sink operator from the load work. - Operator op = dirToFop.get(dir); + Operator op = dirToFop.get(dir.toUri().toString()); // Go over the associated fields and look up the dependencies // by position in the row schema of the filesink operator. @@ -136,7 +136,7 @@ public Index getIndex() { /** * Clear all lineage states */ - public void clear() { + public synchronized void clear() { dirToFop.clear(); linfo.clear(); index.clear(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/TasksGraph.java b/ql/src/java/org/apache/hadoop/hive/ql/util/TasksGraph.java new file mode 100644 index 0000000000..a0f751177a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/TasksGraph.java @@ -0,0 +1,131 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.graphstream.graph.Node; +import org.graphstream.graph.implementations.MultiGraph; +import org.graphstream.stream.file.FileSinkImages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +public class TasksGraph { + private static final Logger LOGGER = LoggerFactory.getLogger(TasksGraph.class); + + static { + System.setProperty("gs.ui.renderer", "org.graphstream.ui.j2dviewer.J2DGraphRenderer"); + } + + static class MyGraph { + private boolean isConditionalTaskMode = false; + private final String queryId; + private MultiGraph graph; + private Set map = new HashSet<>(); + + MyGraph(String queryId) { + this.graph = new MultiGraph(queryId); + this.queryId = queryId; + } + + MyGraph addNode(String id, boolean isRoot) { + if (map.contains(id)) + return this; + Node node = graph.addNode(id); + map.add(id); + node.addAttribute("ui.label", id); + node.addAttribute("ui.style", "size-mode:fit;"); + + if (isConditionalTaskMode) { + node.addAttribute("ui.style", "fill-color: blue;"); + } + if (isRoot) { + node.addAttribute("ui.style", "fill-color: red;"); + } + return this; + } + + void addEdge(String edgeId, String nodeOne, String nodeTwo) { + graph.addEdge(edgeId, nodeTwo, nodeOne, true); + } + + void write() throws IOException { + FileSinkImages sink = + new FileSinkImages(FileSinkImages.OutputType.PNG, FileSinkImages.Resolutions.HD1080); + sink.setLayoutPolicy(FileSinkImages.LayoutPolicy.COMPUTED_FULLY_AT_NEW_IMAGE); + String fileName = "/tmp/" + queryId + ".png"; + sink.writeAll(graph, fileName); + LOGGER.info("graph for query id: {} available at :: {}", queryId, fileName); + } + } + + private HiveConf hiveConf; + private final List> rootTasks; + private MyGraph graph; + + public TasksGraph(HiveConf hiveConf, String queryId, + List> rootTasks) { + this.hiveConf = hiveConf; + this.rootTasks = rootTasks; + this.graph = new MyGraph(queryId); + } + + public void prettyPrint() { + if (hiveConf.getBoolVar(HiveConf.ConfVars.PRINT_TASK_GRAPH)) { + try { + for (Task task : rootTasks) { + graph.addNode(task.toString(), true); + child(task, task.getChildTasks()); + } + graph.write(); + } catch (Throwable e) { + LOGGER.error("error while printing graph", e); + } + } + } + + private void child(Task precursor, + List> childTasks) { + if (childTasks == null || childTasks.isEmpty()) { + if (precursor instanceof ConditionalTask) { + graph.isConditionalTaskMode = true; + child(precursor, ((ConditionalTask) precursor).getListTasks()); + graph.isConditionalTaskMode = false; + } + return; + } + for (Task task : childTasks) { + graph.addNode(task.toString(), false) + .addEdge(id(precursor, task), precursor.toString(), task.toString()); + child(task, task.getChildTasks()); + } + } + + private String id(Task precursor, Task child) { + return UUID.randomUUID().toString(); + // return precursor.toString() + " ::: " + child.toString(); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java index e7ce2345cd..32a2a8da90 100644 ---
a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java @@ -1,42 +1,56 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
*/ package org.apache.hadoop.hive.ql.optimizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.*; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.MoveTask; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.*; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.LoadFileDesc; +import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; +import org.apache.hadoop.hive.ql.plan.LoadTableDesc; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import java.io.Serializable; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Properties; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.when; @@ -44,7 +58,7 @@ public class TestGenMapRedUtilsCreateConditionalTask { private static HiveConf hiveConf; - private Task dummyMRTask; + private Task dummyMRTask; @BeforeClass public static void initializeSessionState() { @@ -159,7 +173,7 @@ public void testConditionalMoveTaskIsOptimized() throws SemanticException { Path finalDirName = new Path("s3a://bucket/scratch/-ext-10000"); Path tableLocation = new Path("s3a://bucket/warehouse/table"); Task moveTask = createMoveTask(finalDirName, tableLocation); - List> moveTaskList = Arrays.asList(moveTask); + List> moveTaskList = Collections.singletonList(moveTask); GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask); ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0); @@ -199,7 +213,7 @@ public void testConditionalMoveTaskIsNotOptimized() throws SemanticException { Path finalDirName = new Path("s3a://bucket/scratch/-ext-10000"); Path tableLocation = new Path("s3a://bucket/warehouse/table"); Task moveTask = createMoveTask(finalDirName, tableLocation); - List> moveTaskList = Arrays.asList(moveTask); + List> moveTaskList = Collections.singletonList(moveTask); GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask); ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0); @@ -233,7 +247,7 @@ public void testConditionalMoveOnHdfsIsNotOptimized() throws SemanticException { Path finalDirName = new Path("hdfs://bucket/scratch/-ext-10000"); Path tableLocation = new Path("hdfs://bucket/warehouse/table"); Task moveTask = createMoveTask(finalDirName, tableLocation); - List> moveTaskList = Arrays.asList(moveTask); + List> moveTaskList = 
Collections.singletonList(moveTask); GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask); ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
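
Note: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the GraphStream 1.3 calls that TasksGraph.MyGraph builds on (MultiGraph, addNode, addAttribute, addEdge, FileSinkImages), handy for sanity-checking the new gs-core dependency on its own. The class name, stage labels, query id, and output path are made up for illustration only; the API calls mirror the ones used in MyGraph above.

import org.graphstream.graph.Node;
import org.graphstream.graph.implementations.MultiGraph;
import org.graphstream.stream.file.FileSinkImages;

import java.io.IOException;

public class TaskGraphRenderSketch {
  public static void main(String[] args) throws IOException {
    // One graph per query, named after the query id (illustrative id here).
    MultiGraph graph = new MultiGraph("hive_query_id_example");

    // Root task node, labelled and coloured the way MyGraph.addNode does for root tasks.
    Node root = graph.addNode("Stage-1");
    root.addAttribute("ui.label", "Stage-1");
    root.addAttribute("ui.style", "fill-color: red;");

    // Child task node plus a directed edge, added child-first as MyGraph.addEdge does.
    Node child = graph.addNode("Stage-0");
    child.addAttribute("ui.label", "Stage-0");
    graph.addEdge("Stage-1-to-Stage-0", "Stage-0", "Stage-1", true);

    // Render the graph to a PNG image, as MyGraph.write() does under /tmp/{query-id}.
    FileSinkImages sink =
        new FileSinkImages(FileSinkImages.OutputType.PNG, FileSinkImages.Resolutions.HD1080);
    sink.setLayoutPolicy(FileSinkImages.LayoutPolicy.COMPUTED_FULLY_AT_NEW_IMAGE);
    sink.writeAll(graph, "/tmp/hive_query_id_example.png");
  }
}

To exercise the real code path instead, enable HiveConf.ConfVars.PRINT_TASK_GRAPH for the session and run a query; prettyPrint() then renders the full task DAG for that query id.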