From a46b9c1e7a26ba47e77d5dba45138099e2e22718 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Wed, 8 Mar 2017 20:56:37 -0500 Subject: [PATCH] HBASE-17749 Compute the size of snapshots relative to source table --- .../apache/hadoop/hbase/quotas/QuotaTableUtil.java | 58 ++- .../hadoop/hbase/quotas/SpaceQuotaSnapshot.java | 4 +- .../hbase/master/MetricsMasterQuotaSource.java | 21 + .../hbase/master/MetricsMasterQuotaSourceImpl.java | 8 + .../org/apache/hadoop/hbase/master/HMaster.java | 13 + .../apache/hadoop/hbase/master/MetricsMaster.java | 21 + .../hadoop/hbase/quotas/QuotaObserverChore.java | 2 +- .../hbase/quotas/SnapshotQuotaObserverChore.java | 503 +++++++++++++++++++++ .../hbase/quotas/SpaceQuotaHelperForTests.java | 140 +++++- .../TestQuotaObserverChoreWithMiniCluster.java | 6 +- .../quotas/TestSnapshotQuotaObserverChore.java | 345 ++++++++++++++ .../hadoop/hbase/quotas/TestSpaceQuotas.java | 34 +- .../hbase/quotas/TestSpaceQuotasWithSnapshots.java | 250 ++++++++++ 13 files changed, 1355 insertions(+), 50 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotasWithSnapshots.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java index 725f170947..d537bd95f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java @@ -71,16 +71,18 @@ import org.apache.hadoop.hbase.util.Strings; /** * Helper class to interact with the quota table. - *
- *     ROW-KEY      FAM/QUAL        DATA
- *   n.<namespace> q:s         <global-quotas>
- *   t.<namespace> u:p        <namespace-quota policy>
- *   t.<table>     q:s         <global-quotas>
- *   t.<table>     u:p        <table-quota policy>
- *   u.<user>      q:s         <global-quotas>
- *   u.<user>      q:s.<table> <table-quotas>
- *   u.<user>      q:s.<ns>:   <namespace-quotas>
- * </pre>
+ * <table>
+ *   <tr><th>ROW-KEY</th><th>FAM/QUAL</th><th>DATA</th></tr>
+ *   <tr><td>n.&lt;namespace&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
+ *   <tr><td>n.&lt;namespace&gt;</td><td>u:p</td><td>&lt;namespace-quota policy&gt;</td></tr>
+ *   <tr><td>n.&lt;namespace&gt;</td><td>u:s.&lt;snapshot name&gt;</td><td>&lt;SpaceQuotaSnapshot&gt;</td></tr>
+ *   <tr><td>t.&lt;table&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
+ *   <tr><td>t.&lt;table&gt;</td><td>u:p</td><td>&lt;table-quota policy&gt;</td></tr>
+ *   <tr><td>t.&lt;table&gt;</td><td>u:s.&lt;snapshot name&gt;</td><td>&lt;SpaceQuotaSnapshot&gt;</td></tr>
+ *   <tr><td>u.&lt;user&gt;</td><td>q:s</td><td>&lt;global-quotas&gt;</td></tr>
+ *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;table&gt;</td><td>&lt;table-quotas&gt;</td></tr>
+ *   <tr><td>u.&lt;user&gt;</td><td>q:s.&lt;ns&gt;</td><td>&lt;namespace-quotas&gt;</td></tr>
+ * </table>
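+ *
+ * <p>For example, per the table above, the computed size of a snapshot {@code ss1} of table
+ * {@code t1} lands at row key {@code t.t1}, qualifier {@code u:s.ss1}. A sketch of reading it
+ * back with the helpers added in this patch ({@code quotaTable} is assumed to be an open
+ * {@code Table} on {@code hbase:quota}):
+ * <pre>
+ *   Get get = QuotaTableUtil.makeGetForSnapshotSize(TableName.valueOf("t1"), "ss1");
+ *   Result result = quotaTable.get(get);
+ * </pre>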
procedureExecutor; private WALProcedureStore procedureStore; @@ -872,6 +874,10 @@ public class HMaster extends HRegionServer implements MasterServices { this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics()); // Start the chore to read the region FS space reports and act on them getChoreService().scheduleChore(quotaObserverChore); + + this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics()); + // Start the chore to read snapshots and add their usage to table/NS quotas + getChoreService().scheduleChore(snapshotQuotaChore); } // clear the dead servers with same host name and port of online server because we are not @@ -1166,6 +1172,9 @@ public class HMaster extends HRegionServer implements MasterServices { if (this.quotaObserverChore != null) { quotaObserverChore.cancel(); } + if (this.snapshotQuotaChore != null) { + snapshotQuotaChore.cancel(); + } } /** @@ -3293,6 +3302,10 @@ public class HMaster extends HRegionServer implements MasterServices { return this.quotaObserverChore; } + public SnapshotQuotaObserverChore getSnapshotQuotaChore() { + return this.snapshotQuotaChore; + } + public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() { return this.spaceQuotaSnapshotNotifier; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java index b5bc3d76f7..c1e70ab982 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMaster.java @@ -101,4 +101,25 @@ public class MetricsMaster { public void incrementQuotaObserverTime(final long executionTime) { masterQuotaSource.incrementSpaceQuotaObserverChoreTime(executionTime); } + + /** + * Sets the execution time of a period of the {@code SnapshotQuotaObserverChore}. + */ + public void incrementSnapshotObserverTime(final long executionTime) { + masterQuotaSource.incrementSnapshotObserverChoreTime(executionTime); + } + + /** + * Sets the execution time to compute the size of a single snapshot. + */ + public void incrementSnapshotSizeComputationTime(final long executionTime) { + masterQuotaSource.incrementSnapshotObserverSnapshotComputationTime(executionTime); + } + + /** + * Sets the execution time to fetch the mapping of snapshots to originating table. + */ + public void incrementSnapshotFetchTime(long executionTime) { + masterQuotaSource.incrementSnapshotObserverSnapshotFetchTime(executionTime); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java index 254f2a107e..9233a4d44c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java @@ -161,7 +161,7 @@ public class QuotaObserverChore extends ScheduledChore { // The current "view" of region space use. Used henceforth. 
       final Map<HRegionInfo, Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports");
+        LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports: " + reportedRegionSpaceUse);
       }
 
       // Remove the "old" region reports
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
new file mode 100644
index 0000000000..403dcfa268
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SnapshotQuotaObserverChore.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MetricsMaster;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.FamilyFiles;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest.StoreFile;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from
+ * a table which has a space quota.
+ */
+@InterfaceAudience.Private
+public class SnapshotQuotaObserverChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(SnapshotQuotaObserverChore.class);
+  static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY =
+      "hbase.master.quotas.snapshot.chore.period";
+  static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+
+  static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY =
+      "hbase.master.quotas.snapshot.chore.delay";
+  static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis
+
+  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY =
+      "hbase.master.quotas.snapshot.chore.timeunit";
+  static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+
+  private final Connection conn;
+  private final Configuration conf;
+  private final MetricsMaster metrics;
+  private final FileSystem fs;
+
+  public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) {
+    this(
+        master.getConnection(), master.getConfiguration(), master.getFileSystem(), master,
+        metrics);
+  }
+
+  SnapshotQuotaObserverChore(
+      Connection conn, Configuration conf, FileSystem fs, Stoppable stopper,
+      MetricsMaster metrics) {
+    super(
+        SnapshotQuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf),
+        getInitialDelay(conf), getTimeUnit(conf));
+    this.conn = conn;
+    this.conf = conf;
+    this.metrics = metrics;
+    this.fs = fs;
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Computing sizes of snapshots for quota management.");
+      }
+      long start = System.nanoTime();
+      _chore();
+      if (null != metrics) {
+        metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to compute the size of snapshots, will retry", e);
+    }
+  }
+
+  void _chore() throws IOException {
+    // Gets all tables with quotas that also have snapshots.
+    // These values are all of the snapshots that we need to compute the size of.
+    long start = System.nanoTime();
+    Multimap<TableName,String> snapshotsToComputeSize = getSnapshotsToComputeSize();
+    if (null != metrics) {
+      metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000);
+    }
+
+    // For each table, compute the size of each snapshot
+    Multimap<TableName,SnapshotWithSize> snapshotsWithSize = computeSnapshotSizes(
+        snapshotsToComputeSize);
+
+    // Write the size data to the quota table.
+    persistSnapshotSizes(snapshotsWithSize);
+  }
+
+  /**
+   * Fetches each table with a quota (table or namespace quota), and then fetches the name of
+   * each snapshot which was created from that table.
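+   * Namespace quotas are expanded to every table in that namespace, so a snapshot is considered
+   * when either its source table or the table's namespace has a space quota.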
+   *
+   * @return A mapping of table to snapshots created from that table
+   */
+  Multimap<TableName,String> getSnapshotsToComputeSize() throws IOException {
+    Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>();
+    QuotaFilter filter = new QuotaFilter();
+    filter.addTypeFilter(QuotaType.SPACE);
+    // Pull all of the tables that have quotas (direct, or from namespace)
+    for (QuotaSettings qs : QuotaRetriever.open(conf, filter)) {
+      String ns = qs.getNamespace();
+      TableName tn = qs.getTableName();
+      if ((null == ns && null == tn) || (null != ns && null != tn)) {
+        throw new IllegalStateException(
+            "Expected exactly one of namespace and tablename to be null, not both or neither");
+      }
+      // Collect either the table name itself, or all of the tables in the namespace
+      if (null != ns) {
+        try (Admin admin = conn.getAdmin()) {
+          tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns)));
+        }
+      } else {
+        tablesToFetchSnapshotsFrom.add(tn);
+      }
+    }
+    // Fetch all snapshots that were created from these tables
+    return getSnapshotsFromTables(tablesToFetchSnapshotsFrom);
+  }
+
+  /**
+   * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName}
+   * exists in the provided {@code Set}.
+   */
+  Multimap<TableName,String> getSnapshotsFromTables(
+      Set<TableName> tablesToFetchSnapshotsFrom) throws IOException {
+    try (Admin admin = conn.getAdmin()) {
+      Multimap<TableName,String> snapshotsToCompute = HashMultimap.create();
+      for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) {
+        TableName tn = sd.getTableName();
+        if (tablesToFetchSnapshotsFrom.contains(tn)) {
+          snapshotsToCompute.put(tn, sd.getName());
+        }
+      }
+      return snapshotsToCompute;
+    }
+  }
+
+  /**
+   * Computes the size of each snapshot provided given the current files referenced by the table.
+   *
+   * @param snapshotsToComputeSize The snapshots to compute the size of
+   * @return A mapping of table to snapshot created from that table and the snapshot's size.
+   */
+  Multimap<TableName,SnapshotWithSize> computeSnapshotSizes(
+      Multimap<TableName,String> snapshotsToComputeSize) throws IOException {
+    Multimap<TableName,SnapshotWithSize> snapshotSizes = HashMultimap.create();
+    for (Entry<TableName,Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) {
+      final TableName tn = entry.getKey();
+      final List<String> snapshotNames = new ArrayList<>(entry.getValue());
+      // Sort the snapshots so we process them in lexicographic order
+      Collections.sort(snapshotNames);
+      final Path rootDir = FSUtils.getRootDir(conf);
+      // Get the map of store file names to store file path for this table
+      // TODO is the store-file name unique enough? Does this need to be region+family+storefile?
+      final Map<String,Path> tableReferencedStoreFiles;
+      try {
+        tableReferencedStoreFiles = FSUtils.getTableStoreFilePathMap(fs, rootDir);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Paths for " + tn + ": " + tableReferencedStoreFiles);
+      }
+
+      // For each snapshot on this table, get the files which the snapshot references which
+      // the table does not.
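+      // Files still referenced by the live table cost a snapshot nothing; only files that
+      // survive solely in the archive directory are charged, and the sorted order plus the
+      // running set below are meant to charge each shared file to at most one snapshot.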
+      Set<StoreFileReference> snapshotReferencedFiles = new HashSet<>();
+      for (String snapshotName : snapshotNames) {
+        final long start = System.nanoTime();
+        Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+        SnapshotDescription sd = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+        SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, sd);
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Files referenced by other snapshots: " + snapshotReferencedFiles);
+        }
+
+        // Get the set of files from the manifest that this snapshot references which are not also
+        // referenced by the originating table.
+        Set<StoreFileReference> unreferencedStoreFileNames = getStoreFilesFromSnapshot(
+            manifest, (sfn) -> !tableReferencedStoreFiles.containsKey(sfn)
+                && !snapshotReferencedFiles.contains(sfn));
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Snapshot " + snapshotName + " solely references the files: "
+              + unreferencedStoreFileNames);
+        }
+
+        // Compute the size of the store files for this snapshot
+        long size = getSizeOfStoreFiles(tn, unreferencedStoreFileNames);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Computed size of " + snapshotName + " to be " + size);
+        }
+
+        // Persist this snapshot's size into the map
+        snapshotSizes.put(tn, new SnapshotWithSize(snapshotName, size));
+
+        // Make sure that we don't double-count the same file
+        snapshotReferencedFiles.addAll(unreferencedStoreFileNames);
+        // Update the amount of time it took to compute the snapshot's size
+        if (null != metrics) {
+          metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000);
+        }
+      }
+    }
+    return snapshotSizes;
+  }
+
+  /**
+   * Extracts the names of the store files referenced by this snapshot which satisfy the given
+   * predicate (the predicate returns {@code true}).
+   */
+  Set<StoreFileReference> getStoreFilesFromSnapshot(
+      SnapshotManifest manifest, Predicate<String> filter) {
+    Set<StoreFileReference> references = new HashSet<>();
+    // For each region referenced by the snapshot
+    for (SnapshotRegionManifest rm : manifest.getRegionManifests()) {
+      StoreFileReference regionReference = new StoreFileReference(
+          HRegionInfo.convert(rm.getRegionInfo()).getEncodedName());
+      references.add(regionReference);
+
+      // For each column family in this region
+      for (FamilyFiles ff : rm.getFamilyFilesList()) {
+        final String familyName = ff.getFamilyName().toStringUtf8();
+        // And each store file in that family
+        for (StoreFile sf : ff.getStoreFilesList()) {
+          String storeFileName = sf.getName();
+          if (filter.test(storeFileName)) {
+            regionReference.addFamilyStoreFile(familyName, storeFileName);
+          }
+        }
+      }
+    }
+    return references;
+  }
+
+  /**
+   * Calculates the directory in HDFS for a table based on the configuration.
+   */
+  Path getTableDir(TableName tn) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    return FSUtils.getTableDir(rootDir, tn);
+  }
+
+  /**
+   * Computes the size of each store file in {@code storeFileNames}.
+   */
+  long getSizeOfStoreFiles(TableName tn, Set<StoreFileReference> storeFileNames) {
+    return storeFileNames.stream()
+        .collect(Collectors.summingLong((sfr) -> getSizeOfStoreFile(tn, sfr)));
+  }
+
+  /**
+   * Computes the size of the store files for a single region.
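+   * The files are resolved under the archive directory (via {@code HFileArchiveUtil}), since
+   * files the originating table no longer references are moved there rather than deleted.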
+ */ + long getSizeOfStoreFile(TableName tn, StoreFileReference storeFileName) { + String regionName = storeFileName.getRegionName(); + return storeFileName.getFamilyToFilesMapping() + .entries().stream() + .collect(Collectors.summingLong((e) -> + getSizeOfStoreFile(tn, regionName, e.getKey(), e.getValue()))); + } + + /** + * Computes the size of the store file given its name, region and family name in + * the archive directory. + */ + long getSizeOfStoreFile( + TableName tn, String regionName, String family, String storeFile) { + Path familyArchivePath; + try { + familyArchivePath = HFileArchiveUtil.getStoreArchivePath(conf, tn, regionName, family); + } catch (IOException e) { + LOG.warn("Could not compute path for the archive directory for the region", e); + return 0L; + } + Path fileArchivePath = new Path(familyArchivePath, storeFile); + try { + if (fs.exists(fileArchivePath)) { + FileStatus[] status = fs.listStatus(fileArchivePath); + if (1 != status.length) { + LOG.warn("Expected " + fileArchivePath + + " to be a file but was a directory, ignoring reference"); + return 0L; + } + return status[0].getLen(); + } + } catch (IOException e) { + LOG.warn("Could not obtain the status of " + fileArchivePath, e); + return 0L; + } + LOG.warn("Expected " + fileArchivePath + " to exist but does not, ignoring reference."); + return 0L; + } + + /** + * Writes the snapshot sizes to the {@code hbase:quota} table. + * + * @param snapshotsWithSize The snapshot sizes to write. + */ + void persistSnapshotSizes( + Multimap snapshotsWithSize) throws IOException { + try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) { + persistSnapshotSizes(quotaTable, snapshotsWithSize); + } + } + + /** + * Writes the snapshot sizes to the provided {@code table}. + */ + void persistSnapshotSizes( + Table table, Multimap snapshotsWithSize) throws IOException { + // Convert each entry in the map to a Put and write them to the quota table + table.put(snapshotsWithSize.entries() + .stream() + .map(e -> QuotaTableUtil.createPutSnapshotSize( + e.getKey(), e.getValue().getName(), e.getValue().getSize())) + .collect(Collectors.toList())); + } + + /** + * A struct encapsulating the name of a snapshot and its "size" on the filesystem. This size is + * defined as the amount of filesystem space taken by the files the snapshot refers to which + * the originating table no longer refers to. + */ + static class SnapshotWithSize { + private final String name; + private final long size; + + SnapshotWithSize(String name, long size) { + this.name = Objects.requireNonNull(name); + this.size = size; + } + + String getName() { + return name; + } + + long getSize() { + return size; + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(name).append(size).toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof SnapshotWithSize)) { + return false; + } + + SnapshotWithSize other = (SnapshotWithSize) o; + return name.equals(other.name) && size == other.size; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(32); + return sb.append("SnapshotWithSize:[").append(name).append(" ") + .append(StringUtils.byteDesc(size)).append("]").toString(); + } + } + + /** + * A reference to a collection of files in the archive directory for a single region. 
+ */ + static class StoreFileReference { + private final String regionName; + private final Multimap familyToFiles; + + StoreFileReference(String regionName) { + this.regionName = Objects.requireNonNull(regionName); + familyToFiles = HashMultimap.create(); + } + + String getRegionName() { + return regionName; + } + + Multimap getFamilyToFilesMapping() { + return familyToFiles; + } + + void addFamilyStoreFile(String family, String storeFileName) { + familyToFiles.put(family, storeFileName); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(regionName).append(familyToFiles).toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof StoreFileReference)) { + return false; + } + StoreFileReference other = (StoreFileReference) o; + return regionName.equals(other.regionName) && familyToFiles.equals(other.familyToFiles); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + return sb.append("StoreFileReference[region=").append(regionName).append(", files=") + .append(familyToFiles).append("]").toString(); + } + } + + /** + * Extracts the period for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore period or the default value. + */ + static int getPeriod(Configuration conf) { + return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, + SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT); + } + + /** + * Extracts the initial delay for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore initial delay or the default value. + */ + static long getInitialDelay(Configuration conf) { + return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY, + SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT); + } + + /** + * Extracts the time unit for the chore period and initial delay from the configuration. The + * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to + * a {@link TimeUnit} value. + * + * @param conf The configuration object. + * @return The configured time unit for the chore period and initial delay or the default value. 
+ */ + static TimeUnit getTimeUnit(Configuration conf) { + return TimeUnit.valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY, + SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java index 888978de4a..0034e3d6e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java @@ -20,6 +20,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map.Entry; import java.util.Objects; @@ -29,16 +30,23 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.Predicate; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.junit.rules.TestName; @@ -53,6 +61,7 @@ public class SpaceQuotaHelperForTests { public static final String F1 = "f1"; public static final long ONE_KILOBYTE = 1024L; public static final long ONE_MEGABYTE = ONE_KILOBYTE * ONE_KILOBYTE; + public static final long ONE_GIGABYTE = ONE_MEGABYTE * ONE_KILOBYTE; private final HBaseTestingUtility testUtil; private final TestName testName; @@ -66,9 +75,52 @@ public class SpaceQuotaHelperForTests { } // + // Static helpers + // + + static void updateConfigForQuotas(Configuration conf) { + // Increase the frequency of some of the chores for responsiveness of the test + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); + conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); + conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); + conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000); + conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000); + conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000); + conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000); + conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + } + + // // Helpers // + void removeAllQuotas() throws Exception { + final Connection conn = testUtil.getConnection(); + // Wait for the quota table to be created + if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { + do { + LOG.debug("Quota table does not yet exist"); + Thread.sleep(1000); + } while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)); + } else { + // 
Or, clean up any quotas from previous test runs. + QuotaRetriever scanner = QuotaRetriever.open(conn.getConfiguration()); + for (QuotaSettings quotaSettings : scanner) { + final String namespace = quotaSettings.getNamespace(); + final TableName tableName = quotaSettings.getTableName(); + if (null != namespace) { + LOG.debug("Deleting quota for namespace: " + namespace); + QuotaUtil.deleteNamespaceQuota(conn, namespace); + } else { + assert null != tableName; + LOG.debug("Deleting quota for table: "+ tableName); + QuotaUtil.deleteTableQuota(conn, tableName); + } + } + } + } + void writeData(TableName tn, long sizeInBytes) throws IOException { final Connection conn = testUtil.getConnection(); final Table table = conn.getTable(tn); @@ -113,6 +165,12 @@ public class SpaceQuotaHelperForTests { } } + NamespaceDescriptor createNamespace() throws Exception { + NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + counter.getAndIncrement()).build(); + testUtil.getAdmin().createNamespace(nd); + return nd; + } + Multimap createTablesWithSpaceQuotas() throws Exception { final Admin admin = testUtil.getAdmin(); final Multimap tablesWithQuotas = HashMultimap.create(); @@ -120,8 +178,7 @@ public class SpaceQuotaHelperForTests { final TableName tn1 = createTable(); final TableName tn2 = createTable(); - NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + counter.getAndIncrement()).build(); - admin.createNamespace(nd); + NamespaceDescriptor nd = createNamespace(); final TableName tn3 = createTableInNamespace(nd); final TableName tn4 = createTableInNamespace(nd); final TableName tn5 = createTableInNamespace(nd); @@ -225,4 +282,83 @@ public class SpaceQuotaHelperForTests { } } } + + /** + * Abstraction to simplify the case where a test needs to verify a certain state + * on a {@code SpaceQuotaSnapshot}. This class fails-fast when there is no such + * snapshot obtained from the Master. As such, it is not useful to verify the + * lack of a snapshot. + */ + static abstract class SpaceQuotaSnapshotPredicate implements Predicate { + private final Connection conn; + private final TableName tn; + private final String ns; + + SpaceQuotaSnapshotPredicate(Connection conn, TableName tn) { + this(Objects.requireNonNull(conn), Objects.requireNonNull(tn), null); + } + + SpaceQuotaSnapshotPredicate(Connection conn, String ns) { + this(Objects.requireNonNull(conn), null, Objects.requireNonNull(ns)); + } + + private SpaceQuotaSnapshotPredicate(Connection conn, TableName tn, String ns) { + this.conn = conn; + this.tn = tn; + this.ns = ns; + } + + @Override + public boolean evaluate() throws Exception { + SpaceQuotaSnapshot snapshot; + if (null == ns) { + snapshot = QuotaTableUtil.getCurrentSnapshot(conn, tn); + } else { + snapshot = QuotaTableUtil.getCurrentSnapshot(conn, ns); + } + + LOG.debug("Saw quota snapshot for " + tn + ": " + snapshot); + if (null == snapshot) { + return false; + } + return evaluate(snapshot); + } + + /** + * Must determine if the given {@code SpaceQuotaSnapshot} meets some criteria. + * + * @param snapshot a non-null snapshot obtained from the HBase Master + * @return true if the criteria is met, false otherwise + */ + abstract boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception; + } + + /** + * Predicate that waits for all store files in a table to have no compacted files. 
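+   * Once those files have been discharged (archived), region size reports stop changing, which
+   * makes the snapshot size computations in these tests deterministic.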
+ */ + static class NoFilesToDischarge implements Predicate { + private final MiniHBaseCluster cluster; + private final TableName tn; + + NoFilesToDischarge(MiniHBaseCluster cluster, TableName tn) { + this.cluster = cluster; + this.tn = tn; + } + + @Override + public boolean evaluate() throws Exception { + for (HRegion region : cluster.getRegions(tn)) { + for (Store store : region.getStores()) { + HStore hstore = (HStore) store; + Collection files = + hstore.getStoreEngine().getStoreFileManager().getCompactedfiles(); + if (null != files && !files.isEmpty()) { + LOG.debug(region.getRegionInfo().getEncodedName() + " still has compacted files"); + return false; + } + } + } + return true; + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java index 63198a8a0a..bd24083f2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java @@ -77,11 +77,7 @@ public class TestQuotaObserverChoreWithMiniCluster { @BeforeClass public static void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); - conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY, SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class); TEST_UTIL.startMiniCluster(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java new file mode 100644 index 0000000000..8be251b996 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSnapshotQuotaObserverChore.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore.SnapshotWithSize; +import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate; +import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.NoFilesToDischarge; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; + +/** + * Test class for the {@link SnapshotQuotaObserverChore}. 
+ */ +@Category(MediumTests.class) +public class TestSnapshotQuotaObserverChore { + private static final Log LOG = LogFactory.getLog(TestSnapshotQuotaObserverChore.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final AtomicLong COUNTER = new AtomicLong(); + + @Rule + public TestName testName = new TestName(); + + private Connection conn; + private Admin admin; + private SpaceQuotaHelperForTests helper; + private HMaster master; + private SnapshotQuotaObserverChore testChore; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); + // Clean up the compacted files faster than normal (15s instead of 2mins) + conf.setInt("hbase.hfile.compaction.discharger.interval", 15 * 1000); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setup() throws Exception { + conn = TEST_UTIL.getConnection(); + admin = TEST_UTIL.getAdmin(); + helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); + master = TEST_UTIL.getHBaseCluster().getMaster(); + helper.removeAllQuotas(); + testChore = new SnapshotQuotaObserverChore( + TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration(), master.getFileSystem(), master, + null); + } + + @Test + public void testSnapshotSizePersistence() throws IOException { + final Admin admin = TEST_UTIL.getAdmin(); + final TableName tn = TableName.valueOf("quota_snapshotSizePersistence"); + if (admin.tableExists(tn)) { + admin.disableTable(tn); + admin.deleteTable(tn); + } + HTableDescriptor desc = new HTableDescriptor(tn); + desc.addFamily(new HColumnDescriptor(QuotaTableUtil.QUOTA_FAMILY_USAGE)); + admin.createTable(desc); + + Multimap snapshotsWithSizes = HashMultimap.create(); + try (Table table = conn.getTable(tn)) { + // Writing no values will result in no records written. 
+ verify(table, () -> { + testChore.persistSnapshotSizes(table, snapshotsWithSizes); + assertEquals(0, count(table)); + }); + + verify(table, () -> { + TableName originatingTable = TableName.valueOf("t1"); + snapshotsWithSizes.put(originatingTable, new SnapshotWithSize("ss1", 1024L)); + snapshotsWithSizes.put(originatingTable, new SnapshotWithSize("ss2", 4096L)); + testChore.persistSnapshotSizes(table, snapshotsWithSizes); + assertEquals(2, count(table)); + assertEquals(1024L, extractSnapshotSize(table, originatingTable, "ss1")); + assertEquals(4096L, extractSnapshotSize(table, originatingTable, "ss2")); + }); + + snapshotsWithSizes.clear(); + verify(table, () -> { + snapshotsWithSizes.put(TableName.valueOf("t1"), new SnapshotWithSize("ss1", 1024L)); + snapshotsWithSizes.put(TableName.valueOf("t2"), new SnapshotWithSize("ss2", 4096L)); + snapshotsWithSizes.put(TableName.valueOf("t3"), new SnapshotWithSize("ss3", 8192L)); + testChore.persistSnapshotSizes(table, snapshotsWithSizes); + assertEquals(3, count(table)); + assertEquals(1024L, extractSnapshotSize(table, TableName.valueOf("t1"), "ss1")); + assertEquals(4096L, extractSnapshotSize(table, TableName.valueOf("t2"), "ss2")); + assertEquals(8192L, extractSnapshotSize(table, TableName.valueOf("t3"), "ss3")); + }); + } + } + + @Test + public void testSnapshotsFromTables() throws Exception { + TableName tn1 = helper.createTableWithRegions(1); + TableName tn2 = helper.createTableWithRegions(1); + TableName tn3 = helper.createTableWithRegions(1); + + // Set a space quota on table 1 and 2 (but not 3) + admin.setQuota(QuotaSettingsFactory.limitTableSpace( + tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); + admin.setQuota(QuotaSettingsFactory.limitTableSpace( + tn2, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); + + // Create snapshots on each table (we didn't write any data, so just skipflush) + admin.snapshot(new SnapshotDescription(tn1 + "snapshot", tn1, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription(tn2 + "snapshot", tn2, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription(tn3 + "snapshot", tn3, SnapshotType.SKIPFLUSH)); + + Multimap mapping = testChore.getSnapshotsToComputeSize(); + assertEquals(2, mapping.size()); + assertEquals(1, mapping.get(tn1).size()); + assertEquals(tn1 + "snapshot", mapping.get(tn1).iterator().next()); + assertEquals(1, mapping.get(tn2).size()); + assertEquals(tn2 + "snapshot", mapping.get(tn2).iterator().next()); + + admin.snapshot(new SnapshotDescription(tn2 + "snapshot1", tn2, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription(tn3 + "snapshot1", tn3, SnapshotType.SKIPFLUSH)); + + mapping = testChore.getSnapshotsToComputeSize(); + assertEquals(3, mapping.size()); + assertEquals(1, mapping.get(tn1).size()); + assertEquals(tn1 + "snapshot", mapping.get(tn1).iterator().next()); + assertEquals(2, mapping.get(tn2).size()); + assertEquals( + new HashSet(Arrays.asList(tn2 + "snapshot", tn2 + "snapshot1")), mapping.get(tn2)); + } + + @Test + public void testSnapshotsFromNamespaces() throws Exception { + NamespaceDescriptor ns = NamespaceDescriptor.create("snapshots_from_namespaces").build(); + admin.createNamespace(ns); + + TableName tn1 = helper.createTableWithRegions(ns.getName(), 1); + TableName tn2 = helper.createTableWithRegions(ns.getName(), 1); + TableName tn3 = helper.createTableWithRegions(1); + + // Set a space quota on the namespace + admin.setQuota(QuotaSettingsFactory.limitNamespaceSpace( + ns.getName(), 
SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); + + // Create snapshots on each table (we didn't write any data, so just skipflush) + admin.snapshot(new SnapshotDescription( + tn1.getQualifierAsString() + "snapshot", tn1, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription( + tn2.getQualifierAsString() + "snapshot", tn2, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription( + tn3.getQualifierAsString() + "snapshot", tn3, SnapshotType.SKIPFLUSH)); + + Multimap mapping = testChore.getSnapshotsToComputeSize(); + assertEquals(2, mapping.size()); + assertEquals(1, mapping.get(tn1).size()); + assertEquals(tn1.getQualifierAsString() + "snapshot", mapping.get(tn1).iterator().next()); + assertEquals(1, mapping.get(tn2).size()); + assertEquals(tn2.getQualifierAsString() + "snapshot", mapping.get(tn2).iterator().next()); + + admin.snapshot(new SnapshotDescription( + tn2.getQualifierAsString() + "snapshot1", tn2, SnapshotType.SKIPFLUSH)); + admin.snapshot(new SnapshotDescription( + tn3.getQualifierAsString() + "snapshot2", tn3, SnapshotType.SKIPFLUSH)); + + mapping = testChore.getSnapshotsToComputeSize(); + assertEquals(3, mapping.size()); + assertEquals(1, mapping.get(tn1).size()); + assertEquals(tn1.getQualifierAsString() + "snapshot", mapping.get(tn1).iterator().next()); + assertEquals(2, mapping.get(tn2).size()); + assertEquals( + new HashSet(Arrays.asList(tn2.getQualifierAsString() + "snapshot", + tn2.getQualifierAsString() + "snapshot1")), mapping.get(tn2)); + } + + @Test + public void testSnapshotSize() throws Exception { + // Create a table and set a quota + TableName tn1 = helper.createTableWithRegions(5); + admin.setQuota(QuotaSettingsFactory.limitTableSpace( + tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); + + // Write some data and flush it + helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); + admin.flush(tn1); + + final AtomicReference lastSeenSize = new AtomicReference<>(); + // Wait for the Master chore to run to see the usage (with a fudge factor) + TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { + @Override + boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + lastSeenSize.set(snapshot.getUsage()); + return snapshot.getUsage() > 230L * SpaceQuotaHelperForTests.ONE_KILOBYTE; + } + }); + + // Create a snapshot on the table + final String snapshotName = tn1 + "snapshot"; + admin.snapshot(new SnapshotDescription(snapshotName, tn1, SnapshotType.SKIPFLUSH)); + + // Get the snapshots + Multimap snapshotsToCompute = testChore.getSnapshotsToComputeSize(); + assertEquals( + "Expected to see the single snapshot: " + snapshotsToCompute, 1, snapshotsToCompute.size()); + + // Get the size of our snapshot + Multimap snapshotsWithSize = testChore.computeSnapshotSizes( + snapshotsToCompute); + assertEquals(1, snapshotsWithSize.size()); + SnapshotWithSize sws = Iterables.getOnlyElement(snapshotsWithSize.get(tn1)); + assertEquals(snapshotName, sws.getName()); + // The snapshot should take up no space since the table refers to it completely + assertEquals(0, sws.getSize()); + + // Write some more data, flush it, and then major_compact the table + helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); + admin.flush(tn1); + TEST_UTIL.compact(tn1, true); + + // Test table should reflect it's original size since ingest was deterministic + TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { + @Override + boolean 
evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + LOG.debug("Current usage=" + snapshot.getUsage() + " lastSeenSize=" + lastSeenSize.get()); + return closeInSize( + snapshot.getUsage(), lastSeenSize.get(), SpaceQuotaHelperForTests.ONE_KILOBYTE); + } + }); + + // Wait for no compacted files on the regions of our table + TEST_UTIL.waitFor(30_000, new NoFilesToDischarge(TEST_UTIL.getMiniHBaseCluster(), tn1)); + + // Still should see only one snapshot + snapshotsToCompute = testChore.getSnapshotsToComputeSize(); + assertEquals( + "Expected to see the single snapshot: " + snapshotsToCompute, 1, snapshotsToCompute.size()); + snapshotsWithSize = testChore.computeSnapshotSizes( + snapshotsToCompute); + assertEquals(1, snapshotsWithSize.size()); + sws = Iterables.getOnlyElement(snapshotsWithSize.get(tn1)); + assertEquals(snapshotName, sws.getName()); + // The snapshot should take up the size the table originally took up + assertEquals(lastSeenSize.get().longValue(), sws.getSize()); + } + + private long count(Table t) throws IOException { + try (ResultScanner rs = t.getScanner(new Scan())) { + long sum = 0; + for (Result r : rs) { + while (r.advance()) { + sum++; + } + } + return sum; + } + } + + private long extractSnapshotSize( + Table quotaTable, TableName tn, String snapshot) throws IOException { + Get g = QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot); + Result r = quotaTable.get(g); + assertNotNull(r); + CellScanner cs = r.cellScanner(); + cs.advance(); + Cell c = cs.current(); + assertNotNull(c); + return QuotaTableUtil.extractSnapshotSize( + c.getValueArray(), c.getValueOffset(), c.getValueLength()); + } + + private void verify(Table t, IOThrowingRunnable test) throws IOException { + admin.disableTable(t.getName()); + admin.truncateTable(t.getName(), false); + test.run(); + } + + @FunctionalInterface + private interface IOThrowingRunnable { + void run() throws IOException; + } + + /** + * Computes if {@code size2} is within {@code delta} of {@code size1}, inclusive. 
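+   * For example, with {@code delta} of 5, sizes 100 and 105 are considered close, while 100
+   * and 106 are not.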
+ */ + boolean closeInSize(long size1, long size2, long delta) { + long lower = size1 - delta; + long upper = size1 + delta; + return lower <= size2 && size2 <= upper; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java index ffe0ce2fc0..3194d3a8dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java @@ -79,6 +79,7 @@ import org.junit.rules.TestName; public class TestSpaceQuotas { private static final Log LOG = LogFactory.getLog(TestSpaceQuotas.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + // Global for all tests in the class private static final AtomicLong COUNTER = new AtomicLong(0); private static final int NUM_RETRIES = 10; @@ -89,14 +90,7 @@ public class TestSpaceQuotas { @BeforeClass public static void setUp() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - // Increase the frequency of some of the chores for responsiveness of the test - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); - conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000); - conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000); - conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_DELAY_KEY, 1000); - conf.setInt(SpaceQuotaRefresherChore.POLICY_REFRESHER_CHORE_PERIOD_KEY, 1000); - conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); TEST_UTIL.startMiniCluster(1); } @@ -107,30 +101,8 @@ public class TestSpaceQuotas { @Before public void removeAllQuotas() throws Exception { - final Connection conn = TEST_UTIL.getConnection(); - // Wait for the quota table to be created - if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { - do { - LOG.debug("Quota table does not yet exist"); - Thread.sleep(1000); - } while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)); - } else { - // Or, clean up any quotas from previous test runs. - QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration()); - for (QuotaSettings quotaSettings : scanner) { - final String namespace = quotaSettings.getNamespace(); - final TableName tableName = quotaSettings.getTableName(); - if (null != namespace) { - LOG.debug("Deleting quota for namespace: " + namespace); - QuotaUtil.deleteNamespaceQuota(conn, namespace); - } else { - assert null != tableName; - LOG.debug("Deleting quota for table: "+ tableName); - QuotaUtil.deleteTableQuota(conn, tableName); - } - } - } helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); + helper.removeAllQuotas(); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotasWithSnapshots.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotasWithSnapshots.java new file mode 100644 index 0000000000..2c0477d6c1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotasWithSnapshots.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertNotNull; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.Predicate; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Test class to exercise the inclusion of snapshots in space quotas + */ +@Category({LargeTests.class}) +public class TestSpaceQuotasWithSnapshots { + private static final Log LOG = LogFactory.getLog(TestSpaceQuotasWithSnapshots.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + // Global for all tests in the class + private static final AtomicLong COUNTER = new AtomicLong(0); + + @Rule + public TestName testName = new TestName(); + private SpaceQuotaHelperForTests helper; + private Connection conn; + private Admin admin; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + SpaceQuotaHelperForTests.updateConfigForQuotas(conf); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void removeAllQuotas() throws Exception { + helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); + conn = TEST_UTIL.getConnection(); + admin = TEST_UTIL.getAdmin(); + } + + @Test + public void testTablesInheritSnapshotSize() throws Exception { + final long fudgeAmount = 500L * SpaceQuotaHelperForTests.ONE_KILOBYTE; + TableName tn = helper.createTableWithRegions(1); + LOG.info("Writing data"); + // Set a quota + QuotaSettings settings = QuotaSettingsFactory.limitTableSpace( + tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); + admin.setQuota(settings); + // Write some data + final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; + helper.writeData(tn, initialSize); + + LOG.info("Waiting until table size reflects written data"); + // Wait until that data is seen by the master + TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return 
snapshot.getUsage() >= initialSize; + } + }); + + // The actual size on disk after we wrote our data the first time + final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, tn).getUsage(); + LOG.info("Initial table size was " + initialSize); + + LOG.info("Snapshot the table"); + final String snapshot1 = tn.toString() + "_snapshot1"; + admin.snapshot(snapshot1, tn); + + // Write the same data again, then flush+compact. This should make sure that + // the snapshot is referencing files that the table no longer references. + LOG.info("Write more data"); + helper.writeData(tn, initialSize); + LOG.info("Flush the table"); + admin.flush(tn); + LOG.info("Synchronously compacting the table"); + TEST_UTIL.compact(tn, true); + + final long upperBound = initialSize + fudgeAmount; + final long lowerBound = initialSize - fudgeAmount; + + // Store the actual size after writing more data and then compacting it down to one file + final AtomicReference lastSeenSize = new AtomicReference<>(); + + LOG.info("Waiting for the region reports to reflect the correct size, between (" + + lowerBound + ", " + upperBound + ")"); + TEST_UTIL.waitFor(30 * 1000, 500, new Predicate() { + @Override + public boolean evaluate() throws Exception { + Map sizes = QuotaTableUtil.getMasterReportedTableSizes(conn); + LOG.debug("Master observed table sizes from region size reports: " + sizes); + Long size = sizes.get(tn); + if (null == size) { + return false; + } + lastSeenSize.set(size); + return size < upperBound && size > lowerBound; + } + }); + + assertNotNull("Did not expect to see a null size", lastSeenSize.get()); + LOG.info("Last seen size: " + lastSeenSize.get()); + + // Make sure the QuotaObserverChore has time to reflect the new region size reports + // (we saw above). The usage of the table should *not* decrease when we check it below, + // though, because the snapshot on our table will cause the table to "retain" the size. + TEST_UTIL.waitFor(10 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) { + @Override + public boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return lastSeenSize.get() >= snapshot.getUsage(); + } + }); + + // The final usage should be the sum of the initial size (referenced by the snapshot) and the + // new size we just wrote above. 
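+    // With the 2MB ingest performed twice, that is roughly 2MB retained by the snapshot plus
+    // roughly 2MB of live table data, i.e. about double the original usage.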
+ long expectedFinalSize = actualInitialSize + lastSeenSize.get(); + LOG.info("Expecting table usage to be " + expectedFinalSize); + // The size of the table (WRT quotas) should now be approximately double what it was previously + TEST_UTIL.waitFor(30 * 1000, 1000, new SpaceQuotaSnapshotPredicate(conn, tn) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return expectedFinalSize == snapshot.getUsage(); + } + }); + } + + @Test + public void testNamespacesInheritSnapshotSize() throws Exception { + final long fudgeAmount = 500L * SpaceQuotaHelperForTests.ONE_KILOBYTE; + String ns = helper.createNamespace().getName(); + TableName tn = helper.createTableWithRegions(ns, 1); + LOG.info("Writing data"); + // Set a quota + QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace( + ns, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS); + admin.setQuota(settings); + + // Write some data + final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; + helper.writeData(tn, initialSize); + + LOG.info("Waiting until namespace size reflects written data"); + // Wait until that data is seen by the master + TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, ns) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return snapshot.getUsage() >= initialSize; + } + }); + + // The actual size on disk after we wrote our data the first time + final long actualInitialSize = QuotaTableUtil.getCurrentSnapshot(conn, ns).getUsage(); + LOG.info("Initial table size was " + initialSize); + + LOG.info("Snapshot the table"); + final String snapshot1 = tn.getQualifierAsString() + "_snapshot1"; + admin.snapshot(snapshot1, tn); + + // Write the same data again, then flush+compact. This should make sure that + // the snapshot is referencing files that the table no longer references. + LOG.info("Write more data"); + helper.writeData(tn, initialSize); + LOG.info("Flush the table"); + admin.flush(tn); + LOG.info("Synchronously compacting the table"); + TEST_UTIL.compact(tn, true); + + final long upperBound = initialSize + fudgeAmount; + final long lowerBound = initialSize - fudgeAmount; + + // Store the actual size after writing more data and then compacting it down to one file + final AtomicReference lastSeenSize = new AtomicReference<>(); + + LOG.info("Waiting for the region reports to reflect the correct size, between (" + + lowerBound + ", " + upperBound + ")"); + TEST_UTIL.waitFor(30 * 1000, 500, new Predicate() { + @Override + public boolean evaluate() throws Exception { + Map sizes = QuotaTableUtil.getMasterReportedTableSizes(conn); + LOG.debug("Master observed table sizes from region size reports: " + sizes); + Long size = sizes.get(tn); + if (null == size) { + return false; + } + lastSeenSize.set(size); + return size < upperBound && size > lowerBound; + } + }); + + assertNotNull("Did not expect to see a null size", lastSeenSize.get()); + LOG.info("Last seen size: " + lastSeenSize.get()); + + // Make sure the QuotaObserverChore has time to reflect the new region size reports + // (we saw above). The usage of the table should *not* decrease when we check it below, + // though, because the snapshot on our table will cause the table to "retain" the size. 
+ TEST_UTIL.waitFor(10 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, ns) { + @Override + public boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return lastSeenSize.get() >= snapshot.getUsage(); + } + }); + + // The final usage should be the sum of the initial size (referenced by the snapshot) and the + // new size we just wrote above. + long expectedFinalSize = actualInitialSize + lastSeenSize.get(); + LOG.info("Expecting namespace usage to be " + expectedFinalSize); + // The size of the table (WRT quotas) should now be approximately double what it was previously + TEST_UTIL.waitFor(30 * 1000, 1000, new SpaceQuotaSnapshotPredicate(conn, ns) { + @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { + return expectedFinalSize == snapshot.getUsage(); + } + }); + } +} -- 2.12.0