diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index aa58d74..cc632cd 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3059,7 +3059,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "", "Bind host on which to run the HiveServer2 Thrift service."), HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" + - "enable parallel compilation of the queries between sessions and within the same session on HiveServer2. The default is false."), + "enable parallel compilation of the queries between sessions on HiveServer2. The default is false."), + HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT("hive.driver.parallel.compilation.limit", -1, "Determines the degree " + + "of parallelism for compilation queries between sessions on HiveServer2. The default is -1."), HIVE_SERVER2_COMPILE_LOCK_TIMEOUT("hive.server2.compile.lock.timeout", "0s", new TimeValidator(TimeUnit.SECONDS), "Number of seconds a request will wait to acquire the compile lock before giving up. " + @@ -4403,7 +4405,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal + ",fs.s3a.secret.key" + ",fs.s3a.proxy.password" + ",dfs.adls.oauth2.credential" - + ",fs.adl.oauth2.credential", + + ",fs.adl.oauth2.credential" + + ",hive.driver.parallel.compilation.limit", "Comma separated list of configuration options which should not be read by normal user like passwords"), HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list", "hive.added.files.path,hive.added.jars.path,hive.added.archives.path", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index dad2035..21638fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -36,8 +36,11 @@ import java.util.Map.Entry; import java.util.Queue; import java.util.Set; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -170,7 +173,6 @@ ByteStream.Output bos = new ByteStream.Output(); private final HiveConf conf; - private final boolean isParallelEnabled; private DataInput resStream; private Context ctx; private DriverContext driverCxt; @@ -240,6 +242,165 @@ ERROR } + private static final class CompileLock { + + private static final String LOCK_ACQUIRED_MSG = "Acquired the compile lock."; + private static final String WAIT_LOCK_ACQUIRE_MSG = "Waiting to acquire compile lock: "; + + private final HiveConf conf; + + private Lock compileLock; + private final String command; + + private CompileLock(HiveConf conf, String command){ + this.compileLock = serializableCompileLock; + this.conf = conf; + + boolean isParallelEnabled = (conf != null) + && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION); + + if (isParallelEnabled) { + int compileQuota = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT); + + this.compileLock = (compileQuota > 0) ? + SessionWithQuotaCompileLock.instance : SessionState.get().getCompileLock(); + } + this.command = command; + } + + public static CompileLock getInstance(HiveConf conf, String command){ + return new CompileLock(conf, command); + } + + private boolean tryAcquire() { + long timeout = HiveConf.getTimeVar(conf, + ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, TimeUnit.SECONDS); + + return tryAcquire(timeout, TimeUnit.SECONDS); + } + + /** + * Acquires the compile lock. If the compile lock wait timeout is configured, + * it will acquire the lock if it is not held by another thread within the given + * waiting time. + * @return the ReentrantLock object if the lock was successfully acquired, + * or {@code null} if compile lock wait timeout is configured and + * either the waiting time elapsed before the lock could be acquired + * or if the current thread is interrupted. + */ + private boolean tryAcquire(long timeout, TimeUnit unit) { + // First shot without waiting. + try { + if (compileLock.tryLock(0, unit)) { + LOG.debug(LOCK_ACQUIRED_MSG); + return true; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception ignored", e); + } + return false; + } + + // If the first shot fails, then we log the waiting messages. + if (LOG.isDebugEnabled()) { + LOG.debug(WAIT_LOCK_ACQUIRE_MSG + command); + } + + if (timeout > 0) { + try { + if (!compileLock.tryLock(timeout, unit)) { + LOG.error(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg() + ": " + command); + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception ignored", e); + } + return false; + } + } else { + compileLock.lock(); + } + + LOG.debug(LOCK_ACQUIRED_MSG); + return true; + } + + public void release() { + compileLock.unlock(); + } + } + + private static final ReentrantLock serializableCompileLock = new ReentrantLock(); + + private enum SessionWithQuotaCompileLock implements Lock { + + instance(SessionState.getSessionConf() + .getIntVar(ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT)); + + private final Semaphore globalCompileQuotas; + + SessionWithQuotaCompileLock(int compilePoolSize){ + globalCompileQuotas = new Semaphore(compilePoolSize); + } + + @Override + public void lock() { + getCompileLock().lock(); + globalCompileQuotas.acquireUninterruptibly(); + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + boolean result = false; + long startTime = System.nanoTime(); + + try { + result = getCompileLock().tryLock(time, unit) + && globalCompileQuotas.tryAcquire( + getTimeout(startTime, unit.toNanos(time)), TimeUnit.NANOSECONDS); + } finally { + if(!result && getCompileLock().isHeldByCurrentThread()) { + getCompileLock().unlock(); + } + } + return result; + } + + @Override + public void unlock() { + getCompileLock().unlock(); + globalCompileQuotas.release(); + } + + private ReentrantLock getCompileLock() { + return SessionState.get().getCompileLock(); + } + + private long getTimeout(long startTime, long time) { + long timeout = time - (System.nanoTime() - startTime); + return (timeout < 0) ? 0 : timeout; + } + + @Override + public void lockInterruptibly() throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock() { + throw new UnsupportedOperationException(); + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } + } + public static class LockedDriverState { // a lock is used for synchronizing the state transition and its associated // resource releases @@ -451,8 +612,6 @@ public Driver(QueryState queryState, String userName, QueryInfo queryInfo) { public Driver(QueryState queryState, String userName, QueryInfo queryInfo, HiveTxnManager txnMgr) { this.queryState = queryState; this.conf = queryState.getConf(); - isParallelEnabled = (conf != null) - && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION); this.userName = userName; this.hookRunner = new HookRunner(conf, console); this.queryInfo = queryInfo; @@ -504,7 +663,7 @@ public int compile(String command, boolean resetTaskIds) { // deferClose indicates if the close/destroy should be deferred when the process has been // interrupted, it should be set to true if the compile is called within another method like // runInternal, which defers the close to the called in that method. - private void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse { + void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorResponse { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); lDrvState.stateLock.lock(); @@ -1846,8 +2005,6 @@ public void lockAndRespond() throws CommandProcessorResponse { } } - private static final ReentrantLock globalCompileLock = new ReentrantLock(); - private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse { Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { @@ -1856,18 +2013,20 @@ private void compileInternal(String command, boolean deferClose) throws CommandP PerfLogger perfLogger = SessionState.getPerfLogger(true); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.WAIT_COMPILE); - final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, - command); + + final CompileLock compileLock = CompileLock.getInstance(conf, command); + boolean success = compileLock.tryAcquire(); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.WAIT_COMPILE); + if (metrics != null) { metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1); } - - if (compileLock == null) { + if (!success) { + errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg(); throw createProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode()); } - try { compile(command, true, deferClose); } catch (CommandProcessorResponse cpr) { @@ -1878,7 +2037,7 @@ private void compileInternal(String command, boolean deferClose) throws CommandP } throw cpr; } finally { - compileLock.unlock(); + compileLock.release(); } //Save compile-time PerfLogging for WebUI. //Execution-time Perf logs are done by either another thread's PerfLogger @@ -1887,65 +2046,6 @@ private void compileInternal(String command, boolean deferClose) throws CommandP queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes()); } - /** - * Acquires the compile lock. If the compile lock wait timeout is configured, - * it will acquire the lock if it is not held by another thread within the given - * waiting time. - * @return the ReentrantLock object if the lock was successfully acquired, - * or {@code null} if compile lock wait timeout is configured and - * either the waiting time elapsed before the lock could be acquired - * or if the current thread is interrupted. - */ - private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled, - String command) { - final ReentrantLock compileLock = isParallelEnabled ? - SessionState.get().getCompileLock() : globalCompileLock; - long maxCompileLockWaitTime = HiveConf.getTimeVar( - this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, - TimeUnit.SECONDS); - - final String lockAcquiredMsg = "Acquired the compile lock."; - // First shot without waiting. - try { - if (compileLock.tryLock(0, TimeUnit.SECONDS)) { - LOG.debug(lockAcquiredMsg); - return compileLock; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Interrupted Exception ignored", e); - } - return null; - } - - // If the first shot fails, then we log the waiting messages. - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting to acquire compile lock: " + command); - } - - if (maxCompileLockWaitTime > 0) { - try { - if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) { - errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg(); - LOG.error(errorMessage + ": " + command); - return null; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Interrupted Exception ignored", e); - } - return null; - } - } else { - compileLock.lock(); - } - - LOG.debug(lockAcquiredMsg); - return compileLock; - } - private void runInternal(String command, boolean alreadyCompiled) throws CommandProcessorResponse { errorMessage = null; SQLState = null; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/CompileLockTest.java b/ql/src/test/org/apache/hadoop/hive/ql/CompileLockTest.java new file mode 100644 index 0000000..cd54384 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/CompileLockTest.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.lang.reflect.Field; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; +import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; + +import com.codahale.metrics.Counter; + +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.apache.hadoop.hive.common.metrics.common.MetricsConstant.WAITING_COMPILE_OPS; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.eq; + +/** + * Class for testing Hive compile lock behavior (serializable, parallel unbounded, parallel bounded). + */ +public class CompileLockTest { + + private static final int CONCURRENT_COMPILATION = 15151; + + private Driver driver; + private HiveConf conf; + + @Before + public void init() throws Exception { + conf = new HiveConf(); + + conf.setBoolVar(HIVE_SERVER2_METRICS_ENABLED, true); + conf.setVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR, System.getProperty("java.io.tmpdir")); + conf.setTimeVar(HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, 15, TimeUnit.SECONDS); + + MetricsFactory.close(); + MetricsFactory.init(conf); + } + + private void initDriver(HiveConf conf, int threadCount) throws Exception { + driver = Mockito.spy(new Driver(conf)); + resetParallelCompilationLimit(conf); + + AtomicInteger count = new AtomicInteger(threadCount); + + Mockito.doAnswer(invocation -> { + Thread.sleep(500); + verifyThatWaitingCompileOpsCountIsEqualTo(count.decrementAndGet()); + return null; + }).when(driver).compile(eq(""), eq(true), eq(false)); + } + + @Test + public void testSerializableCompilation() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, false); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatNoConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithSingleQuota() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 1); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatNoConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithUnboundedQuota() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, -1); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithUnboundedQuotaAndSingleSession() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, -1); + + initDriver(conf, 10); + List responseList = compileAndRespond(true, 10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatNoConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationTimeoutWithSingleQuota() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 1); + conf.setTimeVar(HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, 1, TimeUnit.SECONDS); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsNotZero(responseList); + } + + @Test + public void testParallelCompilationWithSingleQuotaAndZeroTimeout() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 1); + conf.setTimeVar(HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, 0, TimeUnit.SECONDS); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatNoConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithMultipleQuotas() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 2); + + initDriver(conf, 10); + List responseList = compileAndRespond(10); + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatConcurrentCompilationWasIndeed(responseList); + } + + @Test + public void testParallelCompilationWithMultipleQuotasAndClientSessionConcurrency() throws Exception { + conf.setBoolVar(HIVE_SERVER2_PARALLEL_COMPILATION, true); + conf.setIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT, 2); + + initDriver(conf, 10); + List responseList = new ArrayList<>(); + + List>> callables = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + callables.add(() -> compileAndRespond(true, 2)); + } + + ExecutorService pool = Executors.newFixedThreadPool(callables.size()); + try { + List>> futures = pool.invokeAll(callables); + for (Future> future : futures) { + responseList.addAll(future.get()); + } + } finally { + pool.shutdown(); + } + + verifyThatWaitingCompileOpsCountIsEqualTo(0); + verifyThatTimedOutCompileOpsCountIsZero(responseList); + + verifyThatConcurrentCompilationWasIndeed(responseList); + } + + private List compileAndRespond(int threadCount) throws Exception { + return compileAndRespond(false, threadCount); + } + + private List compileAndRespond(boolean reuseSession, int threadCount) throws Exception { + List responseList = new ArrayList<>(); + SessionState sessionState = new SessionState(conf); + + List> callables = new ArrayList<>(); + for (int i = 0; i < threadCount; i++) { + callables.add(() -> { + SessionState ss = (reuseSession)? sessionState : new SessionState(conf); + SessionState.setCurrentSessionState(ss); + + CommandProcessorResponse response; + try{ + response = driver.compileAndRespond(""); + + } finally { + SessionState.detachSession(); + } + return response; + }); + } + + ExecutorService pool = Executors.newFixedThreadPool(callables.size()); + try{ + List> futures = pool.invokeAll(callables); + + for (Future future : futures) { + try { + responseList.add(future.get()); + + } catch (ExecutionException ex) { + responseList.add( + (ex.getCause() instanceof CommandProcessorResponse) ? + new CommandProcessorResponse(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode()) : + new CommandProcessorResponse(CONCURRENT_COMPILATION)); + } + } + } finally { + pool.shutdown(); + } + + return responseList; + } + + private void resetParallelCompilationLimit(HiveConf conf) throws Exception { + Enum compileLock = createEnumInstance("instance", Class.forName("org.apache.hadoop.hive.ql" + + ".Driver$SessionWithQuotaCompileLock")); + + Field field = compileLock.getClass().getDeclaredField("globalCompileQuotas"); + field.setAccessible(true); + + int compileLimit = conf.getIntVar(HIVE_SERVER2_PARALLEL_COMPILATION_LIMIT); + field.set(compileLock, new Semaphore(compileLimit)); + } + + @SuppressWarnings("unchecked") + private static > T createEnumInstance(String name, Type type) { + return Enum.valueOf((Class) type, name); + } + + private void verifyThatTimedOutCompileOpsCountIsZero(List responseList) { + verifyErrorCount(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), + is(equalTo(0)), responseList); + } + + private void verifyThatTimedOutCompileOpsCountIsNotZero(List responseList) { + verifyErrorCount(ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(), + is(not(equalTo(0))), responseList); + } + + private void verifyThatConcurrentCompilationWasIndeed(List responseList){ + verifyErrorCount(CONCURRENT_COMPILATION, + is(not(equalTo(0))), responseList); + } + + private void verifyThatNoConcurrentCompilationWasIndeed(List responseList){ + verifyErrorCount(CONCURRENT_COMPILATION, + is(equalTo(0)), responseList); + } + private void verifyErrorCount(int code, Matcher matcher, List responseList) { + int count = 0; + + for(CommandProcessorResponse response : responseList){ + if(code == response.getResponseCode()){ + count++; + } + } + assertThat(count, matcher); + } + + private void verifyThatWaitingCompileOpsCountIsEqualTo(long count) { + Counter counter = getCounter(WAITING_COMPILE_OPS); + assertNotNull(counter); + assertThat(counter.getCount(), is(equalTo(count))); + } + + private Counter getCounter(String counter) { + CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance(); + SortedMap counters = metrics.getMetricRegistry().getCounters(); + + assertNotNull(counters); + return counters.get(counter); + } + +}