diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8c39de3e77..2796d720ff 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3058,6 +3058,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Bind host on which to run the HiveServer2 Thrift service."), HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" + "enable parallel compilation of the queries between sessions and within the same session on HiveServer2. The default is false."), + HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT("hive.driver.parallel.compilation.global.limit", -1, "Determines the " + + "degree of parallelism for compilation queries between sessions on HiveServer2. The default is -1."), HIVE_SERVER2_COMPILE_LOCK_TIMEOUT("hive.server2.compile.lock.timeout", "0s", new TimeValidator(TimeUnit.SECONDS), "Number of seconds a request will wait to acquire the compile lock before giving up. " + @@ -4395,7 +4397,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal + ",fs.s3a.secret.key" + ",fs.s3a.proxy.password" + ",dfs.adls.oauth2.credential" - + ",fs.adl.oauth2.credential", + + ",fs.adl.oauth2.credential" + + ",hive.driver.parallel.compilation.global.limit", "Comma separated list of configuration options which should not be read by normal user like passwords"), HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list", "hive.added.files.path,hive.added.jars.path,hive.added.archives.path", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 737debd2ad..dd87399f00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -36,7 +36,6 @@ import java.util.Map.Entry; import java.util.Queue; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -93,6 +92,8 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lock.CompileLock; +import org.apache.hadoop.hive.ql.lock.CompileLockFactory; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; @@ -169,7 +170,6 @@ ByteStream.Output bos = new ByteStream.Output(); private final HiveConf conf; - private final boolean isParallelEnabled; private DataInput resStream; private Context ctx; private DriverContext driverCxt; @@ -450,8 +450,6 @@ public Driver(QueryState queryState, String userName, QueryInfo queryInfo) { public Driver(QueryState queryState, String userName, QueryInfo queryInfo, HiveTxnManager txnMgr) { this.queryState = queryState; this.conf = queryState.getConf(); - isParallelEnabled = (conf != null) - && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION); this.userName = userName; this.hookRunner = new HookRunner(conf, console); this.queryInfo = queryInfo; @@ -503,7 +501,8 @@ public int compile(String command, boolean resetTaskIds) { // deferClose indicates if the close/destroy should be deferred when the process has been // interrupted, it should be set to true if the compile is called within another method like // runInternal, which defers the close to the called in that method. - private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse { + @VisibleForTesting + void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); lDrvState.stateLock.lock(); @@ -1840,8 +1839,6 @@ public void lockAndRespond() throws CommandProcessorResponse { } } - private static final ReentrantLock globalCompileLock = new ReentrantLock(); - private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse { Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { @@ -1850,29 +1847,30 @@ private void compileInternal(String command, boolean deferClose) throws CommandP PerfLogger perfLogger = SessionState.getPerfLogger(true); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.WAIT_COMPILE); - final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, - command); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.WAIT_COMPILE); - if (metrics != null) { - metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1); - } - if (compileLock == null) { - throw createProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode()); - } + try (CompileLock compileLock = CompileLockFactory.newInstance(conf, command)) { + boolean success = compileLock.tryAcquire(); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.WAIT_COMPILE); + + if (metrics != null) { + metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1); + } + if (!success) { + errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg(); + throw createProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode()); + } - try { - compile(command, true, deferClose); - } catch (CommandProcessorResponse cpr) { try { - releaseLocksAndCommitOrRollback(false); - } catch (LockException e) { - LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e)); + compile(command, true, deferClose); + } catch (CommandProcessorResponse cpr) { + try { + releaseLocksAndCommitOrRollback(false); + } catch (LockException e) { + LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + throw cpr; } - throw cpr; - } finally { - compileLock.unlock(); } //Save compile-time PerfLogging for WebUI. //Execution-time Perf logs are done by either another thread's PerfLogger @@ -1881,65 +1879,6 @@ private void compileInternal(String command, boolean deferClose) throws CommandP queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes()); } - /** - * Acquires the compile lock. If the compile lock wait timeout is configured, - * it will acquire the lock if it is not held by another thread within the given - * waiting time. - * @return the ReentrantLock object if the lock was successfully acquired, - * or {@code null} if compile lock wait timeout is configured and - * either the waiting time elapsed before the lock could be acquired - * or if the current thread is interrupted. - */ - private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled, - String command) { - final ReentrantLock compileLock = isParallelEnabled ? - SessionState.get().getCompileLock() : globalCompileLock; - long maxCompileLockWaitTime = HiveConf.getTimeVar( - this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, - TimeUnit.SECONDS); - - final String lockAcquiredMsg = "Acquired the compile lock."; - // First shot without waiting. - try { - if (compileLock.tryLock(0, TimeUnit.SECONDS)) { - LOG.debug(lockAcquiredMsg); - return compileLock; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Interrupted Exception ignored", e); - } - return null; - } - - // If the first shot fails, then we log the waiting messages. - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting to acquire compile lock: " + command); - } - - if (maxCompileLockWaitTime > 0) { - try { - if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) { - errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg(); - LOG.error(errorMessage + ": " + command); - return null; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Interrupted Exception ignored", e); - } - return null; - } - } else { - compileLock.lock(); - } - - LOG.debug(lockAcquiredMsg); - return compileLock; - } - private void runInternal(String command, boolean alreadyCompiled) throws CommandProcessorResponse { errorMessage = null; SQLState = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLock.java b/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLock.java new file mode 100644 index 0000000000..ee450cd0ff --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLock.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import org.apache.hadoop.hive.ql.ErrorMsg; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates HS2 compile lock logic. + */ +public final class CompileLock implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CompileLock.class); + + private static final String LOCK_ACQUIRED_MSG = "Acquired the compile lock."; + private static final String WAIT_LOCK_ACQUIRE_MSG = "Waiting to acquire compile lock: "; + + private final Lock underlying; + private final long defaultTimeout; + + private final String command; + private boolean isLocked = false; + + public CompileLock(Lock underlying, long timeout, String command) { + this.underlying = underlying; + this.command = command; + this.defaultTimeout = timeout; + } + + public boolean tryAcquire() { + return tryAcquire(defaultTimeout, TimeUnit.SECONDS); + } + + /** + * Acquires the compile lock. If the compile lock wait timeout is configured, + * it will acquire the lock if it is not held by another thread within the given + * waiting time. + * + * @return {@code true} if the lock was successfully acquired, + * or {@code false} if compile lock wait timeout is configured and + * either the waiting time elapsed before the lock could be acquired + * or the current thread was interrupted. + */ + public boolean tryAcquire(long timeout, TimeUnit unit) { + // First shot without waiting. + try { + if (underlying.tryLock(0, unit)) { + LOG.debug(LOCK_ACQUIRED_MSG); + return locked(true); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception ignored", e); + } + return locked(false); + } + + // If the first shot fails, then we log the waiting messages. + if (LOG.isDebugEnabled()) { + LOG.debug(WAIT_LOCK_ACQUIRE_MSG + command); + } + + if (timeout > 0) { + try { + if (!underlying.tryLock(timeout, unit)) { + LOG.error(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg() + ": " + command); + return locked(false); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception ignored", e); + } + return locked(false); + } + } else { + underlying.lock(); + } + + LOG.debug(LOCK_ACQUIRED_MSG); + return locked(true); + } + + private boolean locked(boolean isLocked) { + this.isLocked = isLocked; + return isLocked; + } + + public void release() { + underlying.unlock(); + isLocked = false; + } + + @Override + public void close() { + if (isLocked) { + release(); + } + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLockFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLockFactory.java new file mode 100644 index 0000000000..848cbc5a8a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/lock/CompileLockFactory.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.lock; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState; + +/** + * Compile Lock Factory. + */ +public final class CompileLockFactory { + + private static final ReentrantLock SERIALIZABLE_COMPILE_LOCK = new ReentrantLock(); + + private CompileLockFactory() { + } + + public static CompileLock newInstance(HiveConf conf, String command) { + Lock underlying = SERIALIZABLE_COMPILE_LOCK; + + boolean isParallelEnabled = (conf != null) + && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION); + + if (isParallelEnabled) { + int compileQuota = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT); + + underlying = (compileQuota > 0) ? + SessionWithQuotaCompileLock.instance : SessionState.get().getCompileLock(); + } + + long timeout = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, TimeUnit.SECONDS); + + return new CompileLock(underlying, timeout, command); + } + + /** + * Combination of global semaphore and session reentrant lock. + */ + private enum SessionWithQuotaCompileLock implements Lock { + + instance(SessionState.getSessionConf() + .getIntVar(HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT)); + + private final Semaphore globalCompileQuotas; + + SessionWithQuotaCompileLock(int compilePoolSize) { + globalCompileQuotas = new Semaphore(compilePoolSize); + } + + @Override + public void lock() { + getSessionLock().lock(); + globalCompileQuotas.acquireUninterruptibly(); + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + boolean result = false; + long startTime = System.nanoTime(); + + try { + result = getSessionLock().tryLock(time, unit) + && globalCompileQuotas.tryAcquire( + getRemainingTime(startTime, unit.toNanos(time)), TimeUnit.NANOSECONDS); + } finally { + if (!result && getSessionLock().isHeldByCurrentThread()) { + getSessionLock().unlock(); + } + } + return result; + } + + @Override + public void unlock() { + getSessionLock().unlock(); + globalCompileQuotas.release(); + } + + private ReentrantLock getSessionLock() { + return SessionState.get().getCompileLock(); + } + + private long getRemainingTime(long startTime, long time) { + long timeout = time - (System.nanoTime() - startTime); + return (timeout < 0) ? 0 : timeout; + } + + @Override + public void lockInterruptibly() throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock() { + throw new UnsupportedOperationException(); + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + } + +} + diff --git a/ql/src/test/org/apache/hadoop/hive/ql/CompileLockTest.java b/ql/src/test/org/apache/hadoop/hive/ql/CompileLockTest.java new file mode 100644 index 0000000000..b2ab7dc92d --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/CompileLockTest.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.lang.reflect.Field; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; + +import com.codahale.metrics.Counter; + +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.apache.hadoop.hive.common.metrics.common.MetricsConstant.WAITING_COMPILE_OPS; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.eq; + +/** + * Class for testing HS2 compile lock behavior (serializable, parallel unbounded, parallel bounded). + */ +public class CompileLockTest { + + private static final int CONCURRENT_COMPILATION = 15151; + + private Driver driver; + private HiveConf conf; + + @Before + public void init() throws Exception { + conf = new HiveConf(); + + conf.setBoolVar(HIVE_SERVER2_METRICS_ENABLED, true); + conf.setVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR, System.getProperty("java.io.tmpdir")); + conf.setTimeVar(HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, 15, TimeUnit.SECONDS); + + MetricsFactory.close(); + MetricsFactory.init(conf); + } + + private void initDriver(HiveConf conf, int threadCount) throws Exception { + driver = Mockito.spy(new Driver(conf)); + resetParallelCompilationLimit(conf); + + AtomicInteger count = new AtomicInteger(threadCount); + + Mockito.doAnswer(invocation -> { + Thread.sleep(500); + verifyThatWaitingCompileOpsCountIsEqualTo(count.decrementAndGet()); + return null; + }).when(driver).compile(eq(""), eq(true), eq(false)); + } + + @Test + public void testSerializableCompilation() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, false); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatNoConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithSingleQuota() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 1); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatNoConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithUnboundedQuota() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, -1); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithUnboundedQuotaAndSingleSession() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, -1); + + initDriver(conf, 10); + List responseList = compileAndRespond(true, 10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatNoConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationTimeoutWithSingleQuota() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 1); + conf.setTimeVar(HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, 1, TimeUnit.SECONDS); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsNotZero(responseList); + } + + @Test + public void testParallelCompilationWithSingleQuotaAndZeroTimeout() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 1); + conf.setTimeVar(HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, 0, TimeUnit.SECONDS); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatNoConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithMultipleQuotas() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 2); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithMultipleQuotasAndClientSessionConcurrency() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 2); + + initDriver(conf, 10); + List responseList = new ArrayList<>(); + + List>> callables = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + callables.add(() -> compileAndRespond(true, 2)); + } + + ExecutorService pool = Executors.newFixedThreadPool(callables.size()); + try { + List>> futures = pool.invokeAll(callables); + for (Future> future : futures) { + responseList.addAll(future.get()); + } + } finally { + pool.shutdown(); + } + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatConcurrentCompilationWasIndeed(responseList); + } + + private List compileAndRespond(int threadCount) throws Exception { + return compileAndRespond(false, threadCount); + } + + private List compileAndRespond(boolean reuseSession, int threadCount) throws Exception { + List responseList = new ArrayList<>(); + SessionState sessionState = new SessionState(conf); + + List> callables = new ArrayList<>(); + for (int i = 0; i < threadCount; i++) { + callables.add(() -> { + SessionState ss = (reuseSession)? sessionState : new SessionState(conf); + SessionState.setCurrentSessionState(ss); + + CommandProcessorResponse response; + try{ + response = driver.compileAndRespond(""); + + } finally { + SessionState.detachSession(); + } + return response; + }); + } + + ExecutorService pool = Executors.newFixedThreadPool(callables.size()); + try{ + List> futures = pool.invokeAll(callables); + + for (Future future : futures) { + try { + responseList.add(future.get()); + + } catch (ExecutionException ex) { + responseList.add( + (ex.getCause() instanceof CommandProcessorResponse) ? + new CommandProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode()) : + new CommandProcessorResponse(CONCURRENT_COMPILATION)); + } + } + } finally { + pool.shutdown(); + } + + return responseList; + } + + private void resetParallelCompilationLimit(HiveConf conf) throws Exception { + Enum compileLock = createEnumInstance("instance", Class.forName("org.apache.hadoop.hive.ql.lock" + + ".CompileLockFactory$SessionWithQuotaCompileLock")); + + Field field = compileLock.getClass().getDeclaredField("globalCompileQuotas"); + field.setAccessible(true); + + int compileLimit = conf.getIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT); + field.set(compileLock, new Semaphore(compileLimit)); + } + + @SuppressWarnings("unchecked") + private static > T createEnumInstance(String name, Type type) { + return Enum.valueOf((Class) type, name); + } + + private void verifyThatTimedOutCompileOpsCountIsZero(List responseList) { + verifyErrorCount(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), + is(equalTo(0)), responseList); + } + + private void verifyThatTimedOutCompileOpsCountIsNotZero(List responseList) { + verifyErrorCount(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), + is(not(equalTo(0))), responseList); + } + + private void verifyThatConcurrentCompilationWasIndeed(List responseList){ + verifyErrorCount(CONCURRENT_COMPILATION, + is(not(equalTo(0))), responseList); + } + + private void verifyThatNoConcurrentCompilationWasIndeed(List responseList){ + verifyErrorCount(CONCURRENT_COMPILATION, + is(equalTo(0)), responseList); + } + private void verifyErrorCount(int code, Matcher matcher, List responseList) { + int count = 0; + + for(CommandProcessorResponse response : responseList){ + if(code == response.getResponseCode()){ + count++; + } + } + assertThat(count, matcher); + } + + private void verifyThatWaitingCompileOpsCountIsEqualTo(long count) { + Counter counter = getCounter(WAITING_COMPILE_OPS); + assertNotNull(counter); + assertThat(counter.getCount(), is(equalTo(count))); + } + + private Counter getCounter(String counter) { + CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance(); + SortedMap counters = metrics.getMetricRegistry().getCounters(); + + assertNotNull(counters); + return counters.get(counter); + } + +}