diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 08ff6da..2e727ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -984,7 +984,9 @@ public class AccessController extends BaseMasterAndRegionObserver
 
   @Override
   public void stop(CoprocessorEnvironment env) {
-
+    if (this.authManager != null) {
+      this.authManager.close();
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
index 7370ee5..314ae1a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.security.access;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -48,7 +49,7 @@ import com.google.common.collect.Lists;
  * Performs authorization checks for a given user's assigned permissions
  */
 @InterfaceAudience.Private
-public class TableAuthManager {
+public class TableAuthManager implements Closeable {
   private static class PermissionCache {
     /** Cache of user permissions */
     private ListMultimap userCache = ArrayListMultimap.create();
@@ -125,6 +126,11 @@ public class TableAuthManager {
     }
   }
 
+  @Override
+  public void close() {
+    this.zkperms.close();
+  }
+
   /**
    * Returns a new {@code PermissionCache} initialized with permission assignments
    * from the {@code hbase.superuser} configuration key.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
index d5fdd41..839339c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
@@ -29,9 +29,13 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Handles synchronization of access control list entries and updates
@@ -43,13 +47,18 @@ import java.util.concurrent.CountDownLatch;
  * trigger updates in the {@link TableAuthManager} permission cache.
  */
 @InterfaceAudience.Private
-public class ZKPermissionWatcher extends ZooKeeperListener {
+public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable {
   private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
   // parent node for permissions lists
   static final String ACL_NODE = "acl";
   TableAuthManager authManager;
   String aclZNode;
   CountDownLatch initialized = new CountDownLatch(1);
+  // most recent child list waiting to be processed by the executor; null when none is pending
+  AtomicReference<List<ZKUtil.NodeAndData>> nodes =
+      new AtomicReference<List<ZKUtil.NodeAndData>>(null);
+  // runs the refreshes requested by nodeChildrenChanged() on its own worker thread
+  ExecutorService executor;
 
   public ZKPermissionWatcher(ZooKeeperWatcher watcher,
       TableAuthManager authManager, Configuration conf) {
@@ -57,8 +66,27 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
     this.authManager = authManager;
     String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
     this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent);
+    executor = Executors.newSingleThreadExecutor();
   }
 
+  /**
+   * Applies the most recently published child list, if one is pending, through
+   * refreshNodes(). Each run handles at most one list and then returns, so the
+   * executor's single thread is released after every run and can terminate once
+   * close() is called.
+   */
+  private final Runnable runnable = new Runnable() {
+    @Override
+    public void run() {
+      // take and clear in one atomic step
+      List<ZKUtil.NodeAndData> nodeList = nodes.getAndSet(null);
+      if (nodeList == null) {
+        // an earlier run already consumed the list this submission was for
+        return;
+      }
+      refreshNodes(nodeList, nodes);
+    }
+  };
+
   public void start() throws KeeperException {
     try {
       watcher.registerListener(this);
@@ -66,7 +94,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
         List<ZKUtil.NodeAndData> existing =
             ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
         if (existing != null) {
-          refreshNodes(existing);
+          refreshNodes(existing, null);
         }
       }
     } finally {
@@ -74,11 +102,19 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
     }
   }
 
+  /**
+   * Stops accepting refresh tasks; tasks already submitted are allowed to finish.
+   */
+  @Override
+  public void close() {
+    executor.shutdown();
+  }
+
   private void waitUntilStarted() {
     try {
       initialized.await();
     } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting", e);
+      LOG.warn("Interrupted while waiting for start", e);
       Thread.currentThread().interrupt();
     }
   }
@@ -90,7 +126,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
       try {
         List<ZKUtil.NodeAndData> nodes =
             ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
-        refreshNodes(nodes);
+        refreshNodes(nodes, null);
       } catch (KeeperException ke) {
         LOG.error("Error reading data from zookeeper", ke);
         // only option is to abort
@@ -137,9 +173,15 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
     if (path.equals(aclZNode)) {
       // table permissions changed
       try {
-        List<ZKUtil.NodeAndData> nodes =
+        List<ZKUtil.NodeAndData> nodeList =
             ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
-        refreshNodes(nodes);
+        if (nodeList != null) {
+          // Publish the newest list and return without waiting. A pending list that
+          // has not been picked up yet is superseded, and a refresh already in
+          // progress sees the new list in refreshNodes() and skips its remaining entries.
+          nodes.set(nodeList);
+          executor.submit(runnable);
+        }
       } catch (KeeperException ke) {
         LOG.error("Error reading data from zookeeper for path "+path, ke);
         watcher.abort("Zookeeper error get node children for path "+path, ke);
@@ -147,9 +189,14 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
     }
   }
 
-  private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
+  private void refreshNodes(List<ZKUtil.NodeAndData> nodes,
+      AtomicReference<List<ZKUtil.NodeAndData>> ref) {
     for (ZKUtil.NodeAndData n : nodes) {
       if (n.isEmpty()) continue;
+      if (ref != null && ref.get() != null) {
+        // there is a newer list
+        continue;
+      }
       String path = n.getNode();
       String entry = (ZKUtil.getNodeName(path));
       try {