diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java index 99fbf3d..2ba771d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java @@ -41,7 +41,7 @@ public class NamespaceAuditor { public NamespaceAuditor(MasterServices masterServices) { this.masterServices = masterServices; - stateManager = new NamespaceStateManager(masterServices); + stateManager = new NamespaceStateManager(masterServices, masterServices.getZooKeeper()); } public void start() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java index 4d5a0e0..3830c5d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java @@ -20,28 +20,38 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.RegionTransition; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.TableNamespaceManager; import org.apache.hadoop.hbase.quotas.QuotaExceededException; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.zookeeper.data.Stat; /** * NamespaceStateManager manages state (in terms of quota) of all the namespaces. It contains a * cache which is updated based on the hooks in the NamespaceAuditor class. */ @InterfaceAudience.Private -class NamespaceStateManager { +class NamespaceStateManager extends ZooKeeperListener { private static Log LOG = LogFactory.getLog(NamespaceStateManager.class); private ConcurrentMap nsStateCache; private MasterServices master; private volatile boolean initialized = false; - public NamespaceStateManager(MasterServices masterServices) { + public NamespaceStateManager(MasterServices masterServices, ZooKeeperWatcher zkw) { + super(zkw); nsStateCache = new ConcurrentHashMap(); master = masterServices; } @@ -54,6 +64,7 @@ class NamespaceStateManager { public void start() throws IOException { LOG.info("Namespace State Manager started."); initialize(); + watcher.registerListenerFirst(this); } /** @@ -210,4 +221,52 @@ class NamespaceStateManager { throw new IOException("Namespace state found null for namespace : " + namespace); } } + + private void checkSplittingOrMergingNode(String path) { + String msg = "Error reading data from zookeeper"; + try { + if (path.startsWith(watcher.assignmentZNode)) { + List children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, + watcher.assignmentZNode); + if (children != null) { + for (String child : children) { + Stat stat = new Stat(); + byte[] data = ZKAssign.getDataAndWatch(watcher, + ZKUtil.joinZNode(watcher.assignmentZNode, child), stat); + if (data != null) { + RegionTransition rt = RegionTransition.parseFrom(data); + if (rt.getEventType().equals(EventType.RS_ZK_REQUEST_REGION_SPLIT)) { + TableName table = HRegionInfo.getTable(rt.getRegionName()); + if (!checkAndUpdateNamespaceRegionCount(table, rt.getRegionName(), 1)) { + ZKUtil.deleteNode(watcher, ZKUtil.joinZNode(watcher.assignmentZNode, child)); + } + } else if (rt.getEventType().equals(EventType.RS_ZK_REQUEST_REGION_MERGE)) { + TableName table = HRegionInfo.getTable(rt.getRegionName()); + checkAndUpdateNamespaceRegionCount(table, rt.getRegionName(), -1); + } + } + } + } + } + } catch (KeeperException ke) { + LOG.error(msg, ke); + watcher.abort(msg, ke); + } catch (DeserializationException e) { + LOG.error(msg, e); + watcher.abort(msg, e); + } catch (IOException e) { + LOG.error(msg, e); + watcher.abort(msg, e); + } + } + + @Override + public void nodeCreated(String path) { + checkSplittingOrMergingNode(path); + } + + @Override + public void nodeChildrenChanged(String path) { + checkSplittingOrMergingNode(path); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java index 03e6f33..1f0a827 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namespace/TestNamespaceAuditor.java @@ -91,7 +91,7 @@ public class TestNamespaceAuditor { setupOnce(); } - static void setupOnce() throws Exception, IOException { + public static void setupOnce() throws Exception, IOException { Configuration conf = UTIL.getConfiguration(); conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, CustomObserver.class.getName()); conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MasterSyncObserver.class.getName());