From 51008ed79f8b29a3c1e7d1170393d4f91a643a83 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Tue, 24 Jan 2017 19:22:05 -0500 Subject: [PATCH] HBASE-17525 Add space quota support to the deprecated SecureBulkLoadEndpoint --- .../hbase/quotas/TestSecureBulkLoadWithQuotas.java | 157 +++++++++++++++++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 11 +- .../hbase/regionserver/SecureBulkLoadManager.java | 25 +++- .../hbase/quotas/SpaceQuotaHelperForTests.java | 34 +++++ .../hadoop/hbase/quotas/TestSpaceQuotas.java | 39 ++--- 5 files changed, 232 insertions(+), 34 deletions(-) create mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/quotas/TestSecureBulkLoadWithQuotas.java diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/quotas/TestSecureBulkLoadWithQuotas.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/quotas/TestSecureBulkLoadWithQuotas.java new file mode 100644 index 0000000..502f2b2 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/quotas/TestSecureBulkLoadWithQuotas.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.quotas.QuotaSettings; +import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory; +import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; +import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; +import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.StringUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Test class for the SecureBulkLoadEndpoint. 
+ */ +@Category(MediumTests.class) +public class TestSecureBulkLoadWithQuotas { + private static final Log LOG = LogFactory.getLog(TestSecureBulkLoadWithQuotas.class); + protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Rule + public TestName testName = new TestName(); + private SpaceQuotaHelperForTests helper; + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set( + CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); + conf.set(QuotaUtil.QUOTA_CONF_KEY, "true"); + // Increase the frequency of some of the chores for responsiveness of the test + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); + conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000); + conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000); + conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000); + conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void stopCluster() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void removeAllQuotas() throws Exception { + helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, new AtomicLong()); + } + + @Test + public void testSecureBulkLoads() throws Exception { + TableName tn = helper.createTableWithRegions(10); + + // Set a very small limit + final long sizeLimit = 1L * SpaceQuotaHelperForTests.ONE_KILOBYTE; + QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( + tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS); + TEST_UTIL.getAdmin().setQuota(settings); + + // Wait for the RS to acknowledge this small limit + HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + 
RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager(); + TEST_UTIL.waitFor(60000, 3000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + Map snapshots = spaceQuotaManager.copyQuotaSnapshots(); + SpaceQuotaSnapshot snapshot = snapshots.get(tn); + LOG.debug("Snapshots: " + snapshots); + return null != snapshot && snapshot.getLimit() > 0; + } + }); + // Our quota limit should be reflected in the latest snapshot + Map snapshots = spaceQuotaManager.copyQuotaSnapshots(); + SpaceQuotaSnapshot snapshot = snapshots.get(tn); + assertEquals(0L, snapshot.getUsage()); + assertEquals(sizeLimit, snapshot.getLimit()); + + // Generate a file that is ~25KB + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"); + fs.mkdirs(baseDir); + Path hfilesDir = new Path(baseDir, SpaceQuotaHelperForTests.F1); + fs.mkdirs(hfilesDir); + List> filesToLoad = helper.createFiles(tn, 1, 500, hfilesDir); + // Verify that they are the size we expect + for (Pair pair : filesToLoad) { + String file = pair.getSecond(); + FileStatus[] statuses = fs.listStatus(new Path(file)); + assertEquals(1, statuses.length); + FileStatus status = statuses[0]; + assertTrue( + "Expected the file, " + file + ", length to be larger than 25KB, but was " + + status.getLen(), + status.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE); + LOG.debug(file + " -> " + status.getLen() +"B"); + } + + // Use LoadIncrementalHFiles to load the file which should be rejected since + // it would violate the quota. 
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(TEST_UTIL.getConfiguration()); + try { + loader.run(new String[] {new Path( + fs.getHomeDirectory(), testName.getMethodName() + "_files").toString(), tn.toString()}); + fail("Expected the bulk load to be rejected, but it was not"); + } catch (Exception e) { + LOG.debug("Caught expected exception", e); + String stringifiedException = StringUtils.stringifyException(e); + assertTrue( + "Expected exception message to contain the SpaceLimitingException class name: " + + stringifiedException, + stringifiedException.contains(SpaceLimitingException.class.getName())); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index fed0cea..3e32f14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -793,7 +793,12 @@ public class HRegionServer extends HasThread implements try { setupClusterConnection(); - this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); + // Setup the Quota Manager + rsQuotaManager = new RegionServerRpcQuotaManager(this); + rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); + + this.secureBulkLoadManager = new SecureBulkLoadManager( + this.conf, clusterConnection, getRegionServerSpaceQuotaManager()); this.secureBulkLoadManager.start(); // Health checker thread. 
@@ -925,14 +930,10 @@ public class HRegionServer extends HasThread implements nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); } - // Setup the Quota Manager - rsQuotaManager = new RegionServerRpcQuotaManager(this); - rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); if (QuotaUtil.isQuotaEnabled(conf)) { this.fsUtilizationChore = new FileSystemUtilizationChore(this); } - // Setup RPC client for master communication rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index 1accae1..50009a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -35,6 +35,10 @@ import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; +import org.apache.hadoop.hbase.quotas.QuotaUtil; +import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; +import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; @@ -111,10 +115,13 @@ public class SecureBulkLoadManager { private UserProvider userProvider; private Connection conn; + private RegionServerSpaceQuotaManager spaceQuotaManager; - 
SecureBulkLoadManager(Configuration conf, Connection conn) { + SecureBulkLoadManager( + Configuration conf, Connection conn, RegionServerSpaceQuotaManager spaceQuotaManager) { this.conf = conf; this.conn = conn; + this.spaceQuotaManager = spaceQuotaManager; } public void start() throws IOException { @@ -214,6 +221,22 @@ public class SecureBulkLoadManager { if (region.getCoprocessorHost() != null) { bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } + + // Ensure that the files would not exceed the space quota. + if (QuotaUtil.isQuotaEnabled(conf)) { + ActivePolicyEnforcement activeSpaceQuotas = spaceQuotaManager.getActiveEnforcements(); + SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(region); + if (null != enforcement && enforcement.shouldCheckBulkLoads()) { + // Bulk loads must still be atomic. We must enact all or none. + List filePaths = new ArrayList<>(request.getFamilyPathCount()); + for (Pair familyPath : familyPaths) { + filePaths.add(familyPath.getSecond()); + } + // Check if the batch of files exceeds the current quota + enforcement.checkBulkLoad(fs, filePaths); + } + } + boolean loaded = false; Map> map = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java index 498b34e..f33abe1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java @@ -20,7 +20,9 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Random; import java.util.Set; @@ -29,8 +31,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; @@ -38,7 +43,10 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.rules.TestName; import com.google.common.collect.HashMultimap; @@ -223,4 +231,30 @@ public class SpaceQuotaHelperForTests { } } } + + Map getReportedSizesForTable(TableName tn) { + HMaster master = testUtil.getMiniHBaseCluster().getMaster(); + MasterQuotaManager quotaManager = master.getMasterQuotaManager(); + Map filteredRegionSizes = new HashMap<>(); + for (Entry entry : quotaManager.snapshotRegionSizes().entrySet()) { + if (entry.getKey().getTable().equals(tn)) { + filteredRegionSizes.put(entry.getKey(), entry.getValue()); + } + } + return filteredRegionSizes; + } + + List> createFiles(TableName tn, int numFiles, int numRowsPerFile, Path baseDir) throws IOException { + FileSystem fs = testUtil.getTestFileSystem(); + fs.mkdirs(baseDir); + final List> famPaths = new ArrayList>(); + for (int i = 1; i <= numFiles; i++) { + Path hfile = new Path(baseDir, "file" + i); + TestHRegionServerBulkLoad.createHFile( + fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), + Bytes.toBytes("reject"), numRowsPerFile); + famPaths.add(new Pair<>(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfile.toString())); + } + return famPaths; + } } diff 
--git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java index afec192..0af931b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java @@ -257,7 +257,9 @@ public class TestSpaceQuotas { TableName tableName = writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p); // The table is now in violation. Try to do a bulk load - ClientServiceCallable callable = generateFileToLoad(tableName, 1, 50); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"); + ClientServiceCallable callable = generateFileToLoad(tableName, 1, 50, baseDir); RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration()); RpcRetryingCaller caller = factory. newCaller(); try { @@ -281,7 +283,7 @@ public class TestSpaceQuotas { HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager(); Map snapshots = spaceQuotaManager.copyQuotaSnapshots(); - Map regionSizes = getReportedSizesForTable(tn); + Map regionSizes = helper.getReportedSizesForTable(tn); while (true) { SpaceQuotaSnapshot snapshot = snapshots.get(tn); if (null != snapshot && snapshot.getLimit() > 0) { @@ -290,7 +292,7 @@ public class TestSpaceQuotas { LOG.debug("Snapshot does not yet realize quota limit: " + snapshots + ", regionsizes: " + regionSizes); Thread.sleep(3000); snapshots = spaceQuotaManager.copyQuotaSnapshots(); - regionSizes = getReportedSizesForTable(tn); + regionSizes = helper.getReportedSizesForTable(tn); } // Our quota limit should be reflected in the latest snapshot SpaceQuotaSnapshot snapshot = snapshots.get(tn); @@ -303,8 +305,9 @@ public class TestSpaceQuotas { 
assertTrue("Expected to find Noop policy, but got " + enforcement.getClass().getSimpleName(), enforcement instanceof BulkLoadCheckingViolationPolicyEnforcement); // Should generate two files, each of which is over 25KB each - ClientServiceCallable callable = generateFileToLoad(tn, 2, 500); FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"); + ClientServiceCallable callable = generateFileToLoad(tn, 2, 500, baseDir); FileStatus[] files = fs.listStatus( new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files")); for (FileStatus file : files) { @@ -360,18 +363,6 @@ public class TestSpaceQuotas { verifyViolation(policy, tn, p); } - private Map getReportedSizesForTable(TableName tn) { - HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); - MasterQuotaManager quotaManager = master.getMasterQuotaManager(); - Map filteredRegionSizes = new HashMap<>(); - for (Entry entry : quotaManager.snapshotRegionSizes().entrySet()) { - if (entry.getKey().getTable().equals(tn)) { - filteredRegionSizes.put(entry.getKey(), entry.getValue()); - } - } - return filteredRegionSizes; - } - private TableName writeUntilViolation(SpaceViolationPolicy policyToViolate) throws Exception { TableName tn = helper.createTableWithRegions(10); @@ -433,20 +424,12 @@ public class TestSpaceQuotas { assertTrue("Expected to see an exception writing data to a table exceeding its quota", sawError); } - private ClientServiceCallable generateFileToLoad(TableName tn, int numFiles, int numRowsPerFile) throws Exception { + private ClientServiceCallable generateFileToLoad( + TableName tn, int numFiles, int numRowsPerFile, Path baseDir) throws Exception { Connection conn = TEST_UTIL.getConnection(); - FileSystem fs = TEST_UTIL.getTestFileSystem(); Configuration conf = TEST_UTIL.getConfiguration(); - Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"); - fs.mkdirs(baseDir); - final List> 
famPaths = new ArrayList>(); - for (int i = 1; i <= numFiles; i++) { - Path hfile = new Path(baseDir, "file" + i); - TestHRegionServerBulkLoad.createHFile( - fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), - Bytes.toBytes("reject"), numRowsPerFile); - famPaths.add(new Pair<>(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfile.toString())); - } + final List> famPaths = helper.createFiles( + tn, numFiles, numRowsPerFile, baseDir); // bulk load HFiles Table table = conn.getTable(tn); -- 2.10.2