From a220317201641567229f2f20371254b229f876bf Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Tue, 8 Nov 2016 18:55:12 -0500 Subject: [PATCH 1/7] HBASE-16998 Implement Master-side analysis of region space reports Adds a new Chore to the Master that analyzes the reports that are sent by RegionServers. The Master must then, for all tables with quotas, determine the tables that are violating quotas and move those tables into violation. Similarly, tables no longer violating the quota can be moved out of violation. The Chore is the "stateful" bit, managing which tables are and are not in violation. Everything else is just performing computation and informing the Chore on the updated state. --- .../apache/hadoop/hbase/quotas/QuotaRetriever.java | 27 +- .../org/apache/hadoop/hbase/master/HMaster.java | 24 + .../hadoop/hbase/quotas/MasterQuotaManager.java | 1 + .../hbase/quotas/NamespaceQuotaViolationStore.java | 98 ++++ .../hadoop/hbase/quotas/QuotaObserverChore.java | 576 ++++++++++++++++++++ .../hadoop/hbase/quotas/QuotaViolationStore.java | 65 +++ .../hbase/quotas/SpaceQuotaViolationNotifier.java | 42 ++ .../quotas/SpaceQuotaViolationNotifierForTest.java | 48 ++ .../hbase/quotas/TableQuotaViolationStore.java | 98 ++++ .../quotas/TestNamespaceQuotaViolationStore.java | 156 ++++++ .../hbase/quotas/TestQuotaObserverChore.java | 106 ++++ .../TestQuotaObserverChoreWithMiniCluster.java | 595 +++++++++++++++++++++ .../hadoop/hbase/quotas/TestQuotaTableUtil.java | 4 - .../hbase/quotas/TestTableQuotaViolationStore.java | 151 ++++++ .../hadoop/hbase/quotas/TestTablesWithQuotas.java | 198 +++++++ 15 files changed, 2180 insertions(+), 9 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaViolationStore.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java index 37e4a92..825f3ff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaRetriever.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; +import java.util.Objects; import java.util.Queue; import org.apache.commons.logging.Log; @@ -56,11 +57,23 @@ public class QuotaRetriever implements Closeable, Iterable { private Connection connection; private Table table; - private QuotaRetriever() { + /** + * Should QutoaRetrieve manage the state of the connection, or leave it be. + */ + private boolean isManagedConnection = false; + + QuotaRetriever() { } void init(final Configuration conf, final Scan scan) throws IOException { - this.connection = ConnectionFactory.createConnection(conf); + // Set this before creating the connection and passing it down to make sure + // it's cleaned up if we fail to construct the Scanner. + this.isManagedConnection = true; + init(ConnectionFactory.createConnection(conf), scan); + } + + void init(final Connection conn, final Scan scan) throws IOException { + this.connection = Objects.requireNonNull(conn); this.table = this.connection.getTable(QuotaTableUtil.QUOTA_TABLE_NAME); try { scanner = table.getScanner(scan); @@ -79,10 +92,14 @@ public class QuotaRetriever implements Closeable, Iterable { this.table.close(); this.table = null; } - if (this.connection != null) { - this.connection.close(); - this.connection = null; + // Null out the connection on close() even if we didn't explicitly close it + // to maintain typical semantics. + if (isManagedConnection) { + if (this.connection != null) { + this.connection.close(); + } } + this.connection = null; } public QuotaSettings next() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 75da73f..607a1af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -131,6 +131,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; +import org.apache.hadoop.hbase.quotas.QuotaObserverChore; +import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifier; +import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierForTest; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; @@ -366,6 +369,8 @@ public class HMaster extends HRegionServer implements MasterServices { // it is assigned after 'initialized' guard set to true, so should be volatile private volatile MasterQuotaManager quotaManager; + private SpaceQuotaViolationNotifier spaceQuotaViolationNotifier; + private QuotaObserverChore quotaObserverChore; private ProcedureExecutor procedureExecutor; private WALProcedureStore procedureStore; @@ -858,6 +863,10 @@ public class HMaster extends HRegionServer implements MasterServices { status.setStatus("Starting quota manager"); initQuotaManager(); + this.spaceQuotaViolationNotifier = createQuotaViolationNotifier(); + this.quotaObserverChore = new QuotaObserverChore(this); + // Start the chore to read the region FS space reports and act on them + getChoreService().scheduleChore(quotaObserverChore); // clear the dead servers with same host name and port of online server because we are not // removing dead server with same hostname and port of rs which is trying to check in before @@ -945,6 +954,10 @@ public class HMaster extends HRegionServer implements MasterServices { this.quotaManager = quotaManager; } + SpaceQuotaViolationNotifier createQuotaViolationNotifier() { + return new SpaceQuotaViolationNotifierForTest(); + } + boolean isCatalogJanitorEnabled() { return catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false; @@ -1142,6 +1155,9 @@ public class HMaster extends HRegionServer implements MasterServices { if (this.periodicDoMetricsChore != null) { periodicDoMetricsChore.cancel(); } + if (this.quotaObserverChore != null) { + quotaObserverChore.cancel(); + } } /** @@ -3289,4 +3305,12 @@ public class HMaster extends HRegionServer implements MasterServices { this.zooKeeper.prefix("Unable to remove drain for '" + server.getServerName() + "'."), ke); } } + + public QuotaObserverChore getQuotaObserverChore() { + return this.quotaObserverChore; + } + + public SpaceQuotaViolationNotifier getSpaceQuotaViolationNotifier() { + return this.spaceQuotaViolationNotifier; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java index fc24e52..cd81270 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java @@ -523,6 +523,7 @@ public class MasterQuotaManager implements RegionStateListener { public void addRegionSize(HRegionInfo hri, long size) { // TODO Make proper API + // TODO Prevent from growing indefinitely regionSizes.put(hri, size); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaViolationStore.java new file mode 100644 index 0000000..19ddf6f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NamespaceQuotaViolationStore.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +/** + * {@link QuotaViolationStore} implementation for namespaces. + */ +public class NamespaceQuotaViolationStore implements QuotaViolationStore { + private final Connection conn; + private final QuotaObserverChore chore; + private final Map regionUsage; + + public NamespaceQuotaViolationStore(Connection conn, QuotaObserverChore chore, Map regionUsage) { + this.conn = Objects.requireNonNull(conn); + this.chore = Objects.requireNonNull(chore); + this.regionUsage = Objects.requireNonNull(regionUsage); + } + + @Override + public SpaceQuota getSpaceQuota(String namespace) throws IOException { + Quotas quotas = getQuotaForNamespace(namespace); + if (null != quotas && quotas.hasSpace()) { + return quotas.getSpace(); + } + return null; + } + + /** + * Fetches the namespace quota. Visible for mocking/testing. + */ + Quotas getQuotaForNamespace(String namespace) throws IOException { + return QuotaTableUtil.getNamespaceQuota(conn, namespace); + } + + @Override + public ViolationState getCurrentState(String namespace) { + // Defer the "current state" to the chore + return this.chore.getNamespaceQuotaViolation(namespace); + } + + @Override + public ViolationState getTargetState(String subject, SpaceQuota spaceQuota) { + final long sizeLimitInBytes = spaceQuota.getSoftLimit(); + long sum = 0L; + for (Entry entry : filterBySubject(subject)) { + sum += entry.getValue(); + if (sum > sizeLimitInBytes) { + // Short-circuit early + return ViolationState.IN_VIOLATION; + } + } + // Observance is defined as the size of the table being less than the limit + return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : ViolationState.IN_VIOLATION; + } + + @Override + public Iterable> filterBySubject(String namespace) { + return Iterables.filter(regionUsage.entrySet(), new Predicate>() { + @Override + public boolean apply(Entry input) { + return namespace.equals(input.getKey().getTable().getNamespaceAsString()); + } + }); + } + + @Override + public void setCurrentState(String namespace, ViolationState state) { + // Defer the "current state" to the chore + this.chore.setNamespaceQuotaViolation(namespace, state); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java new file mode 100644 index 0000000..bb42075 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java @@ -0,0 +1,576 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; + +/** + * Reads the currently received Region filesystem-space use reports and acts on those which + * violate a defined quota. + */ +public class QuotaObserverChore extends ScheduledChore { + private static final Log LOG = LogFactory.getLog(QuotaObserverChore.class); + static final String VIOLATION_OBSERVER_CHORE_PERIOD_KEY = + "hbase.master.quotas.violation.observer.chore.period"; + static final int VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis + + static final String VIOLATION_OBSERVER_CHORE_DELAY_KEY = + "hbase.master.quotas.violation.observer.chore.delay"; + static final long VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute + + static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY = + "hbase.master.quotas.violation.observer.chore.timeunit"; + static final String VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); + + static final String VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY = + "hbase.master.quotas.violation.observer.report.percent"; + static final double VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT= 0.95; + + + private final HMaster master; + private final MasterQuotaManager quotaManager; + /* + * Callback that changes in quota violation are passed to. + */ + private final SpaceQuotaViolationNotifier violationNotifier; + + /* + * Preserves the state of quota violations for tables and namespaces + */ + private final Map tableQuotaViolationStates; + private final Map namespaceQuotaViolationStates; + + /* + * Encapsulates logic for moving tables/namespaces into or out of quota violation + */ + private QuotaViolationStore tableViolationStore; + private QuotaViolationStore namespaceViolationStore; + + public QuotaObserverChore(HMaster master) { + this(master, master.getSpaceQuotaViolationNotifier()); + } + + QuotaObserverChore(HMaster master, SpaceQuotaViolationNotifier violationNotifier) { + super(QuotaObserverChore.class.getSimpleName(), master, getPeriod(master.getConfiguration()), + getInitialDelay(master.getConfiguration()), getTimeUnit(master.getConfiguration())); + this.master = master; + this.quotaManager = this.master.getMasterQuotaManager(); + this.violationNotifier = violationNotifier; + this.tableQuotaViolationStates = new HashMap<>(); + this.namespaceQuotaViolationStates = new HashMap<>(); + } + + @Override + protected void chore() { + try { + _chore(); + } catch (IOException e) { + LOG.warn("Failed to process quota reports and update quota violation state. Will retry.", e); + } + } + + void _chore() throws IOException { + // Get the total set of tables that have quotas defined. Includes table quotas + // and tables included by namespace quotas. + TablesWithQuotas tablesWithQuotas = fetchAllTablesWithQuotasDefined(); + if (LOG.isTraceEnabled()) { + LOG.trace("Found following tables with quotas: " + tablesWithQuotas); + } + + // The current "view" of region space use. Used henceforth. + final Map reportedRegionSpaceUse = quotaManager.snapshotRegionSizes(); + if (LOG.isTraceEnabled()) { + LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports"); + } + + // Create the stores to track table and namespace violations + initializeViolationStores(reportedRegionSpaceUse); + + // Filter out tables for which we don't have adequate regionspace reports yet. + // Important that we do this after we instantiate the stores above + tablesWithQuotas.filterInsufficientlyReportedTables(tableViolationStore); + + if (LOG.isTraceEnabled()) { + LOG.trace("Filtered insufficiently reported tables, left with " + + reportedRegionSpaceUse.size() + " regions reported"); + } + + // Transition each table to/from quota violation based on the current and target state. + // Only table quotas are enacted. + final Set tablesWithTableQuotas = tablesWithQuotas.getTableQuotaTables(); + for (TableName table : tablesWithTableQuotas) { + final SpaceQuota spaceQuota = tableViolationStore.getSpaceQuota(table); + if (null == spaceQuota) { + if (LOG.isDebugEnabled()) { + LOG.debug("Unexpectedly did not find a space quota for " + table + + ", maybe it was recently deleted."); + } + continue; + } + final ViolationState currentState = tableViolationStore.getCurrentState(table); + final ViolationState targetState = tableViolationStore.getTargetState(table, spaceQuota); + + if (currentState == ViolationState.IN_VIOLATION) { + if (targetState == ViolationState.IN_OBSERVANCE) { + LOG.info(table + " moving into observance of table space quota."); + transitionTableToObservance(table); + tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE); + } else if (targetState == ViolationState.IN_VIOLATION) { + if (LOG.isTraceEnabled()) { + LOG.trace(table + " remains in violation of quota."); + } + tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION); + } + } else if (currentState == ViolationState.IN_OBSERVANCE) { + if (targetState == ViolationState.IN_VIOLATION) { + LOG.info(table + " moving into violation of table space quota."); + transitionTableToViolation(table, getViolationPolicy(spaceQuota)); + tableViolationStore.setCurrentState(table, ViolationState.IN_VIOLATION); + } else if (targetState == ViolationState.IN_OBSERVANCE) { + if (LOG.isTraceEnabled()) { + LOG.trace(table + " remains in observance of quota."); + } + tableViolationStore.setCurrentState(table, ViolationState.IN_OBSERVANCE); + } + } + } + + // For each Namespace quota, transition each table in the namespace in or out of violation + // only if a table quota violation policy has not already been applied. + final Set namespacesWithQuotas = tablesWithQuotas.getNamespacesWithQuotas(); + final Multimap tablesByNamespace = tablesWithQuotas.getTablesByNamespace(); + for (String namespace : namespacesWithQuotas) { + // Get the quota definition for the namespace + final SpaceQuota spaceQuota = namespaceViolationStore.getSpaceQuota(namespace); + if (null == spaceQuota) { + if (LOG.isDebugEnabled()) { + LOG.debug("Could not get Namespace space quota for " + namespace + + ", maybe it was recently deleted."); + } + continue; + } + final ViolationState currentState = namespaceViolationStore.getCurrentState(namespace); + final ViolationState targetState = namespaceViolationStore.getTargetState(namespace, spaceQuota); + // When in observance, check if we need to move to violation. + if (ViolationState.IN_OBSERVANCE == currentState) { + if (ViolationState.IN_VIOLATION == targetState) { + for (TableName tableInNS : tablesByNamespace.get(namespace)) { + if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) { + // Table-level quota violation policy is being applied here. + if (LOG.isTraceEnabled()) { + LOG.trace("Not activating Namespace violation policy because Table violation" + + " policy is already in effect for " + tableInNS); + } + continue; + } else { + LOG.info(tableInNS + " moving into violation of namespace space quota"); + transitionTableToViolation(tableInNS, getViolationPolicy(spaceQuota)); + } + } + } else { + // still in observance + if (LOG.isTraceEnabled()) { + LOG.trace(namespace + " remains in observance of quota."); + } + } + } else if (ViolationState.IN_VIOLATION == currentState) { + // When in violation, check if we need to move to observance. + if (ViolationState.IN_OBSERVANCE == targetState) { + for (TableName tableInNS : tablesByNamespace.get(namespace)) { + if (ViolationState.IN_VIOLATION == tableViolationStore.getCurrentState(tableInNS)) { + // Table-level quota violation policy is being applied here. + if (LOG.isTraceEnabled()) { + LOG.trace("Not activating Namespace violation policy because Table violation" + + " policy is already in effect for " + tableInNS); + } + continue; + } else { + LOG.info(tableInNS + " moving into observance of namespace space quota"); + transitionTableToObservance(tableInNS); + } + } + } else { + // Remains in violation + if (LOG.isTraceEnabled()) { + LOG.trace(namespace + " remains in violation of quota."); + } + } + } + } + } + + void initializeViolationStores(Map regionSizes) { + Map immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes); + tableViolationStore = new TableQuotaViolationStore(master.getConnection(), this, + immutableRegionSpaceUse); + namespaceViolationStore = new NamespaceQuotaViolationStore(master.getConnection(), this, + immutableRegionSpaceUse); + } + + /** + * Computes the set of all tables that have quotas defined. This includes tables with quotas + * explicitly set on them, in addition to tables that exist namespaces which have a quota + * defined. + */ + TablesWithQuotas fetchAllTablesWithQuotasDefined() throws IOException { + final Scan scan = QuotaTableUtil.makeScan(null); + final QuotaRetriever scanner = new QuotaRetriever(); + final TablesWithQuotas tablesWithQuotas = new TablesWithQuotas(master.getConnection(), + master.getConfiguration()); + try { + scanner.init(master.getConnection(), scan); + for (QuotaSettings quotaSettings : scanner) { + // Only one of namespace and tablename should be 'null' + final String namespace = quotaSettings.getNamespace(); + final TableName tableName = quotaSettings.getTableName(); + if (QuotaType.SPACE != quotaSettings.getQuotaType()) { + continue; + } + + if (null != namespace) { + assert null == quotaSettings.getTableName(); + // Collect all of the tables in the namespace + TableName[] tablesInNS = master.getConnection().getAdmin() + .listTableNamesByNamespace(namespace); + for (TableName tableUnderNs : tablesInNS) { + if (LOG.isTraceEnabled()) { + LOG.trace("Adding " + tableUnderNs + " under " + namespace + + " as having a namespace quota"); + } + tablesWithQuotas.addNamespaceQuotaTable(tableUnderNs); + } + } else { + assert null != tableName; + if (LOG.isTraceEnabled()) { + LOG.trace("Adding " + tableName + " as having table quota."); + } + // namespace is already null, must be a non-null tableName + tablesWithQuotas.addTableQuotaTable(tableName); + } + } + return tablesWithQuotas; + } finally { + if (null != scanner) { + scanner.close(); + } + } + } + + @VisibleForTesting + QuotaViolationStore getTableViolationStore() { + return tableViolationStore; + } + + @VisibleForTesting + QuotaViolationStore getNamespaceViolationStore() { + return namespaceViolationStore; + } + + /** + * Transitions the given table to violation of its quota, enabling the violation policy. + */ + private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) { + this.violationNotifier.transitionTableToViolation(table, violationPolicy); + } + + /** + * Transitions the given table to observance of its quota, disabling the violation policy. + */ + private void transitionTableToObservance(TableName table) { + this.violationNotifier.transitionTableToObservance(table); + } + + /** + * Fetch the {@link ViolationState} for the given table. + */ + ViolationState getTableQuotaViolation(TableName table) { + // TODO Can one instance of a Chore be executed concurrently? + ViolationState state = this.tableQuotaViolationStates.get(table); + if (null == state) { + // No tracked state implies observance. + return ViolationState.IN_OBSERVANCE; + } + return state; + } + + /** + * Stores the quota violation state for the given table. + */ + void setTableQuotaViolation(TableName table, ViolationState state) { + this.tableQuotaViolationStates.put(table, state); + } + + /** + * Fetches the {@link ViolationState} for the given namespace. + */ + ViolationState getNamespaceQuotaViolation(String namespace) { + // TODO Can one instance of a Chore be executed concurrently? + ViolationState state = this.namespaceQuotaViolationStates.get(namespace); + if (null == state) { + // No tracked state implies observance. + return ViolationState.IN_OBSERVANCE; + } + return state; + } + + /** + * Stores the quota violation state for the given namespace. + */ + void setNamespaceQuotaViolation(String namespace, ViolationState state) { + this.namespaceQuotaViolationStates.put(namespace, state); + } + + /** + * Extracts the {@link SpaceViolationPolicy} from the serialized {@link Quotas} protobuf. + * @throws IllegalArgumentException If the SpaceQuota lacks a ViolationPolicy + */ + SpaceViolationPolicy getViolationPolicy(SpaceQuota spaceQuota) { + if (!spaceQuota.hasViolationPolicy()) { + throw new IllegalArgumentException("SpaceQuota had no associated violation policy: " + + spaceQuota); + } + return ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()); + } + + /** + * Extracts the period for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore period or the default value. + */ + static int getPeriod(Configuration conf) { + return conf.getInt(VIOLATION_OBSERVER_CHORE_PERIOD_KEY, + VIOLATION_OBSERVER_CHORE_PERIOD_DEFAULT); + } + + /** + * Extracts the initial delay for the chore from the configuration. + * + * @param conf The configuration object. + * @return The configured chore initial delay or the default value. + */ + static long getInitialDelay(Configuration conf) { + return conf.getLong(VIOLATION_OBSERVER_CHORE_DELAY_KEY, + VIOLATION_OBSERVER_CHORE_DELAY_DEFAULT); + } + + /** + * Extracts the time unit for the chore period and initial delay from the configuration. The + * configuration value for {@link #VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to + * a {@link TimeUnit} value. + * + * @param conf The configuration object. + * @return The configured time unit for the chore period and initial delay or the default value. + */ + static TimeUnit getTimeUnit(Configuration conf) { + return TimeUnit.valueOf(conf.get(VIOLATION_OBSERVER_CHORE_TIMEUNIT_KEY, + VIOLATION_OBSERVER_CHORE_TIMEUNIT_DEFAULT)); + } + + /** + * Extracts the percent of Regions for a table to have been reported to enable quota violation + * state change. + * + * @param conf The configuration object. + * @return The percent of regions reported to use. + */ + static Double getRegionReportPercent(Configuration conf) { + return conf.getDouble(VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY, + VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT); + } + + /** + * A container which encapsulates the tables which have a table quota and the tables which + * are contained in a namespace which have a namespace quota. + */ + static class TablesWithQuotas { + private final Set tablesWithTableQuotas = new HashSet<>(); + private final Set tablesWithNamespaceQuotas = new HashSet<>(); + private final Connection conn; + private final Configuration conf; + + public TablesWithQuotas(Connection conn, Configuration conf) { + this.conn = Objects.requireNonNull(conn); + this.conf = Objects.requireNonNull(conf); + } + + Configuration getConfiguration() { + return conf; + } + + /** + * Adds a table with a table quota. + */ + public void addTableQuotaTable(TableName tn) { + tablesWithTableQuotas.add(tn); + } + + /** + * Adds a table with a namespace quota. + */ + public void addNamespaceQuotaTable(TableName tn) { + tablesWithNamespaceQuotas.add(tn); + } + + /** + * Returns true if the given table has a table quota. + */ + public boolean hasTableQuota(TableName tn) { + return tablesWithTableQuotas.contains(tn); + } + + /** + * Returns true if the table exists in a namespace with a namespace quota. + */ + public boolean hasNamespaceQuota(TableName tn) { + return tablesWithNamespaceQuotas.contains(tn); + } + + /** + * Returns an unmodifiable view of all tables with table quotas. + */ + public Set getTableQuotaTables() { + return Collections.unmodifiableSet(tablesWithTableQuotas); + } + + /** + * Returns an unmodifiable view of all tables in namespaces that have + * namespace quotas. + */ + public Set getNamespaceQuotaTables() { + return Collections.unmodifiableSet(tablesWithNamespaceQuotas); + } + + public Set getNamespacesWithQuotas() { + Set namespaces = new HashSet<>(); + for (TableName tn : tablesWithNamespaceQuotas) { + namespaces.add(tn.getNamespaceAsString()); + } + return namespaces; + } + + /** + * Returns a view of all tables that reside in a namespace with a namespace + * quota, grouped by the namespace itself. + */ + public Multimap getTablesByNamespace() { + Multimap tablesByNS = HashMultimap.create(); + for (TableName tn : tablesWithNamespaceQuotas) { + tablesByNS.put(tn.getNamespaceAsString(), tn); + } + return tablesByNS; + } + + /** + * Filters out all tables for which the Master currently doesn't have enough region space + * reports received from RegionServers yet. + */ + public void filterInsufficientlyReportedTables(QuotaViolationStore tableStore) + throws IOException { + final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration()); + Set tablesToRemove = new HashSet<>(); + for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) { + // Don't recompute a table we've already computed + if (tablesToRemove.contains(table)) { + continue; + } + final int numRegionsInTable = getNumRegions(table); + // If the table doesn't exist (no regions), bail out. + if (0 == numRegionsInTable) { + if (LOG.isTraceEnabled()) { + LOG.trace("Filtering " + table + " because no regions were reported"); + } + tablesToRemove.add(table); + continue; + } + final int reportedRegionsInQuota = getNumReportedRegions(table, tableStore); + final double ratioReported = ((double) reportedRegionsInQuota) / numRegionsInTable; + if (ratioReported < percentRegionsReportedThreshold) { + if (LOG.isTraceEnabled()) { + LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota + " of " + + numRegionsInTable + " were reported."); + } + tablesToRemove.add(table); + } else if (LOG.isTraceEnabled()) { + LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota + " of " + + numRegionsInTable + " were reported."); + } + } + for (TableName tableToRemove : tablesToRemove) { + tablesWithTableQuotas.remove(tableToRemove); + tablesWithNamespaceQuotas.remove(tableToRemove); + } + } + + /** + * Computes the total number of regions in a table. + */ + int getNumRegions(TableName table) throws IOException { + List regions = this.conn.getAdmin().getTableRegions(table); + if (null == regions) { + return 0; + } + return regions.size(); + } + + /** + * Computes the number of regions reported for a table. + */ + int getNumReportedRegions(TableName table, QuotaViolationStore tableStore) + throws IOException { + return Iterables.size(tableStore.filterBySubject(table)); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(32); + sb.append(getClass().getSimpleName()) + .append(": tablesWithTableQuotas=") + .append(this.tablesWithTableQuotas) + .append(", tablesWithNamespaceQuotas=") + .append(this.tablesWithNamespaceQuotas); + return sb.toString(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java new file mode 100644 index 0000000..37418e6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaViolationStore.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; + +/** + * A common interface for computing quota observance/violation for tables or namespaces. + */ +public interface QuotaViolationStore { + + /** + * The current state of a table with respect to the policy set forth by a quota. + */ + public enum ViolationState { + IN_VIOLATION, + IN_OBSERVANCE, + } + + /** + * Fetch the Quota for the given table. May be null. + */ + SpaceQuota getSpaceQuota(T subject) throws IOException; + + /** + * Returns the current {@link ViolationState} for the given subject. + */ + ViolationState getCurrentState(T subject); + + /** + * Computes the target {@link ViolationState} for the given subject. + */ + ViolationState getTargetState(T subject, SpaceQuota spaceQuota); + + /** + * Filters the provided regions, returning those which match the given + * subject. + * + * @param subject The filter criteria. + */ + Iterable> filterBySubject(T subject); + + /** + * Sets the current {@link ViolationState} for the subject. + */ + void setCurrentState(T subject, ViolationState state); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java new file mode 100644 index 0000000..745bfc0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.TableName; + +/** + * An interface which abstract away the action taken to enable or disable + * a space quota violation policy across the HBase cluster. + */ +public interface SpaceQuotaViolationNotifier { + + /** + * Instructs the cluster that the given table is in violation of a space quota. The + * provided violation policy is the action which should be taken on the table. + * + * @param tableName The name of the table in violation of the quota. + * @param violationPolicy The policy which should be enacted on the table. + */ + void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy); + + /** + * Instructs the cluster that the given table is in observance of any applicable space quota. + * + * @param tableName The name of the table in observance. + */ + void transitionTableToObservance(TableName tableName); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java new file mode 100644 index 0000000..7d1cb56 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.TableName; + +/** + * A SpaceQuotaViolationNotifier implementation for verifying testing. + */ +public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNotifier { + + private final Map tablesInViolation = new HashMap<>(); + + @Override + public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) { + tablesInViolation.put(tableName, violationPolicy); + } + + @Override + public void transitionTableToObservance(TableName tableName) { + tablesInViolation.remove(tableName); + } + + public Map snapshotTablesInViolation() { + return new HashMap<>(this.tablesInViolation); + } + + public void clearTableViolations() { + this.tablesInViolation.clear(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java new file mode 100644 index 0000000..517a89b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableQuotaViolationStore.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +/** + * {@link QuotaViolationStore} for tables. + */ +public class TableQuotaViolationStore implements QuotaViolationStore { + private final Connection conn; + private final QuotaObserverChore chore; + private final Map regionUsage; + + public TableQuotaViolationStore(Connection conn, QuotaObserverChore chore, Map regionUsage) { + this.conn = Objects.requireNonNull(conn); + this.chore = Objects.requireNonNull(chore); + this.regionUsage = Objects.requireNonNull(regionUsage); + } + + @Override + public SpaceQuota getSpaceQuota(TableName subject) throws IOException { + Quotas quotas = getQuotaForTable(subject); + if (null != quotas && quotas.hasSpace()) { + return quotas.getSpace(); + } + return null; + } + /** + * Fetches the table quota. Visible for mocking/testing. + */ + Quotas getQuotaForTable(TableName table) throws IOException { + return QuotaTableUtil.getTableQuota(conn, table); + } + + @Override + public ViolationState getCurrentState(TableName table) { + // Defer the "current state" to the chore + return chore.getTableQuotaViolation(table); + } + + @Override + public ViolationState getTargetState(TableName table, SpaceQuota spaceQuota) { + final long sizeLimitInBytes = spaceQuota.getSoftLimit(); + long sum = 0L; + for (Entry entry : filterBySubject(table)) { + sum += entry.getValue(); + if (sum > sizeLimitInBytes) { + // Short-circuit early + return ViolationState.IN_VIOLATION; + } + } + // Observance is defined as the size of the table being less than the limit + return sum <= sizeLimitInBytes ? ViolationState.IN_OBSERVANCE : ViolationState.IN_VIOLATION; + } + + @Override + public Iterable> filterBySubject(TableName table) { + return Iterables.filter(regionUsage.entrySet(), new Predicate>() { + @Override + public boolean apply(Entry input) { + return table.equals(input.getKey().getTable()); + } + }); + } + + @Override + public void setCurrentState(TableName table, ViolationState state) { + // Defer the "current state" to the chore + this.chore.setTableQuotaViolation(table, state); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java new file mode 100644 index 0000000..ae4c382 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNamespaceQuotaViolationStore.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static com.google.common.collect.Iterables.size; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test class for {@link NamespaceQuotaViolationStore}. + */ +@Category(SmallTests.class) +public class TestNamespaceQuotaViolationStore { + private static final long ONE_MEGABYTE = 1024L * 1024L; + + private Connection conn; + private QuotaObserverChore chore; + private Map regionReports; + private NamespaceQuotaViolationStore store; + + @Before + public void setup() { + conn = mock(Connection.class); + chore = mock(QuotaObserverChore.class); + regionReports = new HashMap<>(); + store = new NamespaceQuotaViolationStore(conn, chore, regionReports); + } + + @Test + public void testGetSpaceQuota() throws Exception { + NamespaceQuotaViolationStore mockStore = mock(NamespaceQuotaViolationStore.class); + when(mockStore.getSpaceQuota(any(String.class))).thenCallRealMethod(); + + Quotas quotaWithSpace = Quotas.newBuilder().setSpace( + SpaceQuota.newBuilder() + .setSoftLimit(1024L) + .setViolationPolicy(QuotaProtos.SpaceViolationPolicy.DISABLE) + .build()) + .build(); + Quotas quotaWithoutSpace = Quotas.newBuilder().build(); + + AtomicReference quotaRef = new AtomicReference<>(); + when(mockStore.getQuotaForNamespace(any(String.class))).then(new Answer() { + @Override + public Quotas answer(InvocationOnMock invocation) throws Throwable { + return quotaRef.get(); + } + }); + + quotaRef.set(quotaWithSpace); + assertEquals(quotaWithSpace.getSpace(), mockStore.getSpaceQuota("ns")); + quotaRef.set(quotaWithoutSpace); + assertNull(mockStore.getSpaceQuota("ns")); + } + + @Test + public void testTargetViolationState() { + final String NS = "ns"; + TableName tn1 = TableName.valueOf(NS, "tn1"); + TableName tn2 = TableName.valueOf(NS, "tn2"); + TableName tn3 = TableName.valueOf("tn3"); + SpaceQuota quota = SpaceQuota.newBuilder() + .setSoftLimit(ONE_MEGABYTE) + .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(SpaceViolationPolicy.DISABLE)) + .build(); + + // Create some junk data to filter. Makes sure it's so large that it would + // immediately violate the quota. + for (int i = 0; i < 3; i++) { + regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)), + 5L * ONE_MEGABYTE); + } + + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(0), Bytes.toBytes(1)), 1024L * 512L); + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L); + + // Below the quota + assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota)); + + regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L); + + // Equal to the quota is still in observance + assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(NS, quota)); + + regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L); + + // Exceeds the quota, should be in violation + assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(NS, quota)); + } + + @Test + public void testFilterRegionsByNamespace() { + TableName tn1 = TableName.valueOf("foo"); + TableName tn2 = TableName.valueOf("sn", "bar"); + TableName tn3 = TableName.valueOf("ns", "foo"); + TableName tn4 = TableName.valueOf("ns", "bar"); + + assertEquals(0, size(store.filterBySubject("asdf"))); + + for (int i = 0; i < 5; i++) { + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L); + } + for (int i = 0; i < 3; i++) { + regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L); + } + for (int i = 0; i < 10; i++) { + regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L); + } + for (int i = 0; i < 8; i++) { + regionReports.put(new HRegionInfo(tn4, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L); + } + assertEquals(26, regionReports.size()); + assertEquals(5, size(store.filterBySubject(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR))); + assertEquals(3, size(store.filterBySubject("sn"))); + assertEquals(18, size(store.filterBySubject("ns"))); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java new file mode 100644 index 0000000..db549e4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChore.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Iterables; + +/** + * Non-HBase cluster unit tests for {@link QuotaObserverChore}. + */ +@Category(SmallTests.class) +public class TestQuotaObserverChore { + private Connection conn; + private QuotaObserverChore chore; + + @Before + public void setup() throws Exception { + conn = mock(Connection.class); + chore = mock(QuotaObserverChore.class); + // Set up some rules to call the real method on the mock. + when(chore.getViolationPolicy(any(SpaceQuota.class))).thenCallRealMethod(); + } + + @Test + public void testNumRegionsForTable() { + TableName tn1 = TableName.valueOf("t1"); + TableName tn2 = TableName.valueOf("t2"); + TableName tn3 = TableName.valueOf("t3"); + + final int numTable1Regions = 10; + final int numTable2Regions = 15; + final int numTable3Regions = 8; + Map regionReports = new HashMap<>(); + for (int i = 0; i < numTable1Regions; i++) { + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L); + } + + for (int i = 0; i < numTable2Regions; i++) { + regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L); + } + + for (int i = 0; i < numTable3Regions; i++) { + regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)), 0L); + } + + TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionReports); + when(chore.getTableViolationStore()).thenReturn(store); + + assertEquals(numTable1Regions, Iterables.size(store.filterBySubject(tn1))); + assertEquals(numTable2Regions, Iterables.size(store.filterBySubject(tn2))); + assertEquals(numTable3Regions, Iterables.size(store.filterBySubject(tn3))); + } + + @Test + public void testExtractViolationPolicy() { + for (SpaceViolationPolicy policy : SpaceViolationPolicy.values()) { + SpaceQuota spaceQuota = SpaceQuota.newBuilder() + .setSoftLimit(1024L) + .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy)) + .build(); + assertEquals(policy, chore.getViolationPolicy(spaceQuota)); + } + SpaceQuota malformedQuota = SpaceQuota.newBuilder() + .setSoftLimit(1024L) + .build(); + try { + chore.getViolationPolicy(malformedQuota); + fail("Should have thrown an IllegalArgumentException."); + } catch (IllegalArgumentException e) { + // Pass + } + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java new file mode 100644 index 0000000..8638677 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.NamespaceNotFoundException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; + +/** + * Test class for {@link QuotaObserverChore} that uses a live HBase cluster. + */ +@Category(LargeTests.class) +public class TestQuotaObserverChoreWithMiniCluster { + private static final Log LOG = LogFactory.getLog(TestQuotaObserverChoreWithMiniCluster.class); + private static final int SIZE_PER_VALUE = 256; + private static final String F1 = "f1"; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final AtomicLong COUNTER = new AtomicLong(0); + private static final long ONE_MEGABYTE = 1024L * 1024L; + + @Rule + public TestName testName = new TestName(); + + private HMaster master; + private QuotaObserverChore chore; + private SpaceQuotaViolationNotifierForTest violationNotifier; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000); + conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000); + conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000); + conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000); + conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void removeAllQuotas() throws Exception { + final Connection conn = TEST_UTIL.getConnection(); + // Wait for the quota table to be created + if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { + do { + LOG.debug("Quota table does not yet exist"); + Thread.sleep(1000); + } while (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)); + } else { + // Or, clean up any quotas from previous test runs. + QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration()); + for (QuotaSettings quotaSettings : scanner) { + final String namespace = quotaSettings.getNamespace(); + final TableName tableName = quotaSettings.getTableName(); + if (null != namespace) { + LOG.debug("Deleting quota for namespace: " + namespace); + QuotaUtil.deleteNamespaceQuota(conn, namespace); + } else { + assert null != tableName; + LOG.debug("Deleting quota for table: "+ tableName); + QuotaUtil.deleteTableQuota(conn, tableName); + } + } + } + + master = TEST_UTIL.getMiniHBaseCluster().getMaster(); + violationNotifier = + (SpaceQuotaViolationNotifierForTest) master.getSpaceQuotaViolationNotifier(); + violationNotifier.clearTableViolations(); + chore = master.getQuotaObserverChore(); + } + + @Test + public void testTableViolatesQuota() throws Exception { + TableName tn = createTableWithRegions(10); + + final long sizeLimit = 2L * ONE_MEGABYTE; + final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS; + QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy); + TEST_UTIL.getAdmin().setQuota(settings); + + // Write more data than should be allowed + writeData(tn, 3L * ONE_MEGABYTE); + + Map violatedQuotas = violationNotifier.snapshotTablesInViolation(); + while (violatedQuotas.isEmpty()) { + LOG.info("Found no violated quotas, sleeping and retrying. Current reports: " + + master.getMasterQuotaManager().snapshotRegionSizes()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping.", e); + Thread.currentThread().interrupt(); + } + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + } + + Entry entry = Iterables.getOnlyElement(violatedQuotas.entrySet()); + assertEquals(tn, entry.getKey()); + assertEquals(violationPolicy, entry.getValue()); + } + + @Test + public void testNamespaceViolatesQuota() throws Exception { + final String namespace = testName.getMethodName(); + final Admin admin = TEST_UTIL.getAdmin(); + // Ensure the namespace exists + try { + admin.getNamespaceDescriptor(namespace); + } catch (NamespaceNotFoundException e) { + NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build(); + admin.createNamespace(desc); + } + + TableName tn1 = createTableWithRegions(namespace, 5); + TableName tn2 = createTableWithRegions(namespace, 5); + TableName tn3 = createTableWithRegions(namespace, 5); + + final long sizeLimit = 5L * ONE_MEGABYTE; + final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE; + QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy); + admin.setQuota(settings); + + writeData(tn1, 2L * ONE_MEGABYTE); + admin.flush(tn1); + Map violatedQuotas = violationNotifier.snapshotTablesInViolation(); + for (int i = 0; i < 5; i++) { + // Check a few times to make sure we don't prematurely move to violation + assertEquals("Should not see any quota violations after writing 2MB of data", 0, violatedQuotas.size()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping." , e); + } + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + } + + writeData(tn2, 2L * ONE_MEGABYTE); + admin.flush(tn2); + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + for (int i = 0; i < 5; i++) { + // Check a few times to make sure we don't prematurely move to violation + assertEquals("Should not see any quota violations after writing 4MB of data", 0, + violatedQuotas.size()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping." , e); + } + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + } + + // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total) + // and should push all three tables in the namespace into violation. + writeData(tn3, 2L * ONE_MEGABYTE); + admin.flush(tn3); + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + while (violatedQuotas.size() < 3) { + LOG.debug("Saw fewer violations than desired (expected 3): " + violatedQuotas + + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping.", e); + Thread.currentThread().interrupt(); + } + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + } + + SpaceViolationPolicy vp1 = violatedQuotas.remove(tn1); + assertNotNull("tn1 should be in violation", vp1); + assertEquals(violationPolicy, vp1); + SpaceViolationPolicy vp2 = violatedQuotas.remove(tn2); + assertNotNull("tn2 should be in violation", vp2); + assertEquals(violationPolicy, vp2); + SpaceViolationPolicy vp3 = violatedQuotas.remove(tn3); + assertNotNull("tn3 should be in violation", vp3); + assertEquals(violationPolicy, vp3); + assertTrue("Unexpected additional quota violations: " + violatedQuotas, violatedQuotas.isEmpty()); + } + + @Test + public void testTableQuotaOverridesNamespaceQuota() throws Exception { + final String namespace = testName.getMethodName(); + final Admin admin = TEST_UTIL.getAdmin(); + // Ensure the namespace exists + try { + admin.getNamespaceDescriptor(namespace); + } catch (NamespaceNotFoundException e) { + NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build(); + admin.createNamespace(desc); + } + + TableName tn1 = createTableWithRegions(namespace, 5); + TableName tn2 = createTableWithRegions(namespace, 5); + + final long namespaceSizeLimit = 3L * ONE_MEGABYTE; + final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE; + QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace, + namespaceSizeLimit, namespaceViolationPolicy); + admin.setQuota(namespaceSettings); + + writeData(tn1, 2L * ONE_MEGABYTE); + admin.flush(tn1); + Map violatedQuotas = violationNotifier.snapshotTablesInViolation(); + for (int i = 0; i < 5; i++) { + // Check a few times to make sure we don't prematurely move to violation + assertEquals("Should not see any quota violations after writing 2MB of data", 0, + violatedQuotas.size()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping." , e); + } + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + } + + writeData(tn2, 2L * ONE_MEGABYTE); + admin.flush(tn2); + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + while (violatedQuotas.size() < 2) { + LOG.debug("Saw fewer violations than desired (expected 2): " + violatedQuotas + + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping.", e); + Thread.currentThread().interrupt(); + } + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + } + + SpaceViolationPolicy actualPolicyTN1 = violatedQuotas.get(tn1); + assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1); + assertEquals(namespaceViolationPolicy, actualPolicyTN1); + SpaceViolationPolicy actualPolicyTN2 = violatedQuotas.get(tn2); + assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); + assertEquals(namespaceViolationPolicy, actualPolicyTN2); + + // Override the namespace quota with a table quota + final long tableSizeLimit = ONE_MEGABYTE; + final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS; + QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit, + tableViolationPolicy); + admin.setQuota(tableSettings); + + // Keep checking for the table quota policy to override the namespace quota + while (true) { + violatedQuotas = violationNotifier.snapshotTablesInViolation(); + SpaceViolationPolicy actualTableViolationPolicy = violatedQuotas.get(tn1); + assertNotNull("Violation policy should never be null", actualTableViolationPolicy); + if (tableViolationPolicy != actualTableViolationPolicy) { + LOG.debug("Saw unexpected table violation policy, waiting and re-checking."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping"); + Thread.currentThread().interrupt(); + } + continue; + } + assertEquals(tableViolationPolicy, actualTableViolationPolicy); + break; + } + + // This should not change with the introduction of the table quota for tn1 + actualPolicyTN2 = violatedQuotas.get(tn2); + assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); + assertEquals(namespaceViolationPolicy, actualPolicyTN2); + } + + @Test + public void testGetAllTablesWithQuotas() throws Exception { + final Multimap quotas = createTablesWithSpaceQuotas(); + Set tablesWithQuotas = new HashSet<>(); + Set namespaceTablesWithQuotas = new HashSet<>(); + // Partition the tables with quotas by table and ns quota + partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); + + TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); + assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); + assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables()); + } + + @Test + public void testRpcQuotaTablesAreFiltered() throws Exception { + final Multimap quotas = createTablesWithSpaceQuotas(); + Set tablesWithQuotas = new HashSet<>(); + Set namespaceTablesWithQuotas = new HashSet<>(); + // Partition the tables with quotas by table and ns quota + partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); + + TableName rpcQuotaTable = createTable(); + TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory + .throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES)); + + // The `rpcQuotaTable` should not be included in this Set + TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); + assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); + assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables()); + } + + @Test + public void testFilterRegions() throws Exception { + Map mockReportedRegions = new HashMap<>(); + // Can't mock because of primitive int as a return type -- Mockito + // can only handle an Integer. + TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(), + TEST_UTIL.getConfiguration()) { + @Override + int getNumReportedRegions(TableName table, QuotaViolationStore tableStore) { + Integer i = mockReportedRegions.get(table); + if (null == i) { + return 0; + } + return i; + } + }; + + // Create the tables + TableName tn1 = createTableWithRegions(20); + TableName tn2 = createTableWithRegions(20); + TableName tn3 = createTableWithRegions(20); + + // Add them to the Tables with Quotas object + tables.addTableQuotaTable(tn1); + tables.addTableQuotaTable(tn2); + tables.addTableQuotaTable(tn3); + + // Mock the number of regions reported + mockReportedRegions.put(tn1, 10); // 50% + mockReportedRegions.put(tn2, 19); // 95% + mockReportedRegions.put(tn3, 20); // 100% + + // Argument is un-used + tables.filterInsufficientlyReportedTables(null); + // The default of 95% reported should prevent tn1 from appearing + assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables()); + } + + @Test + public void testFetchSpaceQuota() throws Exception { + Multimap tables = createTablesWithSpaceQuotas(); + // Can pass in an empty map, we're not consulting it. + chore.initializeViolationStores(Collections.emptyMap()); + // All tables that were created should have a quota defined. + for (Entry entry : tables.entries()) { + final TableName table = entry.getKey(); + final QuotaSettings qs = entry.getValue(); + + assertTrue("QuotaSettings was an instance of " + qs.getClass(), + qs instanceof SpaceLimitSettings); + + SpaceQuota spaceQuota = null; + if (null != qs.getTableName()) { + spaceQuota = chore.getTableViolationStore().getSpaceQuota(table); + assertNotNull("Could not find table space quota for " + table, spaceQuota); + } else if (null != qs.getNamespace()) { + spaceQuota = chore.getNamespaceViolationStore().getSpaceQuota(table.getNamespaceAsString()); + assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), spaceQuota); + } else { + fail("Expected table or namespace space quota"); + } + + final SpaceLimitSettings sls = (SpaceLimitSettings) qs; + assertEquals(sls.getProto().getQuota(), spaceQuota); + } + + TableName tableWithoutQuota = createTable(); + assertNull(chore.getTableViolationStore().getSpaceQuota(tableWithoutQuota)); + } + + // + // Helpers + // + + void writeData(TableName tn, long sizeInBytes) throws IOException { + final Connection conn = TEST_UTIL.getConnection(); + final Table table = conn.getTable(tn); + try { + List updates = new ArrayList<>(); + long bytesToWrite = sizeInBytes; + long rowKeyId = 0L; + final StringBuilder sb = new StringBuilder(); + final Random r = new Random(); + while (bytesToWrite > 0L) { + sb.setLength(0); + sb.append(Long.toString(rowKeyId)); + // Use the reverse counter as the rowKey to get even spread across all regions + Put p = new Put(Bytes.toBytes(sb.reverse().toString())); + byte[] value = new byte[SIZE_PER_VALUE]; + r.nextBytes(value); + p.addColumn(Bytes.toBytes(F1), Bytes.toBytes("q1"), value); + updates.add(p); + + // Batch 50K worth of updates + if (updates.size() > 50) { + table.put(updates); + updates.clear(); + } + + // Just count the value size, ignore the size of rowkey + column + bytesToWrite -= SIZE_PER_VALUE; + rowKeyId++; + } + + // Write the final batch + if (!updates.isEmpty()) { + table.put(updates); + } + + LOG.debug("Data was written to HBase"); + // Push the data to disk. + TEST_UTIL.getAdmin().flush(tn); + LOG.debug("Data flushed to disk"); + } finally { + table.close(); + } + } + + Multimap createTablesWithSpaceQuotas() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final Multimap tablesWithQuotas = HashMultimap.create(); + + final TableName tn1 = createTable(); + final TableName tn2 = createTable(); + + NamespaceDescriptor nd = NamespaceDescriptor.create("ns" + COUNTER.getAndIncrement()).build(); + admin.createNamespace(nd); + final TableName tn3 = createTableInNamespace(nd); + final TableName tn4 = createTableInNamespace(nd); + final TableName tn5 = createTableInNamespace(nd); + + final long sizeLimit1 = 1024L * 1024L * 1024L * 1024L * 5L; // 5TB + final SpaceViolationPolicy violationPolicy1 = SpaceViolationPolicy.NO_WRITES; + QuotaSettings qs1 = QuotaSettingsFactory.limitTableSpace(tn1, sizeLimit1, violationPolicy1); + tablesWithQuotas.put(tn1, qs1); + admin.setQuota(qs1); + + final long sizeLimit2 = 1024L * 1024L * 1024L * 200L; // 200GB + final SpaceViolationPolicy violationPolicy2 = SpaceViolationPolicy.NO_WRITES_COMPACTIONS; + QuotaSettings qs2 = QuotaSettingsFactory.limitTableSpace(tn2, sizeLimit2, violationPolicy2); + tablesWithQuotas.put(tn2, qs2); + admin.setQuota(qs2); + + final long sizeLimit3 = 1024L * 1024L * 1024L * 1024L * 100L; // 100TB + final SpaceViolationPolicy violationPolicy3 = SpaceViolationPolicy.NO_INSERTS; + QuotaSettings qs3 = QuotaSettingsFactory.limitNamespaceSpace(nd.getName(), sizeLimit3, violationPolicy3); + tablesWithQuotas.put(tn3, qs3); + tablesWithQuotas.put(tn4, qs3); + tablesWithQuotas.put(tn5, qs3); + admin.setQuota(qs3); + + final long sizeLimit4 = 1024L * 1024L * 1024L * 5L; // 5GB + final SpaceViolationPolicy violationPolicy4 = SpaceViolationPolicy.NO_INSERTS; + QuotaSettings qs4 = QuotaSettingsFactory.limitTableSpace(tn5, sizeLimit4, violationPolicy4); + // Override the ns quota for tn5, import edge-case to catch table quota taking + // precedence over ns quota. + tablesWithQuotas.put(tn5, qs4); + admin.setQuota(qs4); + + return tablesWithQuotas; + } + + TableName createTable() throws Exception { + return createTableWithRegions(1); + } + + TableName createTableWithRegions(int numRegions) throws Exception { + return createTableWithRegions(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR, numRegions); + } + + TableName createTableWithRegions(String namespace, int numRegions) throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final TableName tn = TableName.valueOf(namespace, testName.getMethodName() + COUNTER.getAndIncrement()); + + // Delete the old table + if (admin.tableExists(tn)) { + admin.disableTable(tn); + admin.deleteTable(tn); + } + + // Create the table + HTableDescriptor tableDesc = new HTableDescriptor(tn); + tableDesc.addFamily(new HColumnDescriptor(F1)); + if (numRegions == 1) { + admin.createTable(tableDesc); + } else { + admin.createTable(tableDesc, Bytes.toBytes("0"), Bytes.toBytes("9"), numRegions); + } + return tn; + } + + TableName createTableInNamespace(NamespaceDescriptor nd) throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final TableName tn = TableName.valueOf(nd.getName(), + testName.getMethodName() + COUNTER.getAndIncrement()); + + // Delete the old table + if (admin.tableExists(tn)) { + admin.disableTable(tn); + admin.deleteTable(tn); + } + + // Create the table + HTableDescriptor tableDesc = new HTableDescriptor(tn); + tableDesc.addFamily(new HColumnDescriptor(F1)); + + admin.createTable(tableDesc); + return tn; + } + + void partitionTablesByQuotaTarget(Multimap quotas, + Set tablesWithTableQuota, Set tablesWithNamespaceQuota) { + // Partition the tables with quotas by table and ns quota + for (Entry entry : quotas.entries()) { + SpaceLimitSettings settings = (SpaceLimitSettings) entry.getValue(); + TableName tn = entry.getKey(); + if (null != settings.getTableName()) { + tablesWithTableQuota.add(tn); + } + if (null != settings.getNamespace()) { + tablesWithNamespaceQuota.add(tn); + } + + if (null == settings.getTableName() && null == settings.getNamespace()) { + fail("Unexpected table name with null tableName and namespace: " + tn); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java index 5ade57d..5306be9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java @@ -23,14 +23,11 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.concurrent.TimeUnit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle; @@ -48,7 +45,6 @@ import org.junit.experimental.categories.Category; */ @Category({MasterTests.class, MediumTests.class}) public class TestQuotaTableUtil { - private static final Log LOG = LogFactory.getLog(TestQuotaTableUtil.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private Connection connection; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java new file mode 100644 index 0000000..efc046b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableQuotaViolationStore.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static com.google.common.collect.Iterables.size; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.quotas.QuotaViolationStore.ViolationState; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Test class for {@link TableQuotaViolationStore}. + */ +@Category(SmallTests.class) +public class TestTableQuotaViolationStore { + private static final long ONE_MEGABYTE = 1024L * 1024L; + + private Connection conn; + private QuotaObserverChore chore; + private Map regionReports; + private TableQuotaViolationStore store; + + @Before + public void setup() { + conn = mock(Connection.class); + chore = mock(QuotaObserverChore.class); + regionReports = new HashMap<>(); + store = new TableQuotaViolationStore(conn, chore, regionReports); + } + + @Test + public void testFilterRegionsByTable() throws Exception { + TableName tn1 = TableName.valueOf("foo"); + TableName tn2 = TableName.valueOf("bar"); + TableName tn3 = TableName.valueOf("ns", "foo"); + + assertEquals(0, size(store.filterBySubject(tn1))); + + for (int i = 0; i < 5; i++) { + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L); + } + for (int i = 0; i < 3; i++) { + regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L); + } + for (int i = 0; i < 10; i++) { + regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i+1)), 0L); + } + assertEquals(18, regionReports.size()); + assertEquals(5, size(store.filterBySubject(tn1))); + assertEquals(3, size(store.filterBySubject(tn2))); + assertEquals(10, size(store.filterBySubject(tn3))); + } + + @Test + public void testTargetViolationState() { + TableName tn1 = TableName.valueOf("violation1"); + TableName tn2 = TableName.valueOf("observance1"); + TableName tn3 = TableName.valueOf("observance2"); + SpaceQuota quota = SpaceQuota.newBuilder() + .setSoftLimit(1024L * 1024L) + .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(SpaceViolationPolicy.DISABLE)) + .build(); + + // Create some junk data to filter. Makes sure it's so large that it would + // immediately violate the quota. + for (int i = 0; i < 3; i++) { + regionReports.put(new HRegionInfo(tn2, Bytes.toBytes(i), Bytes.toBytes(i + 1)), + 5L * ONE_MEGABYTE); + regionReports.put(new HRegionInfo(tn3, Bytes.toBytes(i), Bytes.toBytes(i + 1)), + 5L * ONE_MEGABYTE); + } + + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(0), Bytes.toBytes(1)), 1024L * 512L); + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(1), Bytes.toBytes(2)), 1024L * 256L); + + // Below the quota + assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(tn1, quota)); + + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(2), Bytes.toBytes(3)), 1024L * 256L); + + // Equal to the quota is still in observance + assertEquals(ViolationState.IN_OBSERVANCE, store.getTargetState(tn1, quota)); + + regionReports.put(new HRegionInfo(tn1, Bytes.toBytes(3), Bytes.toBytes(4)), 1024L); + + // Exceeds the quota, should be in violation + assertEquals(ViolationState.IN_VIOLATION, store.getTargetState(tn1, quota)); + } + + @Test + public void testGetSpaceQuota() throws Exception { + TableQuotaViolationStore mockStore = mock(TableQuotaViolationStore.class); + when(mockStore.getSpaceQuota(any(TableName.class))).thenCallRealMethod(); + + Quotas quotaWithSpace = Quotas.newBuilder().setSpace( + SpaceQuota.newBuilder() + .setSoftLimit(1024L) + .setViolationPolicy(QuotaProtos.SpaceViolationPolicy.DISABLE) + .build()) + .build(); + Quotas quotaWithoutSpace = Quotas.newBuilder().build(); + + AtomicReference quotaRef = new AtomicReference<>(); + when(mockStore.getQuotaForTable(any(TableName.class))).then(new Answer() { + @Override + public Quotas answer(InvocationOnMock invocation) throws Throwable { + return quotaRef.get(); + } + }); + + quotaRef.set(quotaWithSpace); + assertEquals(quotaWithSpace.getSpace(), mockStore.getSpaceQuota(TableName.valueOf("foo"))); + quotaRef.set(quotaWithoutSpace); + assertNull(mockStore.getSpaceQuota(TableName.valueOf("foo"))); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java new file mode 100644 index 0000000..bb8d5cd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTablesWithQuotas.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Multimap; + +/** + * Non-HBase cluster unit tests for {@link TablesWithQuotas}. + */ +@Category(SmallTests.class) +public class TestTablesWithQuotas { + private Connection conn; + private Configuration conf; + + @Before + public void setup() throws Exception { + conn = mock(Connection.class); + conf = HBaseConfiguration.create(); + } + + @Test + public void testImmutableGetters() { + Set tablesWithTableQuotas = new HashSet<>(); + Set tablesWithNamespaceQuotas = new HashSet<>(); + final TablesWithQuotas tables = new TablesWithQuotas(conn, conf); + for (int i = 0; i < 5; i++) { + TableName tn = TableName.valueOf("tn" + i); + tablesWithTableQuotas.add(tn); + tables.addTableQuotaTable(tn); + } + for (int i = 0; i < 3; i++) { + TableName tn = TableName.valueOf("tn_ns" + i); + tablesWithNamespaceQuotas.add(tn); + tables.addNamespaceQuotaTable(tn); + } + Set actualTableQuotaTables = tables.getTableQuotaTables(); + Set actualNamespaceQuotaTables = tables.getNamespaceQuotaTables(); + assertEquals(tablesWithTableQuotas, actualTableQuotaTables); + assertEquals(tablesWithNamespaceQuotas, actualNamespaceQuotaTables); + try { + actualTableQuotaTables.add(null); + fail("Should not be able to add an element"); + } catch (UnsupportedOperationException e) { + // pass + } + try { + actualNamespaceQuotaTables.add(null); + fail("Should not be able to add an element"); + } catch (UnsupportedOperationException e) { + // pass + } + } + + @Test + public void testInsufficientlyReportedTableFiltering() throws Exception { + final Map reportedRegions = new HashMap<>(); + final Map actualRegions = new HashMap<>(); + final Configuration conf = HBaseConfiguration.create(); + conf.setDouble(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_REPORT_PERCENT_KEY, 0.95); + + TableName tooFewRegionsTable = TableName.valueOf("tn1"); + TableName sufficientRegionsTable = TableName.valueOf("tn2"); + TableName tooFewRegionsNamespaceTable = TableName.valueOf("ns1", "tn2"); + TableName sufficientRegionsNamespaceTable = TableName.valueOf("ns1", "tn2"); + final TablesWithQuotas tables = new TablesWithQuotas(conn, conf) { + @Override + Configuration getConfiguration() { + return conf; + } + + @Override + int getNumRegions(TableName tableName) { + return actualRegions.get(tableName); + } + + @Override + int getNumReportedRegions(TableName table, QuotaViolationStore tableStore) { + return reportedRegions.get(table); + } + }; + tables.addTableQuotaTable(tooFewRegionsTable); + tables.addTableQuotaTable(sufficientRegionsTable); + tables.addNamespaceQuotaTable(tooFewRegionsNamespaceTable); + tables.addNamespaceQuotaTable(sufficientRegionsNamespaceTable); + + reportedRegions.put(tooFewRegionsTable, 5); + actualRegions.put(tooFewRegionsTable, 10); + reportedRegions.put(sufficientRegionsTable, 19); + actualRegions.put(sufficientRegionsTable, 20); + reportedRegions.put(tooFewRegionsNamespaceTable, 9); + actualRegions.put(tooFewRegionsNamespaceTable, 10); + reportedRegions.put(sufficientRegionsNamespaceTable, 98); + actualRegions.put(sufficientRegionsNamespaceTable, 100); + + // Unused argument + tables.filterInsufficientlyReportedTables(null); + Set filteredTablesWithTableQuotas = tables.getTableQuotaTables(); + assertEquals(Collections.singleton(sufficientRegionsTable), filteredTablesWithTableQuotas); + Set filteredTablesWithNamespaceQutoas = tables.getNamespaceQuotaTables(); + assertEquals(Collections.singleton(sufficientRegionsNamespaceTable), filteredTablesWithNamespaceQutoas); + } + + @Test + public void testGetTablesByNamespace() { + final TablesWithQuotas tables = new TablesWithQuotas(conn, conf); + tables.addTableQuotaTable(TableName.valueOf("ignored1")); + tables.addTableQuotaTable(TableName.valueOf("ignored2")); + tables.addNamespaceQuotaTable(TableName.valueOf("ns1", "t1")); + tables.addNamespaceQuotaTable(TableName.valueOf("ns1", "t2")); + tables.addNamespaceQuotaTable(TableName.valueOf("ns1", "t3")); + tables.addNamespaceQuotaTable(TableName.valueOf("ns2", "t1")); + tables.addNamespaceQuotaTable(TableName.valueOf("ns2", "t2")); + + Multimap tablesByNamespace = tables.getTablesByNamespace(); + Collection tablesInNs = tablesByNamespace.get("ns1"); + assertEquals(3, tablesInNs.size()); + assertTrue("Unexpected results for ns1: " + tablesInNs, + tablesInNs.containsAll(Arrays.asList( + TableName.valueOf("ns1", "t1"), + TableName.valueOf("ns1", "t2"), + TableName.valueOf("ns1", "t3")))); + tablesInNs = tablesByNamespace.get("ns2"); + assertEquals(2, tablesInNs.size()); + assertTrue("Unexpected results for ns2: " + tablesInNs, + tablesInNs.containsAll(Arrays.asList( + TableName.valueOf("ns2", "t1"), + TableName.valueOf("ns2", "t2")))); + } + + @Test + public void testFilteringMissingTables() throws Exception { + final TableName missingTable = TableName.valueOf("doesNotExist"); + // Set up Admin to return null (match the implementation) + Admin admin = mock(Admin.class); + when(conn.getAdmin()).thenReturn(admin); + when(admin.getTableRegions(missingTable)).thenReturn(null); + + QuotaObserverChore chore = mock(QuotaObserverChore.class); + Map regionUsage = new HashMap<>(); + TableQuotaViolationStore store = new TableQuotaViolationStore(conn, chore, regionUsage); + + // A super dirty hack to verify that, after getting no regions for our table, + // we bail out and start processing the next element (which there is none). + final TablesWithQuotas tables = new TablesWithQuotas(conn, conf) { + @Override + int getNumReportedRegions(TableName table, QuotaViolationStore tableStore) { + throw new RuntimeException("Should should not reach here"); + } + }; + tables.addTableQuotaTable(missingTable); + + tables.filterInsufficientlyReportedTables(store); + + final Set tablesWithQuotas = tables.getTableQuotaTables(); + assertTrue( + "Expected to find no tables, but found " + tablesWithQuotas, tablesWithQuotas.isEmpty()); + } +} -- 2.10.2