diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java index 56a9789..54dffb2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java @@ -93,9 +93,15 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler { snapshotManifest.addRegion(FSUtils.getTableDir(rootDir, snapshotTable), regionInfo); } }); - } finally { - exec.shutdown(); + } catch(IOException e){ + exec.shutdownNow(); + while(!exec.isTerminated()){ + Thread.sleep(2000); + } + throw e; } + exec.shutdown(); + } catch (Exception e) { // make sure we capture the exception to propagate back to the client later String reason = "Failed snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index ead27d2..ce84cb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -1031,7 +1031,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable String name = master.getServerName().toString(); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( - master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name); + master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name, conf); this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); this.executorService = master.getExecutorService(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java index 007b7c1..ab22b77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java @@ -107,6 +107,9 @@ public class Procedure implements Callable, ForeignExceptionListener { private final List inBarrierMembers; private ProcedureCoordinator coord; + private final long maximumAttempts; + private final long waitTimeBetweenAttempts; + /** * Creates a procedure. (FOR TESTING) * @@ -130,6 +133,13 @@ public class Procedure implements Callable, ForeignExceptionListener { this.monitor = monitor; this.wakeFrequency = wakeFreq; + this.waitTimeBetweenAttempts = + this.coord.getRpcs().getConfiguration() + .getLong("hbase.procedure.clearznodes.wait.time", 3000l); + this.maximumAttempts = + this.coord.getRpcs().getConfiguration() + .getLong("hbase.procedure.clearznodes.maximum.attempts", 10); + int count = expectedMembers.size(); this.acquiredBarrierLatch = new CountDownLatch(count); this.releasedBarrierLatch = new CountDownLatch(count); @@ -220,8 +230,11 @@ public class Procedure implements Callable, ForeignExceptionListener { Thread.currentThread().interrupt(); } String msg = "Procedure '" + procName +"' execution failed!"; + ForeignException ee = new ForeignException(getName(), e); + coord.getRpcs().sendAbortToMembers(this, ee); + clearChildrenZnodes(); LOG.error(msg, e); - receive(new ForeignException(getName(), e)); + receive(ee); } finally { LOG.debug("Running finish phase."); sendGlobalBarrierComplete(); @@ -378,4 +391,14 @@ public class Procedure implements Callable, ForeignExceptionListener { monitor.rethrowException(); } } + + private void clearChildrenZnodes() { + try { + coord.clearChildrenZnodes(this, args, waitTimeBetweenAttempts, maximumAttempts); + } catch (IOException e) { + ForeignException ee = new ForeignException(this.getName(), e); + monitor.receive(ee); + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index f360968..eea5026 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -299,4 +299,10 @@ public class ProcedureCoordinator { public Set getProcedureNames() { return new HashSet(procedures.keySet()); } + + public void clearChildrenZnodes(Procedure procName, byte[] args, long wakeFrequency, long retry) + throws IOException { + rpcs.clearChildrenZnodes(procName, args, wakeFrequency, retry); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java index 631c270..b9c3c2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.errorhandling.ForeignException; @@ -80,4 +81,10 @@ public interface ProcedureCoordinatorRpcs extends Closeable { * @throws IOException if the remote notification mechanism cannot be reached */ void resetMembers(Procedure procName) throws IOException; + + void clearChildrenZnodes(Procedure procName, byte[] info, long wakeFrequency, long retry) + throws IOException; + + Configuration getConfiguration(); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java index 1f22022..c3a1d62 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -48,6 +49,13 @@ public class ProcedureMember implements Closeable { private final SubprocedureFactory builder; private final ProcedureMemberRpcs rpcs; + /** + * The cached task which is submitted. + * When the subproc is terminated, future#cancel() can be accurately executed through the cache. + * */ + private final ConcurrentMap> submitSubprocedures= + new MapMaker().concurrencyLevel(4).weakValues().makeMap(); + private final ConcurrentMap subprocs = new MapMaker().concurrencyLevel(4).weakValues().makeMap(); private final ExecutorService pool; @@ -144,6 +152,7 @@ public class ProcedureMember implements Closeable { LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out"); return false; } + submitSubprocedures.remove(procName); } LOG.debug("Submitting new Subprocedure:" + procName); @@ -151,7 +160,7 @@ public class ProcedureMember implements Closeable { // kick off the subprocedure try { if (subprocs.putIfAbsent(procName, subproc) == null) { - this.pool.submit(subproc); + submitSubprocedures.put(procName, this.pool.submit(subproc)); return true; } else { LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out"); @@ -159,7 +168,8 @@ public class ProcedureMember implements Closeable { } } catch (RejectedExecutionException e) { subprocs.remove(procName, subproc); - + submitSubprocedures.remove(procName); + // the thread pool is full and we can't run the subprocedure String msg = "Subprocedure pool is full!"; subproc.cancel(msg, e.getCause()); @@ -239,5 +249,15 @@ public class ProcedureMember implements Closeable { String msg = "Propagating foreign exception to subprocedure " + sub.getName(); LOG.error(msg, ee); sub.cancel(msg, ee); + + Future f=submitSubprocedures.remove(sub.getName()); + if(f!=null){ + f.cancel(false); + //wait future cancel + while(!f.isCancelled()&&!f.isDone()){} + } + //del znode acquired reached + this.rpcs.deleteMemberAcquired(procName); + this.rpcs.deleteMemberReached(procName); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java index 8d31c65..6639fa8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java @@ -69,4 +69,8 @@ public interface ProcedureMemberRpcs extends Closeable { * @throws IOException if we can't reach the coordinator */ void sendMemberCompleted(Subprocedure sub) throws IOException; + + void deleteMemberAcquired(String procName); + + void deleteMemberReached(String procName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java index 07786e8..fe6b40d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java @@ -74,7 +74,7 @@ abstract public class Subprocedure implements Callable { protected final long wakeFrequency; protected final TimeoutExceptionInjector executionTimeoutTimer; protected final ProcedureMemberRpcs rpcs; - + protected final ProcedureMember member; private volatile boolean complete = false; /** @@ -92,11 +92,12 @@ abstract public class Subprocedure implements Callable { assert member.getRpcs() != null : "rpc handlers should be non-null"; assert procName != null : "procedure name should be non-null"; assert monitor != null : "monitor should be non-null"; - + // Default to a very large timeout this.rpcs = member.getRpcs(); this.barrierName = procName; this.monitor = monitor; + this.member=member; // forward any failures to coordinator. Since this is a dispatcher, resend loops should not be // possible. this.monitor.addListener(new ForeignExceptionListener() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java index d6dd49d..12f016b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.errorhandling.ForeignException; @@ -45,7 +46,8 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { ZooKeeperWatcher watcher; String procedureType; String coordName; - + + private final Configuration conf; /** * @param watcher zookeeper watcher. Owned by this and closed via {@link #close()} * @param procedureClass procedure type name is a category for when there are multiple kinds of @@ -54,10 +56,11 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { * @throws KeeperException if an unexpected zk error occurs */ public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher, - String procedureClass, String coordName) throws KeeperException { + String procedureClass, String coordName, Configuration conf) throws KeeperException { this.watcher = watcher; this.procedureType = procedureClass; this.coordName = coordName; + this.conf=conf; } /** @@ -268,4 +271,53 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { final ZKProcedureUtil getZkProcedureUtil() { return zkProc; } + + @Override + public void clearChildrenZnodes(Procedure procName, byte[] info, long wakeFrequency, long retry) + throws IOException { + try { + for (int i = 0; i < retry; i++) { + if (ZKUtil.checkExists(zkProc.getWatcher(), zkProc.getAcquiredBarrierNode(procName.getName())) != -1) { + List children = + zkProc.getChildren(zkProc.getAcquiredBarrierNode(procName.getName())); + if (children != null && !children.isEmpty()) { + try { + Thread.sleep(wakeFrequency); + } catch (InterruptedException e) { + if (i >= retry - 1) { + LOG.error("Failed to clear znode", e); + throw new IOException("Failed while clearing node:" + procName.getName(), e); + } + } + continue; + } + } + if (ZKUtil.checkExists(zkProc.getWatcher(), zkProc.getReachedBarrierNode(procName.getName())) != -1) { + List children = + zkProc.getChildren(zkProc.getReachedBarrierNode(procName.getName())); + if (children != null && !children.isEmpty()) { + try { + Thread.sleep(wakeFrequency); + } catch (InterruptedException e) { + if (i >= retry - 1) { + LOG.error("Failed to clear znode", e); + throw new IOException("Failed while clearing node:" + procName.getName(), e); + } + } + continue; + } + } + break; + } + } catch (KeeperException e) { + LOG.error("Failed to watch abort", e); + throw new IOException("Failed while watching getChildren node:" + procName.getName(), e); + } + } + + @Override + public Configuration getConfiguration() { + return conf; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java index c2ea919..6b9cc91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java @@ -348,5 +348,29 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs { public void close() throws IOException { zkController.close(); } + + @Override + public void deleteMemberAcquired(String procName) { + String procZNode = zkController.getAcquiredBarrierNode(procName); + String joinPath = ZKUtil.joinZNode(procZNode,memberName); + try{ + ZKUtil.deleteNodeFailSilent(zkController.getWatcher(), joinPath); + } catch (KeeperException e) { + member.controllerConnectionFailure("Failed to delete acquired node for procedure: " + + procName + " and member: " + memberName, new IOException(e), procName); + } + } + + @Override + public void deleteMemberReached(String procName) { + String procZNode = zkController.getReachedBarrierNode(procName); + String joinPath = ZKUtil.joinZNode(procZNode,memberName); + try{ + ZKUtil.deleteNodeFailSilent(zkController.getWatcher(), joinPath); + } catch (KeeperException e) { + member.controllerConnectionFailure("Failed to delete reached node for procedure: " + + procName + " and member: " + memberName, new IOException(e), procName); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java index 8171218..284ce95 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java @@ -292,4 +292,8 @@ public abstract class ZKProcedureUtil ZKUtil.deleteNodeRecursivelyMultiOrSequential(watcher, true, acquiredBarrierNode, reachedBarrierNode, abortZNode); } + + public List getChildren(String root) throws KeeperException{ + return ZKUtil.listChildrenNoWatch(watcher, root); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java index 06e9d7c..b9749f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java @@ -66,7 +66,7 @@ public class SimpleMasterProcedureManager extends MasterProcedureManager { String name = master.getServerName().toString(); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( - master.getZooKeeper(), getProcedureSignature(), name); + master.getZooKeeper(), getProcedureSignature(), name, master.getConfiguration()); this.coordinator = new ProcedureCoordinator(comms, tpool); this.executorService = master.getExecutorService(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java index 3798ab7..e0aeff0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java @@ -127,7 +127,7 @@ public class TestZKProcedure { // start running the controller ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs( - coordZkw, opDescription, COORDINATOR_NODE_NAME); + coordZkw, opDescription, COORDINATOR_NODE_NAME, UTIL.getConfiguration()); ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) { @Override @@ -209,7 +209,7 @@ public class TestZKProcedure { // start running the coordinator and its controller ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher(); ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs( - coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME); + coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME, UTIL.getConfiguration()); ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java index 93efd5a..d765d16 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java @@ -379,7 +379,7 @@ public class TestZKProcedureControllers { ProcedureMember member, List expected) throws Exception { // start the controller ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs( - watcher, operationName, CONTROLLER_NODE_NAME); + watcher, operationName, CONTROLLER_NODE_NAME, UTIL.getConfiguration()); controller.start(coordinator); // make a cohort controller for each expected node @@ -417,7 +417,7 @@ public class TestZKProcedureControllers { // start the controller ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs( - watcher, operationName, CONTROLLER_NODE_NAME); + watcher, operationName, CONTROLLER_NODE_NAME, UTIL.getConfiguration()); controller.start(coordinator); return new Pair>(