diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a80004662068eb2391c0dd7062f77156b222375b..05a5b6b11d15f3b6f64ab87b922a83f76bdb597b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -202,6 +202,25 @@
     // resource releases
     public final ReentrantLock stateLock = new ReentrantLock();
     public DriverState driverState = DriverState.INITIALIZED;
+    private static ThreadLocal<LockedDriverState> lds = new ThreadLocal<LockedDriverState>() {
+      @Override
+      protected LockedDriverState initialValue() {
+        return new LockedDriverState();
+      }
+    };
+
+    public static void setLockedDriverState(LockedDriverState lDrv) {
+      lds.set(lDrv);
+    }
+
+    public static LockedDriverState getLockedDriverState() {
+      return lds.get();
+    }
+
+    public static void removeLockedDriverState() {
+      if (lds != null)
+        lds.remove();
+    }
   }
 
   private boolean checkConcurrency() {
@@ -420,6 +439,8 @@ public int compile(String command, boolean resetTaskIds, boolean deferClose) {
       TaskFactory.resetId();
     }
 
+    LockedDriverState.setLockedDriverState(lDrvState);
+
     String queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);
 
     //save some info for webUI for use after plan is freed
@@ -1402,6 +1423,8 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp
     errorMessage = null;
     SQLState = null;
     downstreamError = null;
+    LockedDriverState.setLockedDriverState(lDrvState);
+
     lDrvState.stateLock.lock();
     try {
       if (alreadyCompiled) {
@@ -2376,6 +2399,7 @@ public int close() {
       lDrvState.driverState = DriverState.CLOSED;
     } finally {
       lDrvState.stateLock.unlock();
+      LockedDriverState.removeLockedDriverState();
     }
     if (SessionState.get() != null) {
       SessionState.get().getLineageState().clear();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b0657f01d4482dc8bb8dc180e5e7deffbdb533e6..9036d9e748e22ff97424274ae4b7db2fb4d30ebb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
@@ -52,6 +53,8 @@
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -3018,6 +3021,7 @@ public static double getHighestSamplePercentage (MapWork work) {
 
     Set<Path> pathsProcessed = new HashSet<Path>();
     List<Path> pathsToAdd = new LinkedList<Path>();
+    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
     // AliasToWork contains all the aliases
     for (String alias : work.getAliasToWork().keySet()) {
       LOG.info("Processing alias " + alias);
@@ -3027,6 +3031,9 @@ public static double getHighestSamplePercentage (MapWork work) {
       boolean hasLogged = false;
       // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
       for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+          throw new IOException("Operation is Canceled. ");
+
         List<String> aliases = work.getPathToAliases().get(file);
         if (aliases.contains(alias)) {
           if (file != null) {
@@ -3079,6 +3086,8 @@ public static double getHighestSamplePercentage (MapWork work) {
     List<Path> finalPathsToAdd = new LinkedList<>();
     List<Future<Path>> futures = new LinkedList<>();
     for (final Path path : pathsToAdd) {
+      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+        throw new IOException("Operation is Canceled. ");
       if (pool == null) {
         finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
       } else {
@@ -3088,6 +3097,8 @@ public static double getHighestSamplePercentage (MapWork work) {
 
     if (pool != null) {
       for (Future<Path> future : futures) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+          throw new IOException("Operation is Canceled. ");
         finalPathsToAdd.add(future.get());
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 7a113bf8e5c4dd8c2c486741a5ebc7b8940e746b..9a7e9d93c9878dcb1768e0864d7dff2b1c1edc44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.Future;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -348,8 +351,12 @@ public int hashCode() {
     Map<CombinePathInputFormat, CombineFilter> poolMap =
       new HashMap<CombinePathInputFormat, CombineFilter>();
     Set<Path> poolSet = new HashSet<Path>();
+    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
 
     for (Path path : paths) {
+      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+        throw new IOException("Operation is Canceled. ");
+
       PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
           pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
       TableDesc tableDesc = part.getTableDesc();
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 04fc0a17c93120b8f6e6d7c36e4d70631d56baca..64f78526a438c18d590d81b4c8932d65a1705476 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -394,7 +394,9 @@ private UserGroupInformation getCurrentUGI() throws HiveSQLException {
 
   private synchronized void cleanup(OperationState state) throws HiveSQLException {
     setState(state);
-    if (shouldRunAsync()) {
+    //Need shut down background thread gracefully, driver.close will inform background thread
+    //a cancel request is sent.
+    if (shouldRunAsync() && state != OperationState.CANCELED && state != OperationState.TIMEDOUT) {
       Future<?> backgroundHandle = getBackgroundHandle();
       if (backgroundHandle != null) {
         boolean success = backgroundHandle.cancel(true);