diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java index 7d5a995..2480566 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public abstract class ZooKeeperListener { // Reference to the zk watcher which also contains configuration and constants - protected ZooKeeperWatcher watcher; + protected final ZooKeeperWatcher watcher; /** * Construct a ZooKeeper event listener. diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 08ff6da..622b3a6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -971,7 +971,7 @@ public class AccessController extends BaseMasterAndRegionObserver // throw RuntimeException so that the coprocessor is unloaded. if (zk != null) { try { - this.authManager = TableAuthManager.get(zk, env.getConfiguration()); + this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration()); } catch (IOException ioe) { throw new RuntimeException("Error obtaining TableAuthManager", ioe); } @@ -984,7 +984,9 @@ public class AccessController extends BaseMasterAndRegionObserver @Override public void stop(CoprocessorEnvironment env) { - + if (this.authManager != null) { + TableAuthManager.release(authManager); + } } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java index 7370ee5..3d471f2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/TableAuthManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.security.access; +import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -48,7 +49,7 @@ import com.google.common.collect.Lists; * Performs authorization checks for a given user's assigned permissions */ @InterfaceAudience.Private -public class TableAuthManager { +public class TableAuthManager implements Closeable { private static class PermissionCache { /** Cache of user permissions */ private ListMultimap userCache = ArrayListMultimap.create(); @@ -95,8 +96,6 @@ public class TableAuthManager { private static final Log LOG = LogFactory.getLog(TableAuthManager.class); - private static TableAuthManager instance; - /** Cache of global permissions */ private volatile PermissionCache globalCache; @@ -125,6 +124,11 @@ public class TableAuthManager { } } + @Override + public void close() { + this.zkperms.close(); + } + /** * Returns a new {@code PermissionCache} initialized with permission assignments * from the {@code hbase.superuser} configuration key. @@ -739,16 +743,42 @@ public class TableAuthManager { return mtime.get(); } - static Map managerMap = + private static Map managerMap = new HashMap(); - public synchronized static TableAuthManager get( + private static Map refCount = new HashMap<>(); + + /** Returns a TableAuthManager from the cache. If not cached, constructs a new one. Returned + * instance should be released back by calling {@link #release(TableAuthManager)}. */ + public synchronized static TableAuthManager getOrCreate( ZooKeeperWatcher watcher, Configuration conf) throws IOException { - instance = managerMap.get(watcher); + TableAuthManager instance = managerMap.get(watcher); if (instance == null) { instance = new TableAuthManager(watcher, conf); managerMap.put(watcher, instance); } + int ref = refCount.get(instance) == null ? 0 : refCount.get(instance).intValue(); + refCount.put(instance, ref + 1); return instance; } + + /** + * Releases the resources for the given TableAuthManager if the reference count is down to 0. + */ + public synchronized static void release(TableAuthManager instance) { + if (refCount.get(instance) == null || refCount.get(instance) < 1) { + LOG.warn("CODE BUG: Something wrong with the TableAuthManager reference counting: " + + refCount.get(instance)); + instance.close(); + managerMap.remove(instance.getZKPermissionWatcher().getWatcher()); + refCount.put(instance, 0); + } else { + int ref = refCount.get(instance); + refCount.put(instance, ref-1); + if (ref-1 == 0) { + instance.close(); + managerMap.remove(instance.getZKPermissionWatcher().getWatcher()); + } + } + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java index d5fdd41..c0afe78 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -29,9 +30,14 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Handles synchronization of access control list entries and updates @@ -43,13 +49,14 @@ import java.util.concurrent.CountDownLatch; * trigger updates in the {@link TableAuthManager} permission cache. */ @InterfaceAudience.Private -public class ZKPermissionWatcher extends ZooKeeperListener { +public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable { private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class); // parent node for permissions lists static final String ACL_NODE = "acl"; TableAuthManager authManager; String aclZNode; CountDownLatch initialized = new CountDownLatch(1); + ExecutorService executor; public ZKPermissionWatcher(ZooKeeperWatcher watcher, TableAuthManager authManager, Configuration conf) { @@ -57,16 +64,34 @@ public class ZKPermissionWatcher extends ZooKeeperListener { this.authManager = authManager; String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE); this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent); + executor = Executors.newSingleThreadExecutor( + new DaemonThreadFactory("zkpermissionwatcher")); } public void start() throws KeeperException { try { watcher.registerListener(this); if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) { - List existing = - ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - if (existing != null) { - refreshNodes(existing); + try { + executor.submit(new Callable() { + @Override + public Void call() throws KeeperException { + List existing = + ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); + if (existing != null) { + refreshNodes(existing); + } + return null; + } + }).get(); + } catch (ExecutionException ex) { + if (ex.getCause() instanceof KeeperException) { + throw (KeeperException)ex.getCause(); + } else { + throw new RuntimeException(ex.getCause()); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); } } } finally { @@ -74,11 +99,16 @@ public class ZKPermissionWatcher extends ZooKeeperListener { } } + @Override + public void close() { + executor.shutdown(); + } + private void waitUntilStarted() { try { initialized.await(); } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting", e); + LOG.warn("Interrupted while waiting for start", e); Thread.currentThread().interrupt(); } } @@ -87,63 +117,83 @@ public class ZKPermissionWatcher extends ZooKeeperListener { public void nodeCreated(String path) { waitUntilStarted(); if (path.equals(aclZNode)) { - try { - List nodes = - ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - refreshNodes(nodes); - } catch (KeeperException ke) { - LOG.error("Error reading data from zookeeper", ke); - // only option is to abort - watcher.abort("Zookeeper error obtaining acl node children", ke); - } + executor.submit(new Runnable() { + @Override + public void run() { + try { + List nodes = + ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); + refreshNodes(nodes); + } catch (KeeperException ke) { + LOG.error("Error reading data from zookeeper", ke); + // only option is to abort + watcher.abort("Zookeeper error obtaining acl node children", ke); + } + } + }); } } @Override - public void nodeDeleted(String path) { + public void nodeDeleted(final String path) { waitUntilStarted(); if (aclZNode.equals(ZKUtil.getParent(path))) { - String table = ZKUtil.getNodeName(path); - if(AccessControlLists.isNamespaceEntry(table)) { - authManager.removeNamespace(Bytes.toBytes(table)); - } else { - authManager.removeTable(TableName.valueOf(table)); - } + executor.submit(new Runnable() { + @Override + public void run() { + String table = ZKUtil.getNodeName(path); + if(AccessControlLists.isNamespaceEntry(table)) { + authManager.removeNamespace(Bytes.toBytes(table)); + } else { + authManager.removeTable(TableName.valueOf(table)); + } + } + }); } } @Override - public void nodeDataChanged(String path) { + public void nodeDataChanged(final String path) { waitUntilStarted(); if (aclZNode.equals(ZKUtil.getParent(path))) { - // update cache on an existing table node - String entry = ZKUtil.getNodeName(path); - try { - byte[] data = ZKUtil.getDataAndWatch(watcher, path); - refreshAuthManager(entry, data); - } catch (KeeperException ke) { - LOG.error("Error reading data from zookeeper for node " + entry, ke); - // only option is to abort - watcher.abort("Zookeeper error getting data for node " + entry, ke); - } catch (IOException ioe) { - LOG.error("Error reading permissions writables", ioe); - } + executor.submit(new Runnable() { + @Override + public void run() { + // update cache on an existing table node + String entry = ZKUtil.getNodeName(path); + try { + byte[] data = ZKUtil.getDataAndWatch(watcher, path); + refreshAuthManager(entry, data); + } catch (KeeperException ke) { + LOG.error("Error reading data from zookeeper for node " + entry, ke); + // only option is to abort + watcher.abort("Zookeeper error getting data for node " + entry, ke); + } catch (IOException ioe) { + LOG.error("Error reading permissions writables", ioe); + } + } + }); } } @Override - public void nodeChildrenChanged(String path) { + public void nodeChildrenChanged(final String path) { waitUntilStarted(); if (path.equals(aclZNode)) { - // table permissions changed - try { - List nodes = - ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - refreshNodes(nodes); - } catch (KeeperException ke) { - LOG.error("Error reading data from zookeeper for path "+path, ke); - watcher.abort("Zookeeper error get node children for path "+path, ke); - } + executor.submit(new Runnable() { + @Override + public void run() { + // table permissions changed + try { + List nodes = + ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); + refreshNodes(nodes); + } catch (KeeperException ke) { + LOG.error("Error reading data from zookeeper for path "+path, ke); + watcher.abort("Zookeeper error get node children for path "+path, ke); + } + } + }); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java index 3456158..1f23576 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestTablePermissions.java @@ -434,7 +434,7 @@ public class TestTablePermissions { /* test a race condition causing TableAuthManager to sometimes fail global permissions checks * when the global cache is being updated */ - TableAuthManager authManager = TableAuthManager.get(ZKW, conf); + TableAuthManager authManager = TableAuthManager.getOrCreate(ZKW, conf); // currently running user is the system user and should have global admin perms User currentUser = User.getCurrent(); assertTrue(authManager.authorize(currentUser, Permission.Action.ADMIN)); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java index 9c2bc3c..b8e7b53 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java @@ -75,9 +75,9 @@ public class TestZKPermissionsWatcher { // start minicluster UTIL.startMiniCluster(); - AUTH_A = TableAuthManager.get(new ZooKeeperWatcher(conf, + AUTH_A = TableAuthManager.getOrCreate(new ZooKeeperWatcher(conf, "TestZKPermissionsWatcher_1", ABORTABLE), conf); - AUTH_B = TableAuthManager.get(new ZooKeeperWatcher(conf, + AUTH_B = TableAuthManager.getOrCreate(new ZooKeeperWatcher(conf, "TestZKPermissionsWatcher_2", ABORTABLE), conf); }