diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d52f9941e19fc43cc1c59c701bae8fa0c156f382..b636721030e9dfd86971f96236f2208547f78f05 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1847,7 +1847,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "Bind host on which to run the HiveServer2 Thrift service."), HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" + "enable parallel compilation between sessions on HiveServer2. The default is false."), - + HIVE_SERVER2_COMPILE_LOCK_TIMEOUT("hive.server2.compile.lock.timeout", "0s", + new TimeValidator(TimeUnit.SECONDS), + "Number of seconds a request will wait to acquire the compile lock before giving up. " + + "Setting it to 0s disables the timeout."), // HiveServer2 WebUI HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"), HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on"), diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d81e17aa85d9ae742335c51d7bf5134c07cda169..2606527b7407bf5ac9dd0278922a50d155089dc9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; @@ -1219,9 +1220,12 @@ public CommandProcessorResponse compileAndRespond(String command) { private int compileInternal(String command) { boolean isParallelEnabled = SessionState.get().isHiveServerQuery() && this.isParallelEnabled; int ret; - final ReentrantLock compileLock = isParallelEnabled - ? SessionState.get().getCompileLock() : globalCompileLock; - compileLock.lock(); + final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, + command); + if (compileLock == null) { + return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(); + } + try { if (isParallelEnabled && LOG.isDebugEnabled()) { LOG.debug("Entering compile: " + command); @@ -1233,6 +1237,7 @@ private int compileInternal(String command) { } finally { compileLock.unlock(); } + if (ret != 0) { try { releaseLocksAndCommitOrRollback(false, null); @@ -1244,6 +1249,46 @@ private int compileInternal(String command) { return ret; } + /** + * Acquires the compile lock. If the compile lock wait timeout is configured, + * it will acquire the lock if it is not held by another thread within the given + * waiting time. + * @return the ReentrantLock object if the lock was successfully acquired, + * or {@code null} if compile lock wait timeout is configured and + * either the waiting time elapsed before the lock could be acquired + * or if the current thread is interrupted. + */ + private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled, + String command) { + final ReentrantLock compileLock = isParallelEnabled ? + SessionState.get().getCompileLock() : globalCompileLock; + long maxCompileLockWaitTime = HiveConf.getTimeVar( + this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, + TimeUnit.SECONDS); + if (maxCompileLockWaitTime > 0) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting to acquire compile lock: " + command); + } + if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) { + errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg(); + LOG.error(errorMessage + ": " + command); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception ignored", e); + } + return null; + } + } else { + compileLock.lock(); + } + + return compileLock; + } + private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled) throws CommandNeedRetryException { errorMessage = null; diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 9d9dd53e02bea2d12f5c40636a1fa6dac9592a2d..d7597392f9f45d807a657efabf4f9f7be2690afb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -426,6 +426,7 @@ TBL_SORTED_NOT_BUCKETED(10306, "Destination table {0} found to be sorted but not bucketed.", true), //{2} should be lockid LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms. {2}", true), + COMPILE_LOCK_TIMED_OUT(10308, "Attempt to acquire compile lock timed out.", true), //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " diff --git service/src/test/org/apache/hive/service/cli/CLIServiceTest.java service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index 7bfbdb9856281ca44b83fb9283906731a4f548db..e78181a15993d99f1cab5a061c08bb21823d2171 100644 --- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -23,8 +23,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.Serializable; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -33,11 +35,16 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hive.service.server.HiveServer2; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -303,15 +310,15 @@ public void testExecuteStatementParallel() throws Exception { // Create callables with different queries. String query = "SELECT ID + %1$d FROM " + tableName; cs[0] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); query = "SELECT t1.ID, SUM(t2.ID) + %1$d FROM " + tableName + " t1 CROSS JOIN " + tableName + " t2 GROUP BY t1.ID HAVING t1.ID > 1"; cs[1] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); query = "SELECT b.a FROM (SELECT (t1.ID + %1$d) as a , t2.* FROM " + tableName + " t1 INNER JOIN " + tableName + " t2 ON t1.ID = t2.ID WHERE t2.ID > 2) b"; cs[2] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); @SuppressWarnings("unchecked") FutureTask[] tasks = new FutureTask[THREAD_COUNT]; @@ -334,13 +341,118 @@ public void testExecuteStatementParallel() throws Exception { client.closeSession(sessionHandle); } + public static class CompileLockTestSleepHook implements HiveSemanticAnalyzerHook { + @Override + public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, + ASTNode ast) throws SemanticException { + try { + Thread.sleep(20 * 1000); + } catch (Throwable t) { + // do nothing + } + return ast; + } + + @Override + public void postAnalyze(HiveSemanticAnalyzerHookContext context, + List> rootTasks) throws SemanticException { + } + } + + @Test + public void testGlobalCompileLockTimeout() throws Exception { + String tableName = "TEST_COMPILE_LOCK_TIMEOUT"; + String columnDefinitions = "(ID STRING)"; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, + new HashMap()); + assertNotNull(sessionHandle); + + int THREAD_COUNT = 3; + @SuppressWarnings("unchecked") + FutureTask[] tasks = (FutureTask[])new FutureTask[THREAD_COUNT]; + long longPollingTimeoutMs = 10 * 60 * 1000; // Larger than max compile duration used in test + + // 1st query acquires the lock and takes 20 secs to compile + Map confOverlay = getConfOverlay(0, longPollingTimeoutMs); + confOverlay.put(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + CompileLockTestSleepHook.class.getName()); + String query = "SELECT 0 FROM " + tableName; + tasks[0] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.FINISHED, false, null, null)); + new Thread(tasks[0]).start(); + Thread.sleep(5 * 1000); + + // 2nd query's session has compile lock timeout of 1 sec, so it should + // not be able to acquire the lock within that time period + confOverlay = getConfOverlay(1, longPollingTimeoutMs); + query = "SELECT 1 FROM " + tableName; + tasks[1] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.ERROR, false, null, null)); + new Thread(tasks[1]).start(); + + // 3rd query's session has compile lock timeout of 100 secs, so it should + // be able to acquire the lock and finish successfully + confOverlay = getConfOverlay(100, longPollingTimeoutMs); + query = "SELECT 2 FROM " + tableName; + tasks[2] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.FINISHED, false, null, null)); + new Thread(tasks[2]).start(); + + boolean foundExpectedException = false; + for (int i = 0; i < THREAD_COUNT; ++i) { + try { + tasks[i].get(); + } catch (Throwable t) { + if (i == 1) { + assertTrue(t.getMessage().contains( + ErrorMsg.COMPILE_LOCK_TIMED_OUT.getMsg())); + foundExpectedException = true; + } else { + throw new RuntimeException(t); + } + } + } + assertTrue(foundExpectedException); + + // Cleanup + client.executeStatement(sessionHandle, "DROP TABLE " + tableName, + getConfOverlay(0, longPollingTimeoutMs)); + client.closeSession(sessionHandle); + } + + private Map getConfOverlay(long compileLockTimeoutSecs, + long longPollingTimeoutMs) { + Map confOverlay = new HashMap(); + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION.varname, "false"); + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, + longPollingTimeoutMs + "ms"); + if (compileLockTimeoutSecs > 0) { + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT.varname, + compileLockTimeoutSecs + "s"); + } + return confOverlay; + } + private Callable createQueryCallable(final String queryStringFormat, final Map confOverlay, final long longPollingTimeout, - final int queryCount, final CountDownLatch cdlIn, final CountDownLatch cdlOut) { + final int queryCount, final OperationState expectedOperationState, + final boolean syncThreadStart, final CountDownLatch cdlIn, + final CountDownLatch cdlOut) { return new Callable() { @Override public Void call() throws Exception { - syncThreadStart(cdlIn, cdlOut); + if (syncThreadStart) { + syncThreadStart(cdlIn, cdlOut); + } + SessionHandle sessionHandle = openSession(confOverlay); OperationHandle[] hs = new OperationHandle[queryCount]; for (int i = 0; i < hs.length; ++i) { @@ -349,7 +461,7 @@ public Void call() throws Exception { hs[i] = client.executeStatementAsync(sessionHandle, queryString, confOverlay); } for (int i = hs.length - 1; i >= 0; --i) { - waitForAsyncQuery(hs[i], OperationState.FINISHED, longPollingTimeout); + waitForAsyncQuery(hs[i], expectedOperationState, longPollingTimeout); } return null; } @@ -405,7 +517,6 @@ private OperationStatus runAsyncAndWait(SessionHandle sessionHandle, String quer return waitForAsyncQuery(h, expectedState, longPollingTimeout); } - private OperationStatus waitForAsyncQuery(OperationHandle opHandle, OperationState expectedState, long longPollingTimeout) throws HiveSQLException { long testIterationTimeout = System.currentTimeMillis() + 100000;