From 9495c04051ca44b54b3cff7b7da37e3ec71f9074 Mon Sep 17 00:00:00 2001 From: meiyi Date: Mon, 13 Aug 2018 19:53:46 +0800 Subject: [PATCH] HBASE-21034 Add new throttle type: read/write capacity unit --- .../hadoop/hbase/quotas/ThrottleSettings.java | 4 ++ .../apache/hadoop/hbase/quotas/ThrottleType.java | 9 +++ .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 2 + .../src/main/protobuf/Quota.proto | 7 +++ .../hadoop/hbase/quotas/DefaultOperationQuota.java | 73 ++++++++++++++++++---- .../hbase/quotas/GlobalQuotaSettingsImpl.java | 6 ++ .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 11 ++-- .../apache/hadoop/hbase/quotas/QuotaLimiter.java | 18 ++++-- .../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 7 +++ .../hbase/quotas/RegionServerRpcQuotaManager.java | 4 +- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 67 ++++++++++++++++++-- .../apache/hadoop/hbase/quotas/TestQuotaState.java | 8 +-- .../hadoop/hbase/quotas/TestQuotaThrottle.java | 41 +++++++++++- hbase-shell/src/main/ruby/hbase/quotas.rb | 5 +- .../src/main/ruby/shell/commands/set_quota.rb | 10 ++- 15 files changed, 232 insertions(+), 40 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java index e424d8a..f52d0fc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java @@ -95,6 +95,10 @@ class ThrottleSettings extends QuotaSettings { case READ_SIZE: builder.append(sizeToString(timedQuota.getSoftLimit())); break; + case READ_CAPACITY_UNIT: + case WRITE_CAPACITY_UNIT: + builder.append(String.format("%dcapacityUnit", timedQuota.getSoftLimit())); + break; } } else if (timedQuota.hasShare()) { builder.append(String.format("%.2f%%", timedQuota.getShare())); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java index 0b0ee60..386ff8e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java @@ -30,6 +30,9 @@ public enum ThrottleType { /** Throttling based on the read+write data size */ REQUEST_SIZE, + /** Throttling based on the read+write capacity unit */ + REQUEST_UNIT, + /** Throttling based on the number of write requests per time-unit */ WRITE_NUMBER, @@ -41,4 +44,10 @@ public enum ThrottleType { /** Throttling based on the read data size */ READ_SIZE, + + /** Throttling based on the write data capacity unit */ + WRITE_CAPACITY_UNIT, + + /** Throttling based on the read data capacity unit */ + READ_CAPACITY_UNIT, } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 24d2ab7..de38579 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2406,6 +2406,8 @@ public final class ProtobufUtil { case WRITE_SIZE: return QuotaProtos.ThrottleType.WRITE_SIZE; case READ_NUMBER: return QuotaProtos.ThrottleType.READ_NUMBER; case READ_SIZE: return QuotaProtos.ThrottleType.READ_SIZE; + case READ_CAPACITY_UNIT: return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT; + case WRITE_CAPACITY_UNIT: return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT; } throw new RuntimeException("Invalid ThrottleType " + type); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Quota.proto b/hbase-protocol-shaded/src/main/protobuf/Quota.proto index cd4c7df..f87df77 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Quota.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Quota.proto @@ -46,6 +46,9 @@ enum ThrottleType { WRITE_SIZE = 4; READ_NUMBER = 5; READ_SIZE = 6; + REQUEST_CAPACITY_UNIT = 7; + WRITE_CAPACITY_UNIT = 8; + READ_CAPACITY_UNIT = 9; } message Throttle { @@ -57,6 +60,10 @@ message Throttle { optional TimedQuota read_num = 5; optional TimedQuota read_size = 6; + + optional TimedQuota req_capacity_unit = 7; + optional TimedQuota write_capacity_unit = 8; + optional TimedQuota read_capacity_unit = 9; } message ThrottleRequest { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 1265a42..b6d638e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.quotas; import java.util.Arrays; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -34,20 +35,34 @@ public class DefaultOperationQuota implements OperationQuota { private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class); private final List limiters; + private final long writeCapacityUnit; + private final long readCapacityUnit; + private final long scanCapacityUnit; + private long writeAvailable = 0; private long readAvailable = 0; private long writeConsumed = 0; private long readConsumed = 0; + private long scanConsumed = 0; + private long writeCapacityUnitConsumed = 0; + private long readCapacityUnitConsumed = 0; + private long scanCapacityUnitConsumed = 0; private final long[] operationSize; - public DefaultOperationQuota(final QuotaLimiter... limiters) { - this(Arrays.asList(limiters)); + public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) { + this(conf, Arrays.asList(limiters)); } /** * NOTE: The order matters. It should be something like [user, table, namespace, global] */ - public DefaultOperationQuota(final List limiters) { + public DefaultOperationQuota(final Configuration conf, final List limiters) { + this.writeCapacityUnit = + conf.getInt(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT); + this.readCapacityUnit = + conf.getInt(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT); + this.scanCapacityUnit = + conf.getInt(QuotaUtil.SCAN_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_SCAN_CAPACITY_UNIT); this.limiters = limiters; int size = OperationType.values().length; operationSize = new long[size]; @@ -58,24 +73,29 @@ public class DefaultOperationQuota implements OperationQuota { } @Override - public void checkQuota(int numWrites, int numReads, int numScans) - throws RpcThrottlingException { + public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); - readConsumed = estimateConsume(OperationType.GET, numReads, 100); - readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); + readConsumed = estimateConsume(OperationType.GET, numReads, 100); + scanConsumed = estimateConsume(OperationType.SCAN, numScans, 1000); + + writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); + readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); + scanCapacityUnitConsumed = calculateScanCapacityUnit(scanConsumed); writeAvailable = Long.MAX_VALUE; readAvailable = Long.MAX_VALUE; for (final QuotaLimiter limiter: limiters) { if (limiter.isBypass()) continue; - limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); + limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed + scanConsumed, + writeCapacityUnitConsumed, readCapacityUnitConsumed + scanCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); } for (final QuotaLimiter limiter: limiters) { - limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); + limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed + scanConsumed, + writeCapacityUnitConsumed, readCapacityUnitConsumed + scanCapacityUnitConsumed); } } @@ -83,12 +103,15 @@ public class DefaultOperationQuota implements OperationQuota { public void close() { // Adjust the quota consumed for the specified operation long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; - long readDiff = operationSize[OperationType.GET.ordinal()] + - operationSize[OperationType.SCAN.ordinal()] - readConsumed; + long readDiff = operationSize[OperationType.GET.ordinal()] - readConsumed; + long scanDiff = operationSize[OperationType.SCAN.ordinal()] - scanConsumed; + long writeUnitDiff = calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed); + long readUnitDiff = calculateReadCapacityUnitDiff( operationSize[OperationType.GET.ordinal()], readConsumed); + long scanUnitDiff = calculateScanCapacityUnitDiff(operationSize[OperationType.SCAN.ordinal()], scanConsumed); for (final QuotaLimiter limiter: limiters) { - if (writeDiff != 0) limiter.consumeWrite(writeDiff); - if (readDiff != 0) limiter.consumeRead(readDiff); + if (writeDiff != 0) limiter.consumeWrite(writeDiff, writeUnitDiff); + if (readDiff != 0) limiter.consumeRead(readDiff + scanDiff, readUnitDiff + scanUnitDiff); } } @@ -123,4 +146,28 @@ public class DefaultOperationQuota implements OperationQuota { } return 0; } + + private long calculateWriteCapacityUnit(final long size) { + return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit); + } + + private long calculateReadCapacityUnit(final long size) { + return (long) Math.ceil(size * 1.0 / this.readCapacityUnit); + } + + private long calculateScanCapacityUnit(final long size) { + return (long) Math.ceil(size * 1.0 / this.scanCapacityUnit); + } + + private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) { + return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize); + } + + private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) { + return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize); + } + + private long calculateScanCapacityUnitDiff(final long actualSize, final long estimateSize) { + return calculateScanCapacityUnit(actualSize) - calculateScanCapacityUnit(estimateSize); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java index 3119691..79dc8d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java @@ -149,6 +149,12 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case READ_SIZE: throttleBuilder.setReadSize(otherProto.getTimedQuota()); break; + case READ_CAPACITY_UNIT: + throttleBuilder.setReadCapacityUnit(otherProto.getTimedQuota()); + break; + case WRITE_CAPACITY_UNIT: + throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota()); + break; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 3cca955..71dd3c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; /** * Noop quota limiter returned when no limiter is associated to the user/table @@ -36,22 +35,24 @@ class NoopQuotaLimiter implements QuotaLimiter { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, - long estimateReadSize) throws RpcThrottlingException { + long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) + throws RpcThrottlingException { // no-op } @Override - public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, + long writeCapacityUnit, long readCapacityUnit) { // no-op } @Override - public void consumeWrite(final long size) { + public void consumeWrite(final long size, long capacityUnit) { // no-op } @Override - public void consumeRead(final long size) { + public void consumeRead(final long size, long capacityUnit) { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 7cb29b3..9260ec2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; /** * Internal interface used to interact with the user/table quota. @@ -35,10 +34,14 @@ public interface QuotaLimiter { * @param estimateWriteSize the write size that will be checked against the available quota * @param readReqs the read requests that will be checked against the available quota * @param estimateReadSize the read size that will be checked against the available quota + * @param estimateWriteCapacityUnit the write capacity unit that will be checked against the + * available quota + * @param estimateReadCapacityUnit the read capacity unit that will be checked against the + * available quota * @throws RpcThrottlingException thrown if not enough available resources to perform operation. */ - void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize) - throws RpcThrottlingException; + void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, + long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException; /** * Removes the specified write and read amount from the quota. @@ -49,20 +52,23 @@ public interface QuotaLimiter { * @param writeSize the write size that will be removed from the current quota * @param readReqs the read requests that will be removed from the current quota * @param readSize the read size that will be removed from the current quota + * @param writeCapacityUnit the write capacity unit that will be removed from the current quota + * @param readCapacityUnit the read capacity unit num that will be removed from the current quota */ - void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize); + void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, + long writeCapacityUnit, long readCapacityUnit); /** * Removes or add back some write amount to the quota. * (called at the end of an operation in case the estimate quota was off) */ - void consumeWrite(long size); + void consumeWrite(long size, long capacityUnit); /** * Removes or add back some read amount to the quota. * (called at the end of an operation in case the estimate quota was off) */ - void consumeRead(long size); + void consumeRead(long size, long capacityUnit); /** @return true if the limiter is a noop */ boolean isBypass(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index 6bc3ce9..4a1ebb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -57,6 +57,13 @@ public class QuotaUtil extends QuotaTableUtil { public static final String QUOTA_CONF_KEY = "hbase.quota.enabled"; private static final boolean QUOTA_ENABLED_DEFAULT = false; + public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.read.capacity.unit"; + public static final int DEFAULT_READ_CAPACITY_UNIT = 1024; + public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.write.capacity.unit"; + public static final int DEFAULT_WRITE_CAPACITY_UNIT = 1024; + public static final String SCAN_CAPACITY_UNIT_CONF_KEY = "hbase.scan.capacity.unit"; + public static final int DEFAULT_SCAN_CAPACITY_UNIT = 1024 * 16; + /** Table descriptor for Quota internal table */ public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 7c21f45..a47daa0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -102,7 +102,7 @@ public class RegionServerRpcQuotaManager { LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); } if (!useNoop) { - return new DefaultOperationQuota(userLimiter); + return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter); } } else { QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); @@ -113,7 +113,7 @@ public class RegionServerRpcQuotaManager { userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter); } if (!useNoop) { - return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter); + return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter, tableLimiter, nsLimiter); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 02dffcf..3d6628d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -40,6 +40,9 @@ public class TimeBasedLimiter implements QuotaLimiter { private RateLimiter writeSizeLimiter = null; private RateLimiter readReqsLimiter = null; private RateLimiter readSizeLimiter = null; + private RateLimiter reqCapacityUnitLimiter = null; + private RateLimiter writeCapacityUnitLimiter = null; + private RateLimiter readCapacityUnitLimiter = null; private TimeBasedLimiter() { if (FixedIntervalRateLimiter.class.getName().equals( @@ -51,6 +54,9 @@ public class TimeBasedLimiter implements QuotaLimiter { writeSizeLimiter = new FixedIntervalRateLimiter(); readReqsLimiter = new FixedIntervalRateLimiter(); readSizeLimiter = new FixedIntervalRateLimiter(); + reqCapacityUnitLimiter = new FixedIntervalRateLimiter(); + writeCapacityUnitLimiter = new FixedIntervalRateLimiter(); + readCapacityUnitLimiter = new FixedIntervalRateLimiter(); } else { reqsLimiter = new AverageIntervalRateLimiter(); reqSizeLimiter = new AverageIntervalRateLimiter(); @@ -58,6 +64,9 @@ public class TimeBasedLimiter implements QuotaLimiter { writeSizeLimiter = new AverageIntervalRateLimiter(); readReqsLimiter = new AverageIntervalRateLimiter(); readSizeLimiter = new AverageIntervalRateLimiter(); + reqCapacityUnitLimiter = new AverageIntervalRateLimiter(); + writeCapacityUnitLimiter = new AverageIntervalRateLimiter(); + readCapacityUnitLimiter = new AverageIntervalRateLimiter(); } } @@ -93,6 +102,21 @@ public class TimeBasedLimiter implements QuotaLimiter { setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize()); isBypass = false; } + + if (throttle.hasReqCapacityUnit()) { + setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit()); + isBypass = false; + } + + if (throttle.hasWriteCapacityUnit()) { + setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit()); + isBypass = false; + } + + if (throttle.hasReadCapacityUnit()) { + setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit()); + isBypass = false; + } return isBypass ? NoopQuotaLimiter.get() : limiter; } @@ -103,6 +127,9 @@ public class TimeBasedLimiter implements QuotaLimiter { writeSizeLimiter.update(other.writeSizeLimiter); readReqsLimiter.update(other.readReqsLimiter); readSizeLimiter.update(other.readSizeLimiter); + reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter); + writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter); + readCapacityUnitLimiter.update(other.readCapacityUnitLimiter); } private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) { @@ -111,7 +138,8 @@ public class TimeBasedLimiter implements QuotaLimiter { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, - long estimateReadSize) throws RpcThrottlingException { + long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) + throws RpcThrottlingException { if (!reqsLimiter.canExecute(writeReqs + readReqs)) { RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); } @@ -119,6 +147,10 @@ public class TimeBasedLimiter implements QuotaLimiter { RpcThrottlingException.throwRequestSizeExceeded( reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize)); } + if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) { + RpcThrottlingException.throwRequestSizeExceeded( + reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit)); + } if (estimateWriteSize > 0) { if (!writeReqsLimiter.canExecute(writeReqs)) { @@ -128,6 +160,10 @@ public class TimeBasedLimiter implements QuotaLimiter { RpcThrottlingException.throwWriteSizeExceeded( writeSizeLimiter.waitInterval(estimateWriteSize)); } + if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) { + RpcThrottlingException.throwWriteSizeExceeded( + writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit)); + } } if (estimateReadSize > 0) { @@ -138,11 +174,16 @@ public class TimeBasedLimiter implements QuotaLimiter { RpcThrottlingException.throwReadSizeExceeded( readSizeLimiter.waitInterval(estimateReadSize)); } + if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) { + RpcThrottlingException + .throwWriteSizeExceeded(readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit)); + } } } @Override - public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, + long writeCapacityUnit, long readCapacityUnit) { assert writeSize != 0 || readSize != 0; reqsLimiter.consume(writeReqs + readReqs); @@ -156,18 +197,30 @@ public class TimeBasedLimiter implements QuotaLimiter { readReqsLimiter.consume(readReqs); readSizeLimiter.consume(readSize); } + if (writeCapacityUnit > 0) { + reqCapacityUnitLimiter.consume(writeCapacityUnit); + writeCapacityUnitLimiter.consume(writeCapacityUnit); + } + if (readCapacityUnit > 0) { + reqCapacityUnitLimiter.consume(readCapacityUnit); + readCapacityUnitLimiter.consume(readCapacityUnit); + } } @Override - public void consumeWrite(final long size) { + public void consumeWrite(final long size, long capacityUnit) { reqSizeLimiter.consume(size); writeSizeLimiter.consume(size); + reqCapacityUnitLimiter.consume(capacityUnit); + writeCapacityUnitLimiter.consume(capacityUnit); } @Override - public void consumeRead(final long size) { + public void consumeRead(final long size, long capacityUnit) { reqSizeLimiter.consume(size); readSizeLimiter.consume(size); + reqCapacityUnitLimiter.consume(capacityUnit); + readCapacityUnitLimiter.consume(capacityUnit); } @Override @@ -195,6 +248,12 @@ public class TimeBasedLimiter implements QuotaLimiter { if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter); if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter); if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter); + if (!reqCapacityUnitLimiter.isBypass()) + builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter); + if (!writeCapacityUnitLimiter.isBypass()) + builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter); + if (!readCapacityUnitLimiter.isBypass()) + builder.append(" readCapacityUnit=" + readCapacityUnitLimiter); builder.append(')'); return builder.toString(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index 0cbc445..73b253c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -224,7 +224,7 @@ public class TestQuotaState { assertFalse(quotaInfo.isBypass()); QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); try { - limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0); + limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -242,7 +242,7 @@ public class TestQuotaState { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1, 0, 0); + limiter.checkQuota(1, 1, 0, 0, 1, 0); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -252,11 +252,11 @@ public class TestQuotaState { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1, 0, 0); + limiter.checkQuota(1, 1, 0, 0, 1, 0); } catch (RpcThrottlingException e) { fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1, 0, 0); + limiter.grabQuota(1, 1, 0, 0, 1, 0); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java index 59ba322..10e30cb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java @@ -509,13 +509,42 @@ public class TestQuotaThrottle { assertEquals(30, doGets(30, tables[1])); } + @Test + public void testUserAndTableCapacityUnitThrottle() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + + // Add 6capacityUnit/min limit + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, TABLE_NAMES[0], + ThrottleType.WRITE_CAPACITY_UNIT, 6, TimeUnit.MINUTES)); + triggerUserCacheRefresh(false, TABLE_NAMES[0]); + + // should execute at max 6 capacity units because each put size is 1 capacity unit on tables[0] + assertEquals(6, doPuts(100, 10, tables[0])); + + // wait a minute and you should execute at max 3 capacity units because each put size is 2 + // capacity unit on tables[0] + waitMinuteQuota(); + assertEquals(3, doPuts(100, 1025, tables[0])); + } + private int doPuts(int maxOps, final Table... tables) throws Exception { + return doPuts(maxOps, -1, tables); + } + + private int doPuts(int maxOps, int valueSize, final Table... tables) throws Exception { int count = 0; try { while (count < maxOps) { Put put = new Put(Bytes.toBytes("row-" + count)); - put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("data-" + count)); - for (final Table table: tables) { + byte[] value; + if (valueSize < 0) { + value = Bytes.toBytes("data-" + count); + } else { + value = generateValue(valueSize); + } + put.addColumn(FAMILY, QUALIFIER, value); + for (final Table table : tables) { table.put(put); } count += tables.length; @@ -526,6 +555,14 @@ public class TestQuotaThrottle { return count; } + private byte[] generateValue(int valueSize) { + byte[] bytes = new byte[valueSize]; + for (int i = 0; i < valueSize; i++) { + bytes[i] = 'a'; + } + return bytes; + } + private long doGets(int maxOps, final Table... tables) throws Exception { int count = 0; try { diff --git a/hbase-shell/src/main/ruby/hbase/quotas.rb b/hbase-shell/src/main/ruby/hbase/quotas.rb index 1ea8d28..eb45236 100644 --- a/hbase-shell/src/main/ruby/hbase/quotas.rb +++ b/hbase-shell/src/main/ruby/hbase/quotas.rb @@ -259,11 +259,14 @@ module Hbase def _parse_limit(str_limit, type_cls, type) str_limit = str_limit.downcase - match = /(\d+)(req|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit) + match = /(\d+)(req|unit|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit) if match if match[2] == 'req' limit = match[1].to_i type = type_cls.valueOf(type + '_NUMBER') + elsif match[2] == 'unit' + limit = match[1].to_i + type = type_cls.valueOf(type + '_UNIT') else limit = _size_from_str(match[1].to_i, match[2]) type = type_cls.valueOf(type + '_SIZE') diff --git a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb index ed593b6..4053a48 100644 --- a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb +++ b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb @@ -26,11 +26,12 @@ Set a quota for a user, table, or namespace. Syntax : set_quota TYPE => , TYPE => THROTTLE -User can either set quota on read, write or on both the requests together(i.e., read+write) +User can either set quota on read, write or on both the requests together(i.e., read+write). The read, write, or read+write(default throttle type) request limit can be expressed using -the form 100req/sec, 100req/min and the read, write, read+write(default throttle type) limit +the form 100req/sec, 100req/min; the read, write, read+write(default throttle type) limit can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit -and (sec, min, hour, day) as valid time unit. +; the read, write, read+write(default throttle type) limit can be expressed using the form +100unit/sec as capacity unit. The valid time units are (sec, min, hour, day). Currently the throttle limit is per machine - a limit of 100req/min means that each machine can execute 100req/min. @@ -42,6 +43,9 @@ For example: hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec' hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec' + hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10unit/sec' + hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10unit/sec' + hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min' hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE -- 2.7.4