From 6523cb311e5f8eb3ebd512c50eb42f355183e97d Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Mon, 23 Apr 2018 21:15:42 -0400 Subject: [PATCH] HBASE-20468 Count each multi-action as a request in RPC quotas RPC quotas presently only increment requests for RPCs which is not very effective in controlling utilization of HBase because often each RPC is an arbitrary batch of work. --- .../hadoop/hbase/quotas/DefaultOperationQuota.java | 4 +- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 5 +- .../apache/hadoop/hbase/quotas/QuotaLimiter.java | 8 +- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 16 +- .../hbase/quotas/TestQuotaRequestCounting.java | 213 +++++++++++++++++++++ .../apache/hadoop/hbase/quotas/TestQuotaState.java | 9 +- 6 files changed, 238 insertions(+), 17 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaRequestCounting.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 80b39a86da..1265a4223a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -69,13 +69,13 @@ public class DefaultOperationQuota implements OperationQuota { for (final QuotaLimiter limiter: limiters) { if (limiter.isBypass()) continue; - limiter.checkQuota(writeConsumed, readConsumed); + limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); } for (final QuotaLimiter limiter: limiters) { - limiter.grabQuota(writeConsumed, readConsumed); + limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index acfdc5285a..85328139f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -35,13 +35,14 @@ class NoopQuotaLimiter implements QuotaLimiter { } @Override - public void checkQuota(long estimateWriteSize, long estimateReadSize) + public void checkQuota( + long numWrites, long estimateWriteSize, long numReads, long estimateReadSize) throws RpcThrottlingException { // no-op } @Override - public void grabQuota(long writeSize, long readSize) { + public void grabQuota(long numWrites, long writeSize, long numReads, long readSize) { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 1144aec1a6..db02f4e746 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -31,11 +31,13 @@ public interface QuotaLimiter { /** * Checks if it is possible to execute the specified operation. * + * @param numWrites the number of write actions that will be checked against the available quota * @param estimateWriteSize the write size that will be checked against the available quota + * @param numReads the number of read actions that will be checked against the available quota * @param estimateReadSize the read size that will be checked against the available quota * @throws RpcThrottlingException thrown if not enough avialable resources to perform operation. */ - void checkQuota(long estimateWriteSize, long estimateReadSize) + void checkQuota(long numWrites, long estimateWriteSize, long numReads, long estimateReadSize) throws RpcThrottlingException; /** @@ -43,10 +45,12 @@ public interface QuotaLimiter { * At this point the write and read amount will be an estimate, * that will be later adjusted with a consumeWrite()/consumeRead() call. * + * @param numWrites the number of write actions that will be checked against the available quota * @param writeSize the write size that will be removed from the current quota + * @param numReads the number of read actions that will be checked against the available quota * @param readSize the read size that will be removed from the current quota */ - void grabQuota(long writeSize, long readSize); + void grabQuota(long numWrites, long writeSize, long numReads, long readSize); /** * Removes or add back some write amount to the quota. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 12bee80927..28d9a54671 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -110,8 +110,8 @@ public class TimeBasedLimiter implements QuotaLimiter { } @Override - public void checkQuota(long writeSize, long readSize) throws RpcThrottlingException { - if (!reqsLimiter.canExecute()) { + public void checkQuota(long numWrites, long writeSize, long numReads, long readSize) throws RpcThrottlingException { + if (!reqsLimiter.canExecute(numWrites + numReads)) { RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); } if (!reqSizeLimiter.canExecute(writeSize + readSize)) { @@ -120,7 +120,7 @@ public class TimeBasedLimiter implements QuotaLimiter { } if (writeSize > 0) { - if (!writeReqsLimiter.canExecute()) { + if (!writeReqsLimiter.canExecute(numWrites)) { RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); } if (!writeSizeLimiter.canExecute(writeSize)) { @@ -129,7 +129,7 @@ public class TimeBasedLimiter implements QuotaLimiter { } if (readSize > 0) { - if (!readReqsLimiter.canExecute()) { + if (!readReqsLimiter.canExecute(numReads)) { RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); } if (!readSizeLimiter.canExecute(readSize)) { @@ -139,18 +139,18 @@ public class TimeBasedLimiter implements QuotaLimiter { } @Override - public void grabQuota(long writeSize, long readSize) { + public void grabQuota(long numWrites, long writeSize, long numReads, long readSize) { assert writeSize != 0 || readSize != 0; - reqsLimiter.consume(1); + reqsLimiter.consume(numWrites + numReads); reqSizeLimiter.consume(writeSize + readSize); if (writeSize > 0) { - writeReqsLimiter.consume(1); + writeReqsLimiter.consume(numWrites); writeSizeLimiter.consume(writeSize); } if (readSize > 0) { - readReqsLimiter.consume(1); + readReqsLimiter.consume(numReads); readSizeLimiter.consume(readSize); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaRequestCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaRequestCounting.java new file mode 100644 index 0000000000..9a2d1fffc3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaRequestCounting.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.Predicate; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + +/** + * minicluster tests that validate that quota entries are properly set in the quota table + */ +@Category({ClientTests.class, MediumTests.class}) +public class TestQuotaRequestCounting { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestQuotaRequestCounting.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestQuotaRequestCounting.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000); + TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + } + + @After + public void clearQuotaTable() throws Exception { + if (TEST_UTIL.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { + TEST_UTIL.getAdmin().disableTable(QuotaUtil.QUOTA_TABLE_NAME); + TEST_UTIL.getAdmin().truncateTable(QuotaUtil.QUOTA_TABLE_NAME, false); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRequestThrottling() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final TableName tn = TableName.valueOf("throttle_requests"); + TableDescriptor desc = TableDescriptorBuilder.newBuilder(tn).setColumnFamily( + ColumnFamilyDescriptorBuilder.of("f1")).build(); + admin.createTable(desc); + + // Before we create the quota, it will be the "noop" implementation + List rsThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads(); + assertEquals(1, rsThreads.size()); + HRegionServer rs = rsThreads.get(0).getRegionServer(); + final RegionServerRpcQuotaManager rpcQuotaManager = rs.getRegionServerRpcQuotaManager(); + final OperationQuota noopQuota = rpcQuotaManager.getQuota(User.getCurrent().getUGI(), tn); + + admin.setQuota( + QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_NUMBER, 5, TimeUnit.MINUTES)); + + // Wait until the RS sees the new quota + TEST_UTIL.waitFor(30_000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + // The noop is a singleton -- when we see this change, we know the RS has the new config + return noopQuota != rpcQuotaManager.getQuota(User.getCurrent().getUGI(), tn); + } + }); + + try (Table table = TEST_UTIL.getConnection().getTable(tn)) { + // A single request is allowed + Get g = new Get(Bytes.toBytes("a")); + Result r = table.get(g); + assertTrue("Result should be empty as table was empty", r.isEmpty()); + // 10 requests in one RPC should be rejected + ArrayList gets = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + gets.add(g); + } + try { + table.get(gets); + fail("Expected to get a exception but did not"); + } catch (IOException e) { + LOG.debug("Caught an exception as desired", e); + List causes = Throwables.getCausalChain(e); + // Nb. We should see the RpcThrottlingException in the causal chain, but we don't + // presently. Hopefully this changes sometime. + assertFalse( + "Expected to find RpcThrottlingException in causal chain, got: " + causes, + Iterables.isEmpty(Iterables.filter(causes, RetriesExhaustedWithDetailsException.class))); + assertTrue( + "Expected the exception message to contain RpcThrottlingException, but it was: " + e.getMessage(), + e.getMessage().contains("RpcThrottlingException")); + } + } + } + + @Test + public void testMixedRequestThrottling() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final TableName tn = TableName.valueOf("throttle_mixed_requests"); + TableDescriptor desc = TableDescriptorBuilder.newBuilder(tn).setColumnFamily( + ColumnFamilyDescriptorBuilder.of("f1")).build(); + admin.createTable(desc); + + // Before we create the quota, it will be the "noop" implementation + List rsThreads = TEST_UTIL.getHBaseCluster().getRegionServerThreads(); + assertEquals(1, rsThreads.size()); + HRegionServer rs = rsThreads.get(0).getRegionServer(); + final RegionServerRpcQuotaManager rpcQuotaManager = rs.getRegionServerRpcQuotaManager(); + final OperationQuota noopQuota = rpcQuotaManager.getQuota(User.getCurrent().getUGI(), tn); + + admin.setQuota( + QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_NUMBER, 5, TimeUnit.MINUTES)); + + // Wait until the RS sees the new quota + TEST_UTIL.waitFor(30_000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + // The noop is a singleton -- when we see this change, we know the RS has the new config + return noopQuota != rpcQuotaManager.getQuota(User.getCurrent().getUGI(), tn); + } + }); + + try (Table table = TEST_UTIL.getConnection().getTable(tn)) { + // A single request is allowed + Get g = new Get(Bytes.toBytes("a")); + Result r = table.get(g); + assertTrue("Result should be empty as table was empty", r.isEmpty()); + // 10 requests in one RPC should be rejected + ArrayList puts = new ArrayList<>(); + Put p = new Put(Bytes.toBytes("asdf")); + p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("v1")); + for (int i = 0; i < 10; i++) { + puts.add(p); + } + try { + table.put(puts); + fail("Expected to get a exception but did not"); + } catch (IOException e) { + LOG.debug("Caught an exception as desired", e); + List causes = Throwables.getCausalChain(e); + // Nb. We should see the RpcThrottlingException in the causal chain, but we don't + // presently. Hopefully this changes sometime. + assertFalse( + "Expected to find RpcThrottlingException in causal chain, got: " + causes, + Iterables.isEmpty(Iterables.filter(causes, RetriesExhaustedWithDetailsException.class))); + assertTrue( + "Expected the exception message to contain RpcThrottlingException, but it was: " + e.getMessage(), + e.getMessage().contains("RpcThrottlingException")); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index 8a77e0e291..5afdf41297 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -214,7 +214,8 @@ public class TestQuotaState { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1); + // Check one write request + limiter.checkQuota(1, 1, 0, 0); fail("Should have thrown ThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -224,11 +225,13 @@ public class TestQuotaState { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1); + // Check one write request + limiter.checkQuota(1, 1, 0, 0); } catch (RpcThrottlingException e) { fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1); + // Consume one write request + limiter.grabQuota(1, 1, 0, 0); } } -- 2.16.3