From 0289e8052cbed735f4a6d38a20b96ef1c1be4aa9 Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Fri, 18 Aug 2017 11:11:46 -0500 Subject: [PATCH] HBASE-18628 Fix event pre-emption in ZKPermWatcher Instead of using an Atomic Reference to data and aborting when we detect that new data comes in, use the native cancellation/pre-emption features of Java Future. --- .../hbase/security/access/ZKPermissionWatcher.java | 55 ++++++++++------------ ...nsWatcher.java => TestZKPermissionWatcher.java} | 4 +- 2 files changed, 28 insertions(+), 31 deletions(-) rename hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/{TestZKPermissionsWatcher.java => TestZKPermissionWatcher.java} (99%) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java index 3324b90a3d..631beac3dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -55,11 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class); // parent node for permissions lists static final String ACL_NODE = "acl"; - TableAuthManager authManager; - String aclZNode; - CountDownLatch initialized = new CountDownLatch(1); - AtomicReference> nodes = new AtomicReference<>(null); - ExecutorService executor; + private final TableAuthManager authManager; + private final String aclZNode; + private final CountDownLatch initialized = new CountDownLatch(1); + private final ExecutorService executor; + private Future childrenChangedFuture; public ZKPermissionWatcher(ZooKeeperWatcher watcher, TableAuthManager authManager, Configuration conf) { @@ -82,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable List existing = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); if (existing != null) { - refreshNodes(existing, null); + refreshNodes(existing); } return null; } @@ -126,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable try { List nodes = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - refreshNodes(nodes, null); + refreshNodes(nodes); } catch (KeeperException ke) { LOG.error("Error reading data from zookeeper", ke); // only option is to abort @@ -179,42 +180,38 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable } } + @Override public void nodeChildrenChanged(final String path) { waitUntilStarted(); if (path.equals(aclZNode)) { try { - List nodeList = + final List nodeList = ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode); - while (!nodes.compareAndSet(null, nodeList)) { - try { - Thread.sleep(20); - } catch (InterruptedException e) { - LOG.warn("Interrupted while setting node list", e); - Thread.currentThread().interrupt(); + // allows subsequent nodeChildrenChanged event to preempt current processing of + // nodeChildrenChanged event + if (! childrenChangedFuture.isDone()) { + boolean cancelled = childrenChangedFuture.cancel(true); + if (!cancelled) { + if (childrenChangedFuture.isDone()) { + // finished between our check and cancel, this is fine. + } else { + LOG.warn("Could not cancel children changed task, please file a JIRA with info: " + childrenChangedFuture); + } } } + childrenChangedFuture = asyncProcessNodeUpdate(() -> refreshNodes(nodeList)); } catch (KeeperException ke) { LOG.error("Error reading data from zookeeper for path "+path, ke); watcher.abort("ZooKeeper error get node children for path "+path, ke); } - asyncProcessNodeUpdate(new Runnable() { - // allows subsequent nodeChildrenChanged event to preempt current processing of - // nodeChildrenChanged event - @Override - public void run() { - List nodeList = nodes.get(); - nodes.set(null); - refreshNodes(nodeList, nodes); - } - }); } } - private void asyncProcessNodeUpdate(Runnable runnable) { + private Future asyncProcessNodeUpdate(Runnable runnable) { if (!executor.isShutdown()) { try { - executor.submit(runnable); + return executor.submit(runnable); } catch (RejectedExecutionException e) { if (executor.isShutdown()) { LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown"); @@ -223,12 +220,12 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable } } } + return null; // TODO : A future that represetns failure } - private void refreshNodes(List nodes, AtomicReference ref) { + private void refreshNodes(List nodes) { for (ZKUtil.NodeAndData n : nodes) { - if (ref != null && ref.get() != null) { - // there is a newer list + if (Thread.currentThread().isInterrupted()) { break; } if (n.isEmpty()) continue; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java similarity index 99% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java index cb36246e73..7856cc8903 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionsWatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestZKPermissionWatcher.java @@ -44,8 +44,8 @@ import org.junit.experimental.categories.Category; * Test the reading and writing of access permissions to and from zookeeper. */ @Category({SecurityTests.class, LargeTests.class}) -public class TestZKPermissionsWatcher { - private static final Log LOG = LogFactory.getLog(TestZKPermissionsWatcher.class); +public class TestZKPermissionWatcher { + private static final Log LOG = LogFactory.getLog(TestZKPermissionWatcher.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static TableAuthManager AUTH_A; private static TableAuthManager AUTH_B; -- 2.14.1