From 0527cc1d09254889d17fa6418286befc8479f2e2 Mon Sep 17 00:00:00 2001 From: Josh Elser Date: Fri, 18 Nov 2016 15:38:19 -0500 Subject: [PATCH] HBASE-16999 Implement master and regionserver synchronization of quota state * Implement the RegionServer reading violation from the quota table * Implement the Master reporting violations to the quota table * RegionServers need to track its enforced policies --- .../apache/hadoop/hbase/quotas/QuotaTableUtil.java | 98 ++++++++++++- .../org/apache/hadoop/hbase/master/HMaster.java | 35 ++++- .../hadoop/hbase/quotas/QuotaObserverChore.java | 5 +- ...nager.java => RegionServerRpcQuotaManager.java} | 8 +- .../quotas/RegionServerSpaceQuotaManager.java | 160 +++++++++++++++++++++ .../hbase/quotas/SpaceQuotaViolationNotifier.java | 16 ++- .../quotas/SpaceQuotaViolationNotifierFactory.java | 62 ++++++++ .../quotas/SpaceQuotaViolationNotifierForTest.java | 4 + .../SpaceQuotaViolationPolicyRefresherChore.java | 150 +++++++++++++++++++ .../quotas/TableSpaceQuotaViolationNotifier.java | 55 +++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 20 ++- .../hadoop/hbase/regionserver/RSRpcServices.java | 7 +- .../hbase/regionserver/RegionServerServices.java | 12 +- .../hadoop/hbase/MockRegionServerServices.java | 10 +- .../hadoop/hbase/master/MockRegionServer.java | 10 +- .../TestQuotaObserverChoreWithMiniCluster.java | 2 + .../hadoop/hbase/quotas/TestQuotaTableUtil.java | 49 +++++++ .../hadoop/hbase/quotas/TestQuotaThrottle.java | 4 +- .../quotas/TestRegionServerSpaceQuotaManager.java | 127 ++++++++++++++++ ...estSpaceQuotaViolationPolicyRefresherChore.java | 131 +++++++++++++++++ .../TestTableSpaceQuotaViolationNotifier.java | 144 +++++++++++++++++++ 21 files changed, 1079 insertions(+), 30 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/{RegionServerQuotaManager.java => RegionServerRpcQuotaManager.java} (96%) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java index 1640ddc..6d4a683 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java @@ -23,16 +23,20 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -43,7 +47,12 @@ import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; @@ -52,9 +61,8 @@ import org.apache.hadoop.hbase.util.Strings; *
  *     ROW-KEY      FAM/QUAL        DATA
  *   n.<namespace> q:s         <global-quotas>
- *   n.<namespace> u:du        <size in bytes>
  *   t.<table>     q:s         <global-quotas>
- *   t.<table>     u:du        <size in bytes>
+ *   t.<table>     u:v        <space violation policy>
  *   u.<user>      q:s         <global-quotas>
  *   u.<user>      q:s.<table> <table-quotas>
  *   u.<user>      q:s.<ns>:   <namespace-quotas>
@@ -73,7 +81,7 @@ public class QuotaTableUtil {
   protected static final byte[] QUOTA_FAMILY_USAGE = Bytes.toBytes("u");
   protected static final byte[] QUOTA_QUALIFIER_SETTINGS = Bytes.toBytes("s");
   protected static final byte[] QUOTA_QUALIFIER_SETTINGS_PREFIX = Bytes.toBytes("s.");
-  protected static final byte[] QUOTA_QUALIFIER_DISKUSAGE = Bytes.toBytes("du");
+  protected static final byte[] QUOTA_QUALIFIER_VIOLATION = Bytes.toBytes("v");
   protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
   protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
   protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
@@ -202,6 +210,49 @@ public class QuotaTableUtil {
     return filterList;
   }
 
+  /**
+   * Creates a {@link Scan} which returns only quota violations from the quota table.
+   */
+  public static Scan makeQuotaViolationScan() {
+    Scan s = new Scan();
+    // Limit to "u:v" column
+    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+    // Limit rowspace to the "t:" prefix
+    s.setRowPrefixFilter(QUOTA_TABLE_ROW_KEY_PREFIX);
+    return s;
+  }
+
+  /**
+   * Extracts the {@link SpaceViolationPolicy} and {@link TableName} from the provided
+   * {@link Result} and adds them to the given {@link Map}. If the result does not contain
+   * the expected information or the serialized policy in the value is invalid, this method
+   * will throw an {@link IllegalArgumentException}.
+   *
+   * @param result A row from the quota table.
+   * @param policies A map of policies to add the result of this method into.
+   */
+  public static void extractViolationPolicy(
+      Result result, Map policies) {
+    byte[] row = Objects.requireNonNull(result).getRow();
+    if (null == row) {
+      throw new IllegalArgumentException("Provided result had a null row");
+    }
+    final TableName targetTableName = getTableFromRowKey(row);
+    Cell c = result.getColumnLatestCell(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+    if (null == c) {
+      throw new IllegalArgumentException("Result did not contain the expected column.");
+    }
+    ByteString buffer = UnsafeByteOperations.unsafeWrap(
+        c.getValueArray(), c.getValueOffset(), c.getValueLength());
+    try {
+      SpaceQuota quota = SpaceQuota.parseFrom(buffer);
+      policies.put(targetTableName, getViolationPolicy(quota));
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(
+          "Result did not contain a valid SpaceQuota protocol buffer message", e);
+    }
+  }
+
   public static interface UserQuotasVisitor {
     void visitUserQuotas(final String userName, final Quotas quotas)
       throws IOException;
@@ -297,6 +348,34 @@ public class QuotaTableUtil {
     }
   }
 
+  /**
+   * Creates a {@link Scan} which only returns violation policy records in the quota table.
+   */
+  public static Scan getScanForViolations() {
+    Scan s = new Scan();
+    s.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+    return s;
+  }
+
+  /**
+   * Creates a {@link Put} to enable the given policy on the table.
+   */
+  public static Put enableViolationPolicy(TableName tableName, SpaceViolationPolicy policy) {
+    Put p = new Put(getTableRowKey(tableName));
+    SpaceQuota quota = getProtoViolationPolicy(policy);
+    p.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION, quota.toByteArray());
+    return p;
+  }
+
+  /**
+   * Creates a {@link Delete} to remove a policy on the given table.
+   */
+  public static Delete removeViolationPolicy(TableName tableName) {
+    Delete d = new Delete(getTableRowKey(tableName));
+    d.addColumn(QUOTA_FAMILY_USAGE, QUOTA_QUALIFIER_VIOLATION);
+    return d;
+  }
+
   /* =========================================================================
    *  Quotas protobuf helpers
    */
@@ -418,4 +497,17 @@ public class QuotaTableUtil {
   protected static String getUserFromRowKey(final byte[] key) {
     return Bytes.toString(key, QUOTA_USER_ROW_KEY_PREFIX.length);
   }
+
+  protected static SpaceQuota getProtoViolationPolicy(SpaceViolationPolicy policy) {
+    return SpaceQuota.newBuilder()
+          .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
+          .build();
+  }
+
+  protected static SpaceViolationPolicy getViolationPolicy(SpaceQuota proto) {
+    if (!proto.hasViolationPolicy()) {
+      throw new IllegalArgumentException("Protobuf SpaceQuota does not have violation policy.");
+    }
+    return ProtobufUtil.toViolationPolicy(proto.getViolationPolicy());
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 616c22d..411c33d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -132,8 +132,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifier;
-import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierForTest;
+import org.apache.hadoop.hbase.quotas.SpaceQuotaViolationNotifierFactory;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -149,11 +150,14 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -861,7 +865,7 @@ public class HMaster extends HRegionServer implements MasterServices {
 
     status.setStatus("Starting quota manager");
     initQuotaManager();
-    this.spaceQuotaViolationNotifier = new SpaceQuotaViolationNotifierForTest();
+    this.spaceQuotaViolationNotifier = createQuotaViolationNotifier();
     this.quotaObserverChore = new QuotaObserverChore(this);
     // Start the chore to read the region FS space reports and act on them
     getChoreService().scheduleChore(quotaObserverChore);
@@ -952,6 +956,13 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.quotaManager = quotaManager;
   }
 
+  SpaceQuotaViolationNotifier createQuotaViolationNotifier() {
+    SpaceQuotaViolationNotifier notifier =
+        SpaceQuotaViolationNotifierFactory.getInstance().create(getConfiguration());
+    notifier.initialize(getClusterConnection());
+    return notifier;
+  }
+
   boolean isCatalogJanitorEnabled() {
     return catalogJanitorChore != null ?
       catalogJanitorChore.getEnabled() : false;
@@ -2129,6 +2140,26 @@ public class HMaster extends HRegionServer implements MasterServices {
       protected void run() throws IOException {
         getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
 
+        // Normally, it would make sense for this authorization check to exist inside
+        // AccessController, but because the authorization check is done based on internal state
+        // (rather than explicit permissions) we'll do the check here instead of in the
+        // coprocessor.
+        MasterQuotaManager quotaManager = getMasterQuotaManager();
+        if (null != quotaManager) {
+          if (quotaManager.isQuotaEnabled()) {
+            Quotas quotaForTable = QuotaUtil.getTableQuota(getConnection(), tableName);
+            if (null != quotaForTable && quotaForTable.hasSpace()) {
+              SpaceViolationPolicy policy = quotaForTable.getSpace().getViolationPolicy();
+              if (SpaceViolationPolicy.DISABLE == policy) {
+                throw new AccessDeniedException("Enabling the table '" + tableName
+                    + "' is disallowed due to a violated space quota.");
+              }
+            }
+          } else if (LOG.isTraceEnabled()) {
+            LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
+          }
+        }
+
         LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
 
         // Execute the operation asynchronously - client will check the progress of the operation
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
index 88a6149..8b127d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaObserverChore.java
@@ -352,14 +352,15 @@ public class QuotaObserverChore extends ScheduledChore {
   /**
    * Transitions the given table to violation of its quota, enabling the violation policy.
    */
-  private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy) {
+  private void transitionTableToViolation(TableName table, SpaceViolationPolicy violationPolicy)
+      throws IOException {
     this.violationNotifier.transitionTableToViolation(table, violationPolicy);
   }
 
   /**
    * Transitions the given table to observance of its quota, disabling the violation policy.
    */
-  private void transitionTableToObservance(TableName table) {
+  private void transitionTableToObservance(TableName table) throws IOException {
     this.violationNotifier.transitionTableToObservance(table);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
similarity index 96%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index 4961e06..756251a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -46,14 +46,14 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class RegionServerQuotaManager {
-  private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class);
+public class RegionServerRpcQuotaManager {
+  private static final Log LOG = LogFactory.getLog(RegionServerRpcQuotaManager.class);
 
   private final RegionServerServices rsServices;
 
   private QuotaCache quotaCache = null;
 
-  public RegionServerQuotaManager(final RegionServerServices rsServices) {
+  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
     this.rsServices = rsServices;
   }
 
@@ -63,7 +63,7 @@ public class RegionServerQuotaManager {
       return;
     }
 
-    LOG.info("Initializing quota support");
+    LOG.info("Initializing RPC quota support");
 
     // Initialize quota cache
     quotaCache = new QuotaCache(rsServices);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
new file mode 100644
index 0000000..4d84d48
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerSpaceQuotaManager.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A manager for filesystem space quotas in the RegionServer.
+ *
+ * This class is responsible for reading quota violation policies from the quota
+ * table and then enacting them on the given table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RegionServerSpaceQuotaManager {
+  private static final Log LOG = LogFactory.getLog(RegionServerSpaceQuotaManager.class);
+
+  private final RegionServerServices rsServices;
+
+  private SpaceQuotaViolationPolicyRefresherChore spaceQuotaRefresher;
+  private Map enforcedPolicies;
+
+  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
+    this.rsServices = Objects.requireNonNull(rsServices);
+  }
+
+  public synchronized void start() throws IOException {
+    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
+      LOG.info("Quota support disabled, not starting space quota manager.");
+      return;
+    }
+
+    spaceQuotaRefresher = new SpaceQuotaViolationPolicyRefresherChore(this);
+    enforcedPolicies = new HashMap<>();
+  }
+
+  public synchronized void stop() {
+    if (null != spaceQuotaRefresher) {
+      spaceQuotaRefresher.cancel();
+      spaceQuotaRefresher = null;
+    }
+  }
+
+  Connection getConnection() {
+    return rsServices.getConnection();
+  }
+
+  /**
+   * Returns the collection of tables which have quota violation policies enforced on
+   * this RegionServer.
+   */
+  public synchronized Map getActiveViolationPolicyEnforcements()
+      throws IOException {
+    return new HashMap<>(this.enforcedPolicies);
+  }
+
+  /**
+   * Wrapper around {@link QuotaTableUtil#extractViolationPolicy(Result, Map)} for testing.
+   */
+  void extractViolationPolicy(Result result, Map activePolicies) {
+    QuotaTableUtil.extractViolationPolicy(result, activePolicies);
+  }
+
+  /**
+   * Reads all quota violation policies which are to be enforced from the quota table.
+   *
+   * @return The collection of tables which are in violation of their quota and the policy which
+   *    should be enforced.
+   */
+  public Map getViolationPoliciesToEnforce() throws IOException {
+    try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
+        ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan())) {
+      Map activePolicies = new HashMap<>();
+      for (Result result : scanner) {
+        try {
+          extractViolationPolicy(result, activePolicies);
+        } catch (IllegalArgumentException e) {
+          final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
+          LOG.error(msg, e);
+          throw new IOException(msg, e);
+        }
+      }
+      return activePolicies;
+    }
+  }
+
+  /**
+   * Enforces the given violationPolicy on the given table in this RegionServer.
+   */
+  synchronized void enforceViolationPolicy(
+      TableName tableName, SpaceViolationPolicy violationPolicy) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+          "Enabling violation policy enforcement on " + tableName
+          + " with policy " + violationPolicy);
+    }
+    // Enact the policy
+    enforceOnRegionServer(tableName, violationPolicy);
+    // Publicize our enacting of the policy
+    enforcedPolicies.put(tableName, violationPolicy);
+  }
+
+  /**
+   * Enacts the given violation policy on this table in the RegionServer.
+   */
+  void enforceOnRegionServer(TableName tableName, SpaceViolationPolicy violationPolicy) {
+    throw new UnsupportedOperationException("TODO");
+  }
+
+  /**
+   * Disables enforcement on any violation policy on the given tableName.
+   */
+  synchronized void disableViolationPolicyEnforcement(TableName tableName) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Disabling violation policy enforcement on " + tableName);
+    }
+    disableOnRegionServer(tableName);
+    enforcedPolicies.remove(tableName);
+  }
+
+  /**
+   * Disables any violation policy on this table in the RegionServer.
+   */
+  void disableOnRegionServer(TableName tableName) {
+    throw new UnsupportedOperationException("TODO");
+  }
+
+  RegionServerServices getRegionServerServices() {
+    return rsServices;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
index bccf519..261dea7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifier.java
@@ -16,29 +16,39 @@
  */
 package org.apache.hadoop.hbase.quotas;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
 
 /**
  * An interface which abstract away the action taken to enable or disable
- * a space quota violation policy across the HBase cluster.
+ * a space quota violation policy across the HBase cluster. Implementations
+ * must have a no-args constructor.
  */
 @InterfaceAudience.Private
 public interface SpaceQuotaViolationNotifier {
 
   /**
+   * Initializes the notifier.
+   */
+  void initialize(Connection conn);
+
+  /**
    * Instructs the cluster that the given table is in violation of a space quota. The
    * provided violation policy is the action which should be taken on the table.
    *
    * @param tableName The name of the table in violation of the quota.
    * @param violationPolicy The policy which should be enacted on the table.
    */
-  void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy);
+  void transitionTableToViolation(
+      TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException;
 
   /**
    * Instructs the cluster that the given table is in observance of any applicable space quota.
    *
    * @param tableName The name of the table in observance.
    */
-  void transitionTableToObservance(TableName tableName);
+  void transitionTableToObservance(TableName tableName) throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
new file mode 100644
index 0000000..43f5513
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.Objects;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Factory for creating {@link SpaceQuotaViolationNotifier} implementations. Implementations
+ * must have a no-args constructor.
+ */
+@InterfaceAudience.Private
+public class SpaceQuotaViolationNotifierFactory {
+  private static final SpaceQuotaViolationNotifierFactory INSTANCE =
+      new SpaceQuotaViolationNotifierFactory();
+
+  public static final String VIOLATION_NOTIFIER_KEY = "hbase.master.quota.violation.notifier.impl";
+  public static final Class VIOLATION_NOTIFIER_DEFAULT =
+      SpaceQuotaViolationNotifierForTest.class;
+
+  // Private
+  private SpaceQuotaViolationNotifierFactory() {}
+
+  public static SpaceQuotaViolationNotifierFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Instantiates the {@link SpaceQuotaViolationNotifier} implementation as defined in the
+   * configuration provided.
+   *
+   * @param conf Configuration object
+   * @return The SpaceQuotaViolationNotifier implementation
+   * @throws IllegalArgumentException if the class could not be instantiated
+   */
+  public SpaceQuotaViolationNotifier create(Configuration conf) {
+    Class clz = Objects.requireNonNull(conf)
+        .getClass(VIOLATION_NOTIFIER_KEY, VIOLATION_NOTIFIER_DEFAULT,
+            SpaceQuotaViolationNotifier.class);
+    try {
+      return clz.newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new IllegalArgumentException("Failed to instantiate the implementation", e);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
index 4ab9834..65dc979 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationNotifierForTest.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
 
 /**
  * A SpaceQuotaViolationNotifier implementation for verifying testing.
@@ -31,6 +32,9 @@ public class SpaceQuotaViolationNotifierForTest implements SpaceQuotaViolationNo
   private final Map tablesInViolation = new HashMap<>();
 
   @Override
+  public void initialize(Connection conn) {}
+
+  @Override
   public void transitionTableToViolation(TableName tableName, SpaceViolationPolicy violationPolicy) {
     tablesInViolation.put(tableName, violationPolicy);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
new file mode 100644
index 0000000..8ed4e42
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/SpaceQuotaViolationPolicyRefresherChore.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * A {@link ScheduledChore} which periodically updates a local copy of tables which have
+ * space quota violation policies enacted on them.
+ */
+public class SpaceQuotaViolationPolicyRefresherChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(SpaceQuotaViolationPolicyRefresherChore.class);
+
+  static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.period";
+  static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis
+
+  static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.delay";
+  static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute
+
+  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
+      "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
+  static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
+
+  static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
+      "hbase.regionserver.quotas.policy.refresher.report.percent";
+  static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
+
+  private final RegionServerSpaceQuotaManager manager;
+
+  public SpaceQuotaViolationPolicyRefresherChore(RegionServerSpaceQuotaManager manager) {
+    super(SpaceQuotaViolationPolicyRefresherChore.class.getSimpleName(),
+        manager.getRegionServerServices(),
+        getPeriod(manager.getRegionServerServices().getConfiguration()),
+        getInitialDelay(manager.getRegionServerServices().getConfiguration()),
+        getTimeUnit(manager.getRegionServerServices().getConfiguration()));
+    this.manager = manager;
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      // Tables with a policy currently enforced
+      final Map activeViolationPolicies =
+          manager.getActiveViolationPolicyEnforcements();
+      // Tables with policies that should be enforced
+      final Map violationPolicies =
+          manager.getViolationPoliciesToEnforce();
+      // Ensure each policy which should be enacted is enacted.
+      for (Entry entry : violationPolicies.entrySet()) {
+        final TableName tableName = entry.getKey();
+        final SpaceViolationPolicy policyToEnforce = entry.getValue();
+        final SpaceViolationPolicy currentPolicy = activeViolationPolicies.get(tableName);
+        if (currentPolicy != policyToEnforce) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Enabling " + policyToEnforce + " on " + tableName);
+          }
+          manager.enforceViolationPolicy(tableName, policyToEnforce);
+        }
+      }
+      // Remove policies which should no longer be enforced
+      Iterator iter = activeViolationPolicies.keySet().iterator();
+      while (iter.hasNext()) {
+        final TableName localTableWithPolicy = iter.next();
+        if (!violationPolicies.containsKey(localTableWithPolicy)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Removing quota violation policy on " + localTableWithPolicy);
+          }
+          manager.disableViolationPolicyEnforcement(localTableWithPolicy);
+          iter.remove();
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn(
+          "Caught exception while refreshing enforced quota violation policies, will retry.", e);
+    }
+  }
+
+  /**
+   * Extracts the period for the chore from the configuration.
+   *
+   * @param conf The configuration object.
+   * @return The configured chore period or the default value.
+   */
+  static int getPeriod(Configuration conf) {
+    return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
+        POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
+  }
+
+  /**
+   * Extracts the initial delay for the chore from the configuration.
+   *
+   * @param conf The configuration object.
+   * @return The configured chore initial delay or the default value.
+   */
+  static long getInitialDelay(Configuration conf) {
+    return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
+        POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
+  }
+
+  /**
+   * Extracts the time unit for the chore period and initial delay from the configuration. The
+   * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
+   * a {@link TimeUnit} value.
+   *
+   * @param conf The configuration object.
+   * @return The configured time unit for the chore period and initial delay or the default value.
+   */
+  static TimeUnit getTimeUnit(Configuration conf) {
+    return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
+        POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
+  }
+
+  /**
+   * Extracts the percent of Regions for a table to have been reported to enable quota violation
+   * state change.
+   *
+   * @param conf The configuration object.
+   * @return The percent of regions reported to use.
+   */
+  static Double getRegionReportPercent(Configuration conf) {
+    return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
+        POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
new file mode 100644
index 0000000..4d1a8c3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TableSpaceQuotaViolationNotifier.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * A {@link SpaceQuotaViolationNotifier} which uses the hbase:quota table.
+ */
+public class TableSpaceQuotaViolationNotifier implements SpaceQuotaViolationNotifier {
+
+  private Connection conn;
+
+  @Override
+  public void transitionTableToViolation(
+      TableName tableName, SpaceViolationPolicy violationPolicy) throws IOException {
+    final Put p = QuotaTableUtil.enableViolationPolicy(tableName, violationPolicy);
+    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.put(p);
+    }
+  }
+
+  @Override
+  public void transitionTableToObservance(TableName tableName) throws IOException {
+    final Delete d = QuotaTableUtil.removeViolationPolicy(tableName);
+    try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.delete(d);
+    }
+  }
+
+  @Override
+  public void initialize(Connection conn) {
+    this.conn = conn;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d04243c..a633380 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -120,7 +120,8 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobCacheConfig;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
@@ -475,7 +476,8 @@ public class HRegionServer extends HasThread implements
 
   private RegionServerProcedureManagerHost rspmHost;
 
-  private RegionServerQuotaManager rsQuotaManager;
+  private RegionServerRpcQuotaManager rsQuotaManager;
+  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;
 
   /**
    * Nonce manager. Nonces are used to make operations like increment and append idempotent
@@ -924,7 +926,8 @@ public class HRegionServer extends HasThread implements
     }
 
     // Setup the Quota Manager
-    rsQuotaManager = new RegionServerQuotaManager(this);
+    rsQuotaManager = new RegionServerRpcQuotaManager(this);
+    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
 
     this.fsUtilizationChore = new FileSystemUtilizationChore(this);
 
@@ -998,6 +1001,7 @@ public class HRegionServer extends HasThread implements
 
         // Start the Quota Manager
         rsQuotaManager.start(getRpcServer().getScheduler());
+        rsSpaceQuotaManager.start();
       }
 
       // We registered with the Master.  Go into run mode.
@@ -1089,6 +1093,9 @@ public class HRegionServer extends HasThread implements
     if (rsQuotaManager != null) {
       rsQuotaManager.stop();
     }
+    if (rsSpaceQuotaManager != null) {
+      rsSpaceQuotaManager.stop();
+    }
 
     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
     if (rspmHost != null) {
@@ -2853,7 +2860,7 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
-  public RegionServerQuotaManager getRegionServerQuotaManager() {
+  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
     return rsQuotaManager;
   }
 
@@ -3711,4 +3718,9 @@ public class HRegionServer extends HasThread implements
     return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
       .regionLock(regionInfos, description, abort);
   }
+
+  @Override
+  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+    return this.rsSpaceQuotaManager;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a072dce..0b80e11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -98,7 +98,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.quotas.OperationQuota;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.Leases.Lease;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
@@ -196,6 +196,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1259,8 +1260,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return regionServer.getConfiguration();
   }
 
-  private RegionServerQuotaManager getQuotaManager() {
-    return regionServer.getRegionServerQuotaManager();
+  private RegionServerRpcQuotaManager getQuotaManager() {
+    return regionServer.getRegionServerRpcQuotaManager();
   }
 
   void start() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index c92124c..8eab22f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -35,7 +35,8 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.zookeeper.KeeperException;
@@ -78,9 +79,9 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
   RegionServerAccounting getRegionServerAccounting();
 
   /**
-   * @return RegionServer's instance of {@link RegionServerQuotaManager}
+   * @return RegionServer's instance of {@link RegionServerRpcQuotaManager}
    */
-  RegionServerQuotaManager getRegionServerQuotaManager();
+  RegionServerRpcQuotaManager getRegionServerRpcQuotaManager();
 
   /**
    * @return RegionServer's instance of {@link SecureBulkLoadManager}
@@ -88,6 +89,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
   SecureBulkLoadManager getSecureBulkLoadManager();
 
   /**
+   * @return RegionServer's instance of {@link RegionServerSpaceQuotaManager}
+   */
+  RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager();
+
+  /**
    * Context for postOpenDeployTasks().
    */
   class PostOpenDeployContext {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 5e2a70f..8d20466 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -38,7 +38,8 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
@@ -189,7 +190,7 @@ public class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
-  public RegionServerQuotaManager getRegionServerQuotaManager() {
+  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
     return null;
   }
 
@@ -356,4 +357,9 @@ public class MockRegionServerServices implements RegionServerServices {
   public SecureBulkLoadManager getSecureBulkLoadManager() {
     return null;
   }
+
+  @Override
+  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 467d4a5..94ecf09 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -101,7 +101,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
+import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -334,7 +335,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   }
 
   @Override
-  public RegionServerQuotaManager getRegionServerQuotaManager() {
+  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
     return null;
   }
 
@@ -719,4 +720,9 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   public SecureBulkLoadManager getSecureBulkLoadManager() {
     return null;
   }
+
+  @Override
+  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
index 98236c2..c493b25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaObserverChoreWithMiniCluster.java
@@ -94,6 +94,8 @@ public class TestQuotaObserverChoreWithMiniCluster {
     conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_DELAY_KEY, 1000);
     conf.setInt(QuotaObserverChore.VIOLATION_OBSERVER_CHORE_PERIOD_KEY, 1000);
     conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    conf.setClass(SpaceQuotaViolationNotifierFactory.VIOLATION_NOTIFIER_KEY,
+        SpaceQuotaViolationNotifierForTest.class, SpaceQuotaViolationNotifier.class);
     TEST_UTIL.startMiniCluster(1);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
index 5306be9..f897ad4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaTableUtil.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.hbase.quotas;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -28,6 +32,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
@@ -37,8 +45,10 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 /**
  * Test the quota table helpers (e.g. CRUD operations)
@@ -48,6 +58,10 @@ public class TestQuotaTableUtil {
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private Connection connection;
+  private int tableNameCounter;
+
+  @Rule
+  public TestName testName = new TestName();
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -70,6 +84,7 @@ public class TestQuotaTableUtil {
   @Before
   public void before() throws IOException {
     this.connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    this.tableNameCounter = 0;
   }
 
   @After
@@ -179,4 +194,38 @@ public class TestQuotaTableUtil {
     resQuotaNS = QuotaUtil.getUserQuota(this.connection, user, namespace);
     assertEquals(null, resQuotaNS);
   }
+
+  @Test
+  public void testSerDeViolationPolicies() throws Exception {
+    final TableName tn1 = getUniqueTableName();
+    final SpaceViolationPolicy policy1 = SpaceViolationPolicy.DISABLE;
+    final TableName tn2 = getUniqueTableName();
+    final SpaceViolationPolicy policy2 = SpaceViolationPolicy.NO_INSERTS;
+    final TableName tn3 = getUniqueTableName();
+    final SpaceViolationPolicy policy3 = SpaceViolationPolicy.NO_WRITES;
+    List puts = new ArrayList<>();
+    puts.add(QuotaTableUtil.enableViolationPolicy(tn1, policy1));
+    puts.add(QuotaTableUtil.enableViolationPolicy(tn2, policy2));
+    puts.add(QuotaTableUtil.enableViolationPolicy(tn3, policy3));
+    final Map expectedPolicies = new HashMap<>();
+    expectedPolicies.put(tn1, policy1);
+    expectedPolicies.put(tn2, policy2);
+    expectedPolicies.put(tn3, policy3);
+
+    final Map actualPolicies = new HashMap<>();
+    try (Table quotaTable = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
+      quotaTable.put(puts);
+      ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaViolationScan());
+      for (Result r : scanner) {
+        QuotaTableUtil.extractViolationPolicy(r, actualPolicies);
+      }
+      scanner.close();
+    }
+
+    assertEquals(expectedPolicies, actualPolicies);
+  }
+
+  private TableName getUniqueTableName() {
+    return TableName.valueOf(testName.getMethodName() + "_" + tableNameCounter++);
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
index 0c06588..ffd6443 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java
@@ -105,7 +105,7 @@ public class TestQuotaThrottle {
   @After
   public void tearDown() throws Exception {
     for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
-      RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager();
+      RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
       QuotaCache quotaCache = quotaManager.getQuotaCache();
       quotaCache.getNamespaceQuotaCache().clear();
       quotaCache.getTableQuotaCache().clear();
@@ -557,7 +557,7 @@ public class TestQuotaThrottle {
       boolean nsLimiter, final TableName... tables) throws Exception {
     envEdge.incValue(2 * REFRESH_TIME);
     for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
-      RegionServerQuotaManager quotaManager = rst.getRegionServer().getRegionServerQuotaManager();
+      RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
       QuotaCache quotaCache = quotaManager.getQuotaCache();
 
       quotaCache.triggerCacheRefresh();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
new file mode 100644
index 0000000..e5ab317
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRegionServerSpaceQuotaManager.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.apache.hadoop.hbase.util.Bytes.toBytes;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test class for {@link RegionServerSpaceQuotaManager}.
+ */
+@Category(SmallTests.class)
+public class TestRegionServerSpaceQuotaManager {
+
+  private RegionServerSpaceQuotaManager quotaManager;
+  private Connection conn;
+  private Table quotaTable;
+  private ResultScanner scanner;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setup() throws Exception {
+    quotaManager = mock(RegionServerSpaceQuotaManager.class);
+    conn = mock(Connection.class);
+    quotaTable = mock(Table.class);
+    scanner = mock(ResultScanner.class);
+    // Call the real getViolationPoliciesToEnforce()
+    when(quotaManager.getViolationPoliciesToEnforce()).thenCallRealMethod();
+    // Mock out creating a scanner
+    when(quotaManager.getConnection()).thenReturn(conn);
+    when(conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
+    when(quotaTable.getScanner(any(Scan.class))).thenReturn(scanner);
+    // Mock out the static method call with some indirection
+    doAnswer(new Answer(){
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Result result = invocation.getArgumentAt(0, Result.class);
+        Map policies = invocation.getArgumentAt(1, Map.class);
+        QuotaTableUtil.extractViolationPolicy(result, policies);
+        return null;
+      }
+    }).when(quotaManager).extractViolationPolicy(any(Result.class), any(Map.class));
+  }
+
+  @Test
+  public void testMissingAllColumns() {
+    List results = new ArrayList<>();
+    results.add(Result.create(Collections.emptyList()));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      quotaManager.getViolationPoliciesToEnforce();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // Expected an error because we had no cells in the row.
+      // This should only happen due to programmer error.
+    }
+  }
+
+  @Test
+  public void testMissingDesiredColumn() {
+    List results = new ArrayList<>();
+    // Give a column that isn't the one we want
+    Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("q"), toBytes("s"), new byte[0]);
+    results.add(Result.create(Collections.singletonList(c)));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      quotaManager.getViolationPoliciesToEnforce();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // Expected an error because we were missing the column we expected in this row.
+      // This should only happen due to programmer error.
+    }
+  }
+
+  @Test
+  public void testParsingError() {
+    List results = new ArrayList<>();
+    Cell c = new KeyValue(toBytes("t:inviolation"), toBytes("u"), toBytes("v"), new byte[0]);
+    results.add(Result.create(Collections.singletonList(c)));
+    when(scanner.iterator()).thenReturn(results.iterator());
+    try {
+      quotaManager.getViolationPoliciesToEnforce();
+      fail("Expected an IOException, but did not receive one.");
+    } catch (IOException e) {
+      // We provided a garbage serialized protobuf message (empty byte array), this should
+      // in turn throw an IOException
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
new file mode 100644
index 0000000..160de46
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotaViolationPolicyRefresherChore.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test class for {@link SpaceQuotaViolationPolicyRefresherChore}.
+ */
+@Category(SmallTests.class)
+public class TestSpaceQuotaViolationPolicyRefresherChore {
+
+  private RegionServerSpaceQuotaManager manager;
+  private RegionServerServices rss;
+  private SpaceQuotaViolationPolicyRefresherChore chore;
+  private Configuration conf;
+
+  @Before
+  public void setup() {
+    conf = HBaseConfiguration.create();
+    rss = mock(RegionServerServices.class);
+    manager = mock(RegionServerSpaceQuotaManager.class);
+    when(manager.getRegionServerServices()).thenReturn(rss);
+    when(rss.getConfiguration()).thenReturn(conf);
+    chore = new SpaceQuotaViolationPolicyRefresherChore(manager);
+  }
+
+  @Test
+  public void testPoliciesAreEnforced() throws IOException {
+    final Map policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS);
+    policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES);
+    policiesToEnforce.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES_COMPACTIONS);
+
+    // No active enforcements
+    when(manager.getActiveViolationPolicyEnforcements()).thenReturn(Collections.emptyMap());
+    // Policies to enforce
+    when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+
+    chore.chore();
+
+    for (Entry entry : policiesToEnforce.entrySet()) {
+      // Ensure we enforce the policy
+      verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
+      // Don't disable any policies
+      verify(manager, never()).disableViolationPolicyEnforcement(entry.getKey());
+    }
+  }
+
+  @Test
+  public void testOldPoliciesAreRemoved() throws IOException {
+    final Map policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_INSERTS);
+
+    final Map previousPolicies = new HashMap<>();
+    previousPolicies.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_WRITES);
+    previousPolicies.put(TableName.valueOf("table4"), SpaceViolationPolicy.NO_WRITES);
+
+    // No active enforcements
+    when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies);
+    // Policies to enforce
+    when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+
+    chore.chore();
+
+    for (Entry entry : policiesToEnforce.entrySet()) {
+      verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
+    }
+
+    for (Entry entry : previousPolicies.entrySet()) {
+      verify(manager).disableViolationPolicyEnforcement(entry.getKey());
+    }
+  }
+
+  @Test
+  public void testNewPolicyOverridesOld() throws IOException {
+    final Map policiesToEnforce = new HashMap<>();
+    policiesToEnforce.put(TableName.valueOf("table1"), SpaceViolationPolicy.DISABLE);
+    policiesToEnforce.put(TableName.valueOf("table2"), SpaceViolationPolicy.NO_WRITES);
+    policiesToEnforce.put(TableName.valueOf("table3"), SpaceViolationPolicy.NO_INSERTS);
+
+    final Map previousPolicies = new HashMap<>();
+    previousPolicies.put(TableName.valueOf("table1"), SpaceViolationPolicy.NO_WRITES);
+
+    // No active enforcements
+    when(manager.getActiveViolationPolicyEnforcements()).thenReturn(previousPolicies);
+    // Policies to enforce
+    when(manager.getViolationPoliciesToEnforce()).thenReturn(policiesToEnforce);
+
+    chore.chore();
+
+    for (Entry entry : policiesToEnforce.entrySet()) {
+      verify(manager).enforceViolationPolicy(entry.getKey(), entry.getValue());
+    }
+    verify(manager, never()).disableViolationPolicyEnforcement(TableName.valueOf("table1"));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java
new file mode 100644
index 0000000..4a7000c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestTableSpaceQuotaViolationNotifier.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentMatcher;
+
+/**
+ * Test case for {@link TableSpaceQuotaViolationNotifier}.
+ */
+@Category(SmallTests.class)
+public class TestTableSpaceQuotaViolationNotifier {
+
+  private TableSpaceQuotaViolationNotifier notifier;
+  private Connection conn;
+
+  @Before
+  public void setup() throws Exception {
+    notifier = new TableSpaceQuotaViolationNotifier();
+    conn = mock(Connection.class);
+    notifier.initialize(conn);
+  }
+
+  @Test
+  public void testToViolation() throws Exception {
+    final TableName tn = TableName.valueOf("inviolation");
+    final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS;
+    final Table quotaTable = mock(Table.class);
+    when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
+
+    final Put expectedPut = new Put(Bytes.toBytes("t." + tn.getNameAsString()));
+    final SpaceQuota protoQuota = SpaceQuota.newBuilder()
+        .setViolationPolicy(ProtobufUtil.toProtoViolationPolicy(policy))
+        .build();
+    expectedPut.addColumn(Bytes.toBytes("u"), Bytes.toBytes("v"), protoQuota.toByteArray());
+
+    notifier.transitionTableToViolation(tn, policy);
+
+    verify(quotaTable).put(argThat(new SingleCellPutMatcher(expectedPut)));
+  }
+
+  @Test
+  public void testToObservance() throws Exception {
+    final TableName tn = TableName.valueOf("notinviolation");
+    final Table quotaTable = mock(Table.class);
+    when(conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)).thenReturn(quotaTable);
+
+    final Delete expectedDelete = new Delete(Bytes.toBytes("t." + tn.getNameAsString()));
+    expectedDelete.addColumn(Bytes.toBytes("u"), Bytes.toBytes("v"));
+
+    notifier.transitionTableToObservance(tn);
+
+    verify(quotaTable).delete(argThat(new SingleCellDeleteMatcher(expectedDelete)));
+  }
+
+  /**
+   * Parameterized for Puts.
+   */
+  private static class SingleCellPutMatcher extends SingleCellMutationMatcher {
+    private SingleCellPutMatcher(Put expected) {
+      super(expected);
+    }
+  }
+
+  /**
+   * Parameterized for Deletes.
+   */
+  private static class SingleCellDeleteMatcher extends SingleCellMutationMatcher {
+    private SingleCellDeleteMatcher(Delete expected) {
+      super(expected);
+    }
+  }
+
+  /**
+   * Quick hack to verify a Mutation with one column.
+   */
+  private static class SingleCellMutationMatcher extends ArgumentMatcher {
+    private final Mutation expected;
+
+    private SingleCellMutationMatcher(Mutation expected) {
+      this.expected = expected;
+    }
+
+    @Override
+    public boolean matches(Object argument) {
+      if (!expected.getClass().isAssignableFrom(argument.getClass())) {
+        return false;
+      }
+      Mutation actual = (Mutation) argument;
+      if (!Arrays.equals(expected.getRow(), actual.getRow())) {
+        return false;
+      }
+      if (expected.size() != actual.size()) {
+        return false;
+      }
+      NavigableMap> expectedCells = expected.getFamilyCellMap();
+      NavigableMap> actualCells = actual.getFamilyCellMap();
+      Entry> expectedEntry = expectedCells.entrySet().iterator().next();
+      Entry> actualEntry = actualCells.entrySet().iterator().next();
+      if (!Arrays.equals(expectedEntry.getKey(), actualEntry.getKey())) {
+        return false;
+      }
+      return Objects.equals(expectedEntry.getValue(), actualEntry.getValue());
+    }
+  }
+}
-- 
2.10.2