diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index a15aeb6..088e101 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -69,13 +69,13 @@ public class DefaultOperationQuota implements OperationQuota { for (final QuotaLimiter limiter: limiters) { if (limiter.isBypass()) continue; - limiter.checkQuota(writeConsumed, readConsumed); + limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); } for (final QuotaLimiter limiter: limiters) { - limiter.grabQuota(writeConsumed, readConsumed); + limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 6230af2..cfbb3b9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -41,11 +41,22 @@ class NoopQuotaLimiter implements QuotaLimiter { } @Override + public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize) + throws ThrottlingException { + // no-op + } + + @Override public void grabQuota(long writeSize, long readSize) { // no-op } @Override + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { + // no-op + } + + @Override public void consumeWrite(final long size) { // no-op } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index db63f6e..19ba85a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -39,6 +39,18 @@ public interface QuotaLimiter { throws ThrottlingException; /** + * Checks if it is possible to execute the specified operation. + * + * @param writeReqs the write requests that will be checked against the available quota + * @param estimateWriteSize the write size that will be checked against the available quota + * @param readReqs the read requests that will be checked against the available quota + * @param estimateReadSize the read size that will be checked against the available quota + * @throws ThrottlingException thrown if not enough avialable resources to perform operation. + */ + void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize) + throws ThrottlingException; + + /** * Removes the specified write and read amount from the quota. * At this point the write and read amount will be an estimate, * that will be later adjusted with a consumeWrite()/consumeRead() call. @@ -49,6 +61,18 @@ public interface QuotaLimiter { void grabQuota(long writeSize, long readSize); /** + * Removes the specified write and read amount from the quota. + * At this point the write and read amount will be an estimate, + * that will be later adjusted with a consumeWrite()/consumeRead() call. + * + * @param writeReqs the write requests that will be removed from the current quota + * @param writeSize the write size that will be removed from the current quota + * @param readReqs the read requests that will be removed from the current quota + * @param readSize the read size that will be removed from the current quota + */ + void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize); + + /** * Removes or add back some write amount to the quota. * (called at the end of an operation in case the estimate quota was off) */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 0d77443..fd5e091 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -111,46 +111,61 @@ public class TimeBasedLimiter implements QuotaLimiter { @Override public void checkQuota(long writeSize, long readSize) throws ThrottlingException { - if (!reqsLimiter.canExecute()) { + long writeReqs = (writeSize > 0) ? 1 : 0; + long readReqs = (readSize > 0) ? 1 : 0; + checkQuota(writeReqs, writeSize, readReqs, readSize); + } + + @Override + public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize) + throws ThrottlingException { + if (!reqsLimiter.canExecute(writeReqs + readReqs)) { ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); } - if (!reqSizeLimiter.canExecute(writeSize + readSize)) { + if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) { ThrottlingException.throwRequestSizeExceeded(reqSizeLimiter - .waitInterval(writeSize + readSize)); + .waitInterval(estimateWriteSize + estimateReadSize)); } - if (writeSize > 0) { - if (!writeReqsLimiter.canExecute()) { + if (estimateWriteSize > 0) { + if (!writeReqsLimiter.canExecute(writeReqs)) { ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); } - if (!writeSizeLimiter.canExecute(writeSize)) { - ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize)); + if (!writeSizeLimiter.canExecute(estimateWriteSize)) { + ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(estimateWriteSize)); } } - if (readSize > 0) { - if (!readReqsLimiter.canExecute()) { + if (estimateReadSize > 0) { + if (!readReqsLimiter.canExecute(readReqs)) { ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); } - if (!readSizeLimiter.canExecute(readSize)) { - ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize)); + if (!readSizeLimiter.canExecute(estimateReadSize)) { + ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(estimateReadSize)); } } } @Override public void grabQuota(long writeSize, long readSize) { + long readReqs = (readSize > 0) ? 1 : 0; + long writeReqs = (writeSize > 0) ? 1 : 0; + grabQuota(writeReqs, writeSize, readReqs, readSize); + } + + @Override + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { assert writeSize != 0 || readSize != 0; - reqsLimiter.consume(1); + reqsLimiter.consume(writeReqs + readReqs); reqSizeLimiter.consume(writeSize + readSize); if (writeSize > 0) { - writeReqsLimiter.consume(1); + writeReqsLimiter.consume(writeReqs); writeSizeLimiter.consume(writeSize); } if (readSize > 0) { - readReqsLimiter.consume(1); + readReqsLimiter.consume(readReqs); readSizeLimiter.consume(readSize); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index c16478b..ae62772 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -203,6 +203,34 @@ public class TestQuotaState { assertNoopLimiter(quotaInfo.getTableLimiter(UNKNOWN_TABLE_NAME)); } + @Test(timeout = 60000) + public void testTableThrottleWithBatch() { + final TableName TABLE_A = TableName.valueOf("TableA"); + final int TABLE_A_THROTTLE_1 = 3; + final long LAST_UPDATE_1 = 10; + + UserQuotaState quotaInfo = new UserQuotaState(); + assertEquals(0, quotaInfo.getLastUpdate()); + assertTrue(quotaInfo.isBypass()); + + // Add A table limiters + UserQuotaState otherQuotaState = new UserQuotaState(LAST_UPDATE_1); + otherQuotaState.setQuotas(TABLE_A, buildReqNumThrottle(TABLE_A_THROTTLE_1)); + assertEquals(LAST_UPDATE_1, otherQuotaState.getLastUpdate()); + assertFalse(otherQuotaState.isBypass()); + + quotaInfo.update(otherQuotaState); + assertEquals(LAST_UPDATE_1, quotaInfo.getLastUpdate()); + assertFalse(quotaInfo.isBypass()); + QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); + try { + limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0); + fail("Should have thrown ThrottlingException"); + } catch (ThrottlingException e) { + // expected + } + } + private Quotas buildReqNumThrottle(final long limit) { return Quotas.newBuilder() .setThrottle(Throttle.newBuilder() @@ -214,7 +242,7 @@ public class TestQuotaState { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1); + limiter.checkQuota(1, 0); fail("Should have thrown ThrottlingException"); } catch (ThrottlingException e) { // expected @@ -224,11 +252,11 @@ public class TestQuotaState { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1); + limiter.checkQuota(1, 0); } catch (ThrottlingException e) { fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1); + limiter.grabQuota(1, 0); } }