diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7f9607129eb1f5f43e8a728cf7d2a56c1ed5af49..39682eb4b157568274a4a190b8d553f947bd0e75 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1843,7 +1843,11 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "Bind host on which to run the HiveServer2 Thrift service."), HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" + "enable parallel compilation between sessions on HiveServer2. The default is false."), - + HIVE_SERVER2_GLOBAL_COMPILE_LOCK_TIMEOUT("hive.server2.global.compile.lock.timeout", "0s", + new TimeValidator(TimeUnit.SECONDS), + "Number of seconds a request will wait to acquire the global compile lock before giving up. " + + "This is only applicable if parallel compilation is disabled, i.e. a global HiveServer2-wide " + + "compile lock is in use. Setting it to 0s disables the timeout."), // HiveServer2 WebUI HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"), HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on"), diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 62b608cbf53c371d1743df40988daf85f76a0867..56473b47890a77e741edc6e5c2c4508553d51f15 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -33,8 +33,11 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.mapreduce.MRJobConfig; import org.slf4j.Logger; @@ -132,6 +135,12 @@ static final private LogHelper console = new LogHelper(LOG); static final int SHUTDOWN_HOOK_PRIORITY = 0; + // Used for testing to simulate global compile lock timeout + @VisibleForTesting + public static volatile boolean TEST_GLOBAL_COMPILE_LOCK_SLEEP_ENABLED = false; + @VisibleForTesting + public static volatile long TEST_GLOBAL_COMPILE_LOCK_SLEEP_MS = -1; + private int maxRows = 100; ByteStream.Output bos = new ByteStream.Output(); @@ -314,6 +323,14 @@ public Driver() { * The SQL query to compile. */ public int compile(String command) { + if (TEST_GLOBAL_COMPILE_LOCK_SLEEP_ENABLED) { + try { + Thread.sleep(TEST_GLOBAL_COMPILE_LOCK_SLEEP_MS); + } catch (InterruptedException e) { + // do nothing + } + } + return compile(command, true); } @@ -1217,9 +1234,11 @@ public CommandProcessorResponse compileAndRespond(String command) { private int compileInternal(String command) { boolean isParallelEnabled = SessionState.get().isHiveServerQuery() && this.isParallelEnabled; int ret; - final ReentrantLock compileLock = isParallelEnabled - ? SessionState.get().getCompileLock() : globalCompileLock; - compileLock.lock(); + final ReentrantLock compileLock = acquireCompileLock(isParallelEnabled); + if (compileLock == null) { + return ErrorMsg.GLOBAL_COMPILE_LOCK_TIMED_OUT.getErrorCode(); + } + try { if (isParallelEnabled && LOG.isDebugEnabled()) { LOG.debug("Entering compile: " + command); @@ -1231,6 +1250,7 @@ private int compileInternal(String command) { } finally { compileLock.unlock(); } + if (ret != 0) { try { releaseLocksAndCommitOrRollback(false, null); @@ -1242,6 +1262,37 @@ private int compileInternal(String command) { return ret; } + private ReentrantLock acquireCompileLock(boolean isParallelEnabled) { + if (isParallelEnabled) { + ReentrantLock sessionScopeCompileLock = SessionState.get().getCompileLock(); + sessionScopeCompileLock.lock(); + return sessionScopeCompileLock; + } + + long maxGlobalCompileLockWaitTime = HiveConf.getTimeVar( + this.conf, ConfVars.HIVE_SERVER2_GLOBAL_COMPILE_LOCK_TIMEOUT, + TimeUnit.SECONDS); + if (maxGlobalCompileLockWaitTime > 0) { + try { + LOG.info("Waiting to acquire global compile lock..."); + if(!globalCompileLock.tryLock(maxGlobalCompileLockWaitTime, TimeUnit.SECONDS)) { + errorMessage = ErrorMsg.GLOBAL_COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg(); + LOG.error(errorMessage); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception ignored", e); + } + } + } else { + globalCompileLock.lock(); + } + + return globalCompileLock; + } + private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled) throws CommandNeedRetryException { errorMessage = null; diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 8a47605630066e39272f506c6e309b108b8455dd..27ea99c976b389b17f99b90c8a3527b4c9993e73 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -424,6 +424,7 @@ CTAS_LOCATION_NONEMPTY(10304, "CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory."), CTAS_CREATES_VOID_TYPE(10305, "CREATE-TABLE-AS-SELECT creates a VOID type, please use CAST to specify the type, near field: "), TBL_SORTED_NOT_BUCKETED(10306, "Destination table {0} found to be sorted but not bucketed.", true), + GLOBAL_COMPILE_LOCK_TIMED_OUT(10307, "Attempt to acquire global compile lock timed out.", true), //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " diff --git service/src/test/org/apache/hive/service/cli/CLIServiceTest.java service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index d90002bd16e46b5ce970d4c6c544a9c7605328d1..e9466901eed92d387c19ebf3a4e0f3ec5337e962 100644 --- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -33,11 +33,12 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hive.service.server.HiveServer2; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -303,15 +304,15 @@ public void testExecuteStatementParallel() throws Exception { // Create callables with different queries. String query = "SELECT ID + %1$d FROM " + tableName; cs[0] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); query = "SELECT t1.ID, SUM(t2.ID) + %1$d FROM " + tableName + " t1 CROSS JOIN " + tableName + " t2 GROUP BY t1.ID HAVING t1.ID > 1"; cs[1] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); query = "SELECT b.a FROM (SELECT (t1.ID + %1$d) as a , t2.* FROM " + tableName + " t1 INNER JOIN " + tableName + " t2 ON t1.ID = t2.ID WHERE t2.ID > 2) b"; cs[2] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); @SuppressWarnings("unchecked") FutureTask[] tasks = (FutureTask[])new FutureTask[THREAD_COUNT]; @@ -334,12 +335,98 @@ public void testExecuteStatementParallel() throws Exception { client.closeSession(sessionHandle); } + @Test + public void testGlobalCompileLockTimeout() throws Exception { + String tableName = "TEST_GLOBAL_COMPILE_LOCK_TIMEOUT"; + String columnDefinitions = "(ID STRING)"; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, + new HashMap()); + assertNotNull(sessionHandle); + + int THREAD_COUNT = 3; + @SuppressWarnings("unchecked") + FutureTask[] tasks = (FutureTask[])new FutureTask[THREAD_COUNT]; + long longPollingTimeoutMs = 10 * 60 * 1000; // Larger than max compile duration used in test + + // 1st query acquires the lock and takes 20 secs to compile + Driver.TEST_GLOBAL_COMPILE_LOCK_SLEEP_ENABLED = true; + Driver.TEST_GLOBAL_COMPILE_LOCK_SLEEP_MS = 20 * 1000; + Map confOverlay = getConfOverlay(0, longPollingTimeoutMs); + String query = "SELECT 0 FROM " + tableName; + tasks[0] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.FINISHED, false, null, null)); + new Thread(tasks[0]).start(); + Thread.sleep(5 * 1000); + + // 2nd query's session has compile lock timeout of 1 sec, so it should + // not be able to acquire the lock within that time period + Driver.TEST_GLOBAL_COMPILE_LOCK_SLEEP_ENABLED = false; + confOverlay = getConfOverlay(1, longPollingTimeoutMs); + query = "SELECT 1 FROM " + tableName; + tasks[1] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.ERROR, false, null, null)); + new Thread(tasks[1]).start(); + + // 3rd query's session has compile lock timeout of 100 secs, so it should + // be able to acquire the lock and finish successfully + Driver.TEST_GLOBAL_COMPILE_LOCK_SLEEP_ENABLED = false; + confOverlay = getConfOverlay(100, longPollingTimeoutMs); + query = "SELECT 2 FROM " + tableName; + tasks[2] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.FINISHED, false, null, null)); + new Thread(tasks[2]).start(); + + for (int i = 0; i < THREAD_COUNT; ++i) { + try { + tasks[i].get(); + } catch (Throwable t) { + if (i == 1) { + assertTrue(t.getMessage().contains( + ErrorMsg.GLOBAL_COMPILE_LOCK_TIMED_OUT.getMsg())); + } else { + throw new RuntimeException(t); + } + } + } + + // Cleanup + client.executeStatement(sessionHandle, "DROP TABLE " + tableName, + getConfOverlay(0, longPollingTimeoutMs)); + client.closeSession(sessionHandle); + } + + private Map getConfOverlay(long compileLockTimeoutSecs, + long longPollingTimeoutMs) { + Map confOverlay = new HashMap(); + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION.varname, "false"); + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, + longPollingTimeoutMs + "ms"); + if (compileLockTimeoutSecs > 0) { + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_COMPILE_LOCK_TIMEOUT.varname, + compileLockTimeoutSecs + "s"); + } + return confOverlay; + } + private Callable createQueryCallable(final String queryStringFormat, final Map confOverlay, final long longPollingTimeout, - final int queryCount, final CountDownLatch cdlIn, final CountDownLatch cdlOut) { + final int queryCount, final OperationState expectedOperationState, + final boolean syncThreadStart, final CountDownLatch cdlIn, + final CountDownLatch cdlOut) { return new Callable() { public Void call() throws Exception { - syncThreadStart(cdlIn, cdlOut); + if (syncThreadStart) { + syncThreadStart(cdlIn, cdlOut); + } + SessionHandle sessionHandle = openSession(confOverlay); OperationHandle[] hs = new OperationHandle[queryCount]; for (int i = 0; i < hs.length; ++i) { @@ -348,7 +435,7 @@ public Void call() throws Exception { hs[i] = client.executeStatementAsync(sessionHandle, queryString, confOverlay); } for (int i = hs.length - 1; i >= 0; --i) { - waitForAsyncQuery(hs[i], OperationState.FINISHED, longPollingTimeout); + waitForAsyncQuery(hs[i], expectedOperationState, longPollingTimeout); } return null; } @@ -404,7 +491,6 @@ private OperationStatus runAsyncAndWait(SessionHandle sessionHandle, String quer return waitForAsyncQuery(h, expectedState, longPollingTimeout); } - private OperationStatus waitForAsyncQuery(OperationHandle opHandle, OperationState expectedState, long longPollingTimeout) throws HiveSQLException { long testIterationTimeout = System.currentTimeMillis() + 100000;