diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index fd6020b85591ea190aa33ae9f2dc925a38fc7471..efa2bdc81c7c77c242b66f8f0d3708e7cd245c8a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -172,6 +172,7 @@ // For WebUI. Kept alive after queryPlan is freed. private final QueryDisplay queryDisplay = new QueryDisplay(); + private LockedDriverState lDrvState = new LockedDriverState(); // Query specific info private QueryState queryState; @@ -179,12 +180,7 @@ // Query hooks that execute before compilation and after execution List queryHooks; - // a lock is used for synchronizing the state transition and its associated - // resource releases - private final ReentrantLock stateLock = new ReentrantLock(); - private DriverState driverState = DriverState.INITIALIZED; - - private enum DriverState { + public enum DriverState { INITIALIZED, COMPILING, COMPILED, @@ -201,6 +197,13 @@ ERROR } + public static class LockedDriverState { + // a lock is used for synchronizing the state transition and its associated + // resource releases + public final ReentrantLock stateLock = new ReentrantLock(); + public DriverState driverState = DriverState.INITIALIZED; + } + private boolean checkConcurrency() { boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { @@ -381,11 +384,11 @@ public int compile(String command, boolean resetTaskIds, boolean deferClose) { PerfLogger perfLogger = SessionState.getPerfLogger(true); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); - stateLock.lock(); + lDrvState.stateLock.lock(); try { - driverState = DriverState.COMPILING; + lDrvState.driverState = DriverState.COMPILING; } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } command = new VariableSubstitution(new HiveVariableSource() { @@ -623,15 +626,15 @@ public void run() { if (isInterrupted && !deferClose) { closeInProcess(true); } - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (isInterrupted) { - driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR; + lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR; } else { - driverState = compileError ? DriverState.ERROR : DriverState.COMPILED; + lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } if (isInterrupted) { @@ -650,16 +653,16 @@ private int handleInterruption(String msg) { } private boolean isInterrupted() { - stateLock.lock(); + lDrvState.stateLock.lock(); try { - if (driverState == DriverState.INTERRUPT) { + if (lDrvState.driverState == DriverState.INTERRUPT) { Thread.currentThread().interrupt(); return true; } else { return false; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } } @@ -1123,7 +1126,7 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { see the changes made by 1st one. This takes care of autoCommit=true case. For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking in the lock manager.*/ - txnMgr.acquireLocks(plan, ctx, userFromUGI); + txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState); if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) { //For multi-stmt txns we should record the snapshot when txn starts but // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction} @@ -1394,21 +1397,21 @@ private CommandProcessorResponse runInternal(String command, boolean alreadyComp errorMessage = null; SQLState = null; downstreamError = null; - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (alreadyCompiled) { - if (driverState == DriverState.COMPILED) { - driverState = DriverState.EXECUTING; + if (lDrvState.driverState == DriverState.COMPILED) { + lDrvState.driverState = DriverState.EXECUTING; } else { errorMessage = "FAILED: Precompiled query has been cancelled or closed."; console.printError(errorMessage); return createProcessorResponse(12); } } else { - driverState = DriverState.COMPILING; + lDrvState.driverState = DriverState.COMPILING; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } // a flag that helps to set the correct driver state in finally block by tracking if @@ -1555,15 +1558,15 @@ else if(plan.getOperation() == HiveOperation.ROLLBACK) { // only release the related resources ctx, driverContext as normal releaseResources(); } - stateLock.lock(); + lDrvState.stateLock.lock(); try { - if (driverState == DriverState.INTERRUPT) { - driverState = DriverState.ERROR; + if (lDrvState.driverState == DriverState.INTERRUPT) { + lDrvState.driverState = DriverState.ERROR; } else { - driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED; + lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } } } @@ -1690,22 +1693,22 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { // hide sensitive information during query redaction. String queryStr = conf.getQueryString(); - stateLock.lock(); + lDrvState.stateLock.lock(); try { // if query is not in compiled state, or executing state which is carried over from // a combined compile/execute in runInternal, throws the error - if (driverState != DriverState.COMPILED && - driverState != DriverState.EXECUTING) { + if (lDrvState.driverState != DriverState.COMPILED && + lDrvState.driverState != DriverState.EXECUTING) { SQLState = "HY008"; errorMessage = "FAILED: query " + queryStr + " has " + - (driverState == DriverState.INTERRUPT ? "been cancelled" : "not been compiled."); + (lDrvState.driverState == DriverState.INTERRUPT ? "been cancelled" : "not been compiled."); console.printError(errorMessage); return 1000; } else { - driverState = DriverState.EXECUTING; + lDrvState.driverState = DriverState.EXECUTING; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER); @@ -2017,17 +2020,17 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { if (isInterrupted && !deferClose) { closeInProcess(true); } - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (isInterrupted) { if (!deferClose) { - driverState = DriverState.ERROR; + lDrvState.driverState = DriverState.ERROR; } } else { - driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED; + lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } if (isInterrupted) { LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); @@ -2045,7 +2048,7 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { private void releasePlan(QueryPlan plan) { // Plan maybe null if Driver.close is called in another thread for the same Driver object - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (plan != null) { plan.setDone(); @@ -2059,7 +2062,7 @@ private void releasePlan(QueryPlan plan) { } } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } } @@ -2176,7 +2179,7 @@ public boolean isFetchingTable() { @SuppressWarnings("unchecked") public boolean getResults(List res) throws IOException, CommandNeedRetryException { - if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) { + if (lDrvState.driverState == DriverState.DESTROYED || lDrvState.driverState == DriverState.CLOSED) { throw new IOException("FAILED: query has been cancelled, closed, or destroyed."); } @@ -2240,7 +2243,7 @@ public boolean getResults(List res) throws IOException, CommandNeedRetryExceptio } public void resetFetch() throws IOException { - if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) { + if (lDrvState.driverState == DriverState.DESTROYED || lDrvState.driverState == DriverState.CLOSED) { throw new IOException("FAILED: driver has been cancelled, closed or destroyed."); } if (isFetchingTable()) { @@ -2268,7 +2271,7 @@ public void setTryCount(int tryCount) { // DriverContext could be released in the query and close processes at same // time, which needs to be thread protected. private void releaseDriverContext() { - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (driverCxt != null) { driverCxt.shutdown(); @@ -2277,7 +2280,7 @@ private void releaseDriverContext() { } catch (Exception e) { LOG.debug("Exception while shutting down the task runner", e); } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } } @@ -2360,22 +2363,22 @@ private int closeInProcess(boolean destroyed) { // is called to stop the query if it is running, clean query results, and release resources. public int close() { - stateLock.lock(); + lDrvState.stateLock.lock(); try { releaseDriverContext(); - if (driverState == DriverState.COMPILING || - driverState == DriverState.EXECUTING || - driverState == DriverState.INTERRUPT) { - driverState = DriverState.INTERRUPT; + if (lDrvState.driverState == DriverState.COMPILING || + lDrvState.driverState == DriverState.EXECUTING || + lDrvState.driverState == DriverState.INTERRUPT) { + lDrvState.driverState = DriverState.INTERRUPT; return 0; } releasePlan(); releaseFetchTask(); releaseResStream(); releaseContext(); - driverState = DriverState.CLOSED; + lDrvState.driverState = DriverState.CLOSED; } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } if (SessionState.get() != null) { SessionState.get().getLineageState().clear(); @@ -2386,18 +2389,18 @@ public int close() { // is usually called after close() to commit or rollback a query and end the driver life cycle. // do not understand why it is needed and wonder if it could be combined with close. public void destroy() { - stateLock.lock(); + lDrvState.stateLock.lock(); try { // in the cancel case where the driver state is INTERRUPTED, destroy will be deferred to // the query process - if (driverState == DriverState.DESTROYED || - driverState == DriverState.INTERRUPT) { + if (lDrvState.driverState == DriverState.DESTROYED || + lDrvState.driverState == DriverState.INTERRUPT) { return; } else { - driverState = DriverState.DESTROYED; + lDrvState.driverState = DriverState.DESTROYED; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } if (!hiveLocks.isEmpty()) { try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 721974db03f1f29bdb84f41db317e37a6a78ca32..6399dbef63666d6c0f96c520aa13e632f9e0197f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -451,6 +451,7 @@ "Oldest available base: {2}", true), INVALID_COLUMN_NAME(10328, "Invalid column name"), UNSUPPORTED_SET_OPERATOR(10329, "Unsupported set operator"), + LOCK_ACQUIRE_CANCELLED(10330, "Query was cancelled while acquiring locks on the underlying objects. "), REPLACE_VIEW_WITH_MATERIALIZED(10400, "Attempt to replace view {0} with materialized view", true), REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true), UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 45ead16560ce7514a1ab6f4ac2de6771582a8a73..529e64c7af17011759ca8dd0c4c4bc68cde98448 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.thrift.TException; @@ -74,7 +75,7 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode, } @Override - public List lock(List objs, boolean keepAlive) throws + public List lock(List objs, boolean keepAlive, LockedDriverState lDrvState) throws LockException { throw new UnsupportedOperationException(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 24fbd9af5fb7be6b238c6ed246e360477d3c47de..53ee9c84b517c81abce0a2736d840787fb83f4ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver.DriverState; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -110,6 +112,11 @@ public HiveLockManager getLockManager() throws LockException { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { + acquireLocks(plan,ctx,username,null); + } + + @Override + public void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException { // Make sure we've built the lock manager getLockManager(); @@ -171,7 +178,7 @@ else if (output.getTyp() == WriteEntity.Type.DUMMYPARTITION) { } dedupLockObjects(lockObjects); - List hiveLocks = lockMgr.lock(lockObjects, false); + List hiveLocks = lockMgr.lock(lockObjects, false, lDrvState); if (hiveLocks == null) { throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java index 20e114776f143715d5820e6a1acb794a9d6de02c..c15035d254b61c019fceaf48280a0106c3f95dee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.metadata.*; @@ -59,7 +60,7 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive) } @Override - public List lock(List objs, boolean keepAlive) throws LockException { + public List lock(List objs, boolean keepAlive, LockedDriverState lDrvState) throws LockException { return lock(objs, numRetriesForLock, sleepTime); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java index b2eb99775c220e9ce347fa1cb918ebf4e738eac2..2f22d74e2959390198e44b4ae36510ef92dcc948 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import java.util.List; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; /** * Manager for locks in Hive. Users should not instantiate a lock manager @@ -37,7 +38,7 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive) throws LockException; public List lock(List objs, - boolean keepAlive) throws LockException; + boolean keepAlive, LockedDriverState lDrvState) throws LockException; public void unlock(HiveLock hiveLock) throws LockException; public void releaseLocks(List hiveLocks); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index ce220a21de01a188da940e4511ee6876d0c15a4a..187a65876ef4b7afdc94c78a48000fe797a73cd6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -70,6 +71,20 @@ void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException; /** + * Acquire all of the locks needed by a query. If used with a query that + * requires transactions, this should be called after {@link #openTxn(String)}. + * A list of acquired locks will be stored in the + * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved + * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}. + * @param plan query plan + * @param ctx Context for this query + * @param username name of the user for this query + * @param lDrvState the state to inform if the query cancelled or not + * @throws LockException if there is an error getting the locks + */ + void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException; + + /** * Release specified locks. * Transaction aware TxnManagers, which has {@code supportsAcid() == true}, * will track locks internally and ignore this parameter diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java index ed022d9193f14436ed527f9cbd3df45d48857cf4..a371a5a5ceac8cb837f896a0291ac8f6c4fc8d69 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java @@ -22,6 +22,9 @@ import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; +import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; @@ -56,6 +59,11 @@ public void closeTxnManager() { } @Override + public void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException { + acquireLocks(plan, ctx, username); + } + + @Override protected void finalize() throws Throwable { destruct(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 14d0ef4e27e0518c1bafcbdcde12f09e101a3321..6ca05ede1aec0364e6c4ea5b306c14ea366581dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver.DriverState; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.lockmgr.*; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; @@ -146,7 +148,7 @@ private static String getLastObjectName(String parent, HiveLockObject key) { **/ @Override public List lock(List lockObjects, - boolean keepAlive) throws LockException + boolean keepAlive, LockedDriverState lDrvState) throws LockException { // Sort the objects first. You are guaranteed that if a partition is being locked, // the table has already been locked @@ -184,16 +186,29 @@ public int compare(HiveLockObj o1, HiveLockObj o2) { } HiveLock lock = null; - try { - lock = lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true); - } catch (LockException e) { - console.printError("Error in acquireLocks..." ); - LOG.error("Error in acquireLocks...", e); - lock = null; + boolean isInterrupted = false; + if (lDrvState != null) { + lDrvState.stateLock.lock(); + if (lDrvState.driverState == DriverState.INTERRUPT) { + isInterrupted = true; + } + lDrvState.stateLock.unlock(); + } + if (!isInterrupted) { + try { + lock = lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true); + } catch (LockException e) { + console.printError("Error in acquireLocks..." ); + LOG.error("Error in acquireLocks...", e); + lock = null; + } } if (lock == null) { releaseLocks(hiveLocks); + if (isInterrupted) { + throw new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); + } return null; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java index e189d383b6d090ce151b6ab30fb240c261430239..de3b8ada959440519ee79bb0871bd6487b8b9a56 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java @@ -26,6 +26,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.Driver.DriverState; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -95,8 +98,12 @@ public void testSingleReadTable() throws Exception { List expectedLocks = new ArrayList(); expectedLocks.add(new ZooKeeperHiveLock("default", new HiveLockObject(), HiveLockMode.SHARED)); expectedLocks.add(new ZooKeeperHiveLock("default.table1", new HiveLockObject(), HiveLockMode.SHARED)); - - when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false))).thenReturn(expectedLocks); + LockedDriverState lDrvState = new LockedDriverState(); + LockedDriverState lDrvInp = new LockedDriverState(); + lDrvInp.driverState = DriverState.INTERRUPT; + LockException lEx = new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); + when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvState))).thenReturn(expectedLocks); + when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvInp))).thenThrow(lEx); doNothing().when(mockLockManager).setContext(any(HiveLockManagerCtx.class)); doNothing().when(mockLockManager).close(); ArgumentCaptor lockObjsCaptor = ArgumentCaptor.forClass(List.class); @@ -105,7 +112,7 @@ public void testSingleReadTable() throws Exception { when(mockQueryPlan.getOutputs()).thenReturn(new HashSet()); // Execute - txnMgr.acquireLocks(mockQueryPlan, ctx, "fred"); + txnMgr.acquireLocks(mockQueryPlan, ctx, "fred", lDrvState); // Verify Assert.assertEquals("db1", SessionState.get().getCurrentDatabase()); @@ -116,13 +123,22 @@ public void testSingleReadTable() throws Exception { Assert.assertEquals(expectedLocks.get(1).getHiveLockMode(), resultLocks.get(1).getHiveLockMode()); Assert.assertEquals(expectedLocks.get(0).getHiveLockObject().getName(), resultLocks.get(0).getHiveLockObject().getName()); - verify(mockLockManager).lock(lockObjsCaptor.capture(), eq(false)); + verify(mockLockManager).lock(lockObjsCaptor.capture(), eq(false), eq(lDrvState)); List lockObjs = lockObjsCaptor.getValue(); Assert.assertEquals(2, lockObjs.size()); Assert.assertEquals("default", lockObjs.get(0).getName()); Assert.assertEquals(HiveLockMode.SHARED, lockObjs.get(0).mode); Assert.assertEquals("default/table1", lockObjs.get(1).getName()); Assert.assertEquals(HiveLockMode.SHARED, lockObjs.get(1).mode); + + // Execute + try { + txnMgr.acquireLocks(mockQueryPlan, ctx, "fred", lDrvInp); + Assert.fail(); + } catch(LockException le) { + Assert.assertEquals(le.getMessage(), ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); + } + } @Test