diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 8ce5f9c..01f9695 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -66,7 +66,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { @Override public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException { - return new ZKProcedureCoordinator(watcher, procType, coordNode); + return new ZKProcedureCoordinator(watcher, procType, coordNode, this.server.getConfiguration()); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java index 5f86e08..7d0f4e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java @@ -104,9 +104,15 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler { snapshotManifest.addRegion(FSUtils.getTableDir(rootDir, snapshotTable), regionInfo); } }); - } finally { - exec.shutdown(); + } catch(IOException e){ + exec.shutdownNow(); + while(!exec.isTerminated()){ + Thread.sleep(2000); + } + throw e; } + exec.shutdown(); + } catch (Exception e) { // make sure we capture the exception to propagate back to the client later String reason = "Failed snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 55d58e0..486dd72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -1115,7 +1115,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable String name = master.getServerName().toString(); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator( - master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name); + master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name, conf); this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); this.executorService = master.getExecutorService(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java index 1d20ba5..b2db8f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java @@ -108,6 +108,9 @@ public class Procedure implements Callable, ForeignExceptionListener { private final List inBarrierMembers; private final HashMap dataFromFinishedMembers; private ProcedureCoordinator coord; + + private final long maximumAttempts; + private final long waitTimeBetweenAttempts; /** * Creates a procedure. (FOR TESTING) @@ -133,6 +136,13 @@ public class Procedure implements Callable, ForeignExceptionListener { this.monitor = monitor; this.wakeFrequency = wakeFreq; + this.waitTimeBetweenAttempts = + this.coord.getRpcs().getConfiguration() + .getLong("hbase.procedure.clearznodes.wait.time", 3000l); + this.maximumAttempts = + this.coord.getRpcs().getConfiguration() + .getLong("hbase.procedure.clearznodes.maximum.attempts", 10); + int count = expectedMembers.size(); this.acquiredBarrierLatch = new CountDownLatch(count); this.releasedBarrierLatch = new CountDownLatch(count); @@ -224,8 +234,11 @@ public class Procedure implements Callable, ForeignExceptionListener { Thread.currentThread().interrupt(); } String msg = "Procedure '" + procName +"' execution failed!"; + ForeignException ee = new ForeignException(getName(), e); + coord.getRpcs().sendAbortToMembers(this, ee); + clearChildrenZnodes(); LOG.error(msg, e); - receive(new ForeignException(getName(), e)); + receive(ee); } finally { LOG.debug("Running finish phase."); sendGlobalBarrierComplete(); @@ -407,4 +420,14 @@ public class Procedure implements Callable, ForeignExceptionListener { monitor.rethrowException(); } } + + private void clearChildrenZnodes() { + try { + coord.clearChildrenZnodes(this, args, waitTimeBetweenAttempts, maximumAttempts); + } catch (IOException e) { + ForeignException ee = new ForeignException(this.getName(), e); + monitor.receive(ee); + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index 8a64cc8..c53c63a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -327,4 +327,10 @@ public class ProcedureCoordinator { public Set getProcedureNames() { return new HashSet<>(procedures.keySet()); } + + public void clearChildrenZnodes(Procedure procName, byte[] args, long wakeFrequency, long retry) + throws IOException { + rpcs.clearChildrenZnodes(procName, args, wakeFrequency, retry); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java index 631c270..caa1b07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.errorhandling.ForeignException; @@ -80,4 +81,10 @@ public interface ProcedureCoordinatorRpcs extends Closeable { * @throws IOException if the remote notification mechanism cannot be reached */ void resetMembers(Procedure procName) throws IOException; + + void clearChildrenZnodes(Procedure procName, byte[] info, long wakeFrequency, long retry) + throws IOException; + + Configuration getConfiguration(); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java index baed1f3..dcfd1be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -47,7 +48,14 @@ public class ProcedureMember implements Closeable { private final SubprocedureFactory builder; private final ProcedureMemberRpcs rpcs; - + + /** + * The cached task which is submitted. + * When the subproc is terminated, future#cancel() can be accurately executed through the cache. + * */ + private final ConcurrentMap> submitSubprocedures= + new MapMaker().concurrencyLevel(4).weakValues().makeMap(); + private final ConcurrentMap subprocs = new MapMaker().concurrencyLevel(4).weakValues().makeMap(); private final ExecutorService pool; @@ -144,6 +152,7 @@ public class ProcedureMember implements Closeable { LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out"); return false; } + submitSubprocedures.remove(procName); } LOG.debug("Submitting new Subprocedure:" + procName); @@ -151,7 +160,7 @@ public class ProcedureMember implements Closeable { // kick off the subprocedure try { if (subprocs.putIfAbsent(procName, subproc) == null) { - this.pool.submit(subproc); + submitSubprocedures.put(procName, this.pool.submit(subproc)); return true; } else { LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out"); @@ -159,7 +168,8 @@ public class ProcedureMember implements Closeable { } } catch (RejectedExecutionException e) { subprocs.remove(procName, subproc); - + submitSubprocedures.remove(procName); + // the thread pool is full and we can't run the subprocedure String msg = "Subprocedure pool is full!"; subproc.cancel(msg, e.getCause()); @@ -242,5 +252,15 @@ public class ProcedureMember implements Closeable { String msg = "Propagating foreign exception to subprocedure " + sub.getName(); LOG.error(msg, ee); sub.cancel(msg, ee); + + Future f=submitSubprocedures.remove(sub.getName()); + if(f!=null){ + f.cancel(false); + //wait future cancel + while(!f.isCancelled()&&!f.isDone()){} + } + //del znode acquired reached + this.rpcs.deleteMemberAcquired(procName); + this.rpcs.deleteMemberReached(procName); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java index 96c22b0..db107a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java @@ -69,4 +69,8 @@ public interface ProcedureMemberRpcs extends Closeable { * @throws IOException if we can't reach the coordinator */ void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException; + + void deleteMemberAcquired(String procName); + + void deleteMemberReached(String procName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java index 8927338..8fff381 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java @@ -74,7 +74,7 @@ abstract public class Subprocedure implements Callable { protected final long wakeFrequency; protected final TimeoutExceptionInjector executionTimeoutTimer; protected final ProcedureMemberRpcs rpcs; - + protected final ProcedureMember member; private volatile boolean complete = false; /** @@ -92,11 +92,12 @@ abstract public class Subprocedure implements Callable { assert member.getRpcs() != null : "rpc handlers should be non-null"; assert procName != null : "procedure name should be non-null"; assert monitor != null : "monitor should be non-null"; - + // Default to a very large timeout this.rpcs = member.getRpcs(); this.barrierName = procName; this.monitor = monitor; + this.member=member; // forward any failures to coordinator. Since this is a dispatcher, resend loops should not be // possible. this.monitor.addListener(new ForeignExceptionListener() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java index b656894..d59b13c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinator.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -44,6 +45,7 @@ public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs { String procedureType; String coordName; + private final Configuration conf; /** * @param watcher zookeeper watcher. Owned by this and closed via {@link #close()} * @param procedureClass procedure type name is a category for when there are multiple kinds of @@ -52,10 +54,11 @@ public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs { * @throws KeeperException if an unexpected zk error occurs */ public ZKProcedureCoordinator(ZooKeeperWatcher watcher, - String procedureClass, String coordName) { + String procedureClass, String coordName, Configuration conf) { this.watcher = watcher; this.procedureType = procedureClass; this.coordName = coordName; + this.conf=conf; } /** @@ -325,4 +328,52 @@ public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs { final ZKProcedureUtil getZkProcedureUtil() { return zkProc; } + + @Override + public void clearChildrenZnodes(Procedure procName, byte[] info, long wakeFrequency, long retry) + throws IOException { + try { + for (int i = 0; i < retry; i++) { + if (ZKUtil.checkExists(zkProc.getWatcher(), zkProc.getAcquiredBarrierNode(procName.getName())) != -1) { + List children = + zkProc.getChildren(zkProc.getAcquiredBarrierNode(procName.getName())); + if (children != null && !children.isEmpty()) { + try { + Thread.sleep(wakeFrequency); + } catch (InterruptedException e) { + if (i >= retry - 1) { + LOG.error("Failed to clear znode", e); + throw new IOException("Failed while clearing node:" + procName.getName(), e); + } + } + continue; + } + } + if (ZKUtil.checkExists(zkProc.getWatcher(), zkProc.getReachedBarrierNode(procName.getName())) != -1) { + List children = + zkProc.getChildren(zkProc.getReachedBarrierNode(procName.getName())); + if (children != null && !children.isEmpty()) { + try { + Thread.sleep(wakeFrequency); + } catch (InterruptedException e) { + if (i >= retry - 1) { + LOG.error("Failed to clear znode", e); + throw new IOException("Failed while clearing node:" + procName.getName(), e); + } + } + continue; + } + } + break; + } + } catch (KeeperException e) { + LOG.error("Failed to watch abort", e); + throw new IOException("Failed while watching getChildren node:" + procName.getName(), e); + } + } + + @Override + public Configuration getConfiguration() { + return conf; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java index 8c56255..5fc210e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java @@ -359,4 +359,27 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs { zkController.close(); } + @Override + public void deleteMemberAcquired(String procName) { + String procZNode = zkController.getAcquiredBarrierNode(procName); + String joinPath = ZKUtil.joinZNode(procZNode,memberName); + try{ + ZKUtil.deleteNodeFailSilent(zkController.getWatcher(), joinPath); + } catch (KeeperException e) { + member.controllerConnectionFailure("Failed to delete acquired node for procedure: " + + procName + " and member: " + memberName, new IOException(e), procName); + } + } + + @Override + public void deleteMemberReached(String procName) { + String procZNode = zkController.getReachedBarrierNode(procName); + String joinPath = ZKUtil.joinZNode(procZNode,memberName); + try{ + ZKUtil.deleteNodeFailSilent(zkController.getWatcher(), joinPath); + } catch (KeeperException e) { + member.controllerConnectionFailure("Failed to delete reached node for procedure: " + + procName + " and member: " + memberName, new IOException(e), procName); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java index 4ebb411..bc7f672 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java @@ -300,4 +300,8 @@ public abstract class ZKProcedureUtil logZKTree(this.baseZNode); } } + + public List getChildren(String root) throws KeeperException{ + return ZKUtil.listChildrenNoWatch(watcher, root); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index 3092114..8035846 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -99,7 +99,7 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { String name = master.getServerName().toString(); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, threads); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator( - master.getZooKeeper(), getProcedureSignature(), name); + master.getZooKeeper(), getProcedureSignature(), name, conf); this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java index 296b38f..bbb59f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleMasterProcedureManager.java @@ -65,7 +65,7 @@ public class SimpleMasterProcedureManager extends MasterProcedureManager { String name = master.getServerName().toString(); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator( - master.getZooKeeper(), getProcedureSignature(), name); + master.getZooKeeper(), getProcedureSignature(), name, master.getConfiguration()); this.coordinator = new ProcedureCoordinator(comms, tpool); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java index f20c8a9..6aa5c05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java @@ -128,7 +128,7 @@ public class TestZKProcedure { // start running the controller ZKProcedureCoordinator coordinatorComms = new ZKProcedureCoordinator( - coordZkw, opDescription, COORDINATOR_NODE_NAME); + coordZkw, opDescription, COORDINATOR_NODE_NAME, UTIL.getConfiguration()); ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) { @Override @@ -209,7 +209,7 @@ public class TestZKProcedure { // start running the coordinator and its controller ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher(); ZKProcedureCoordinator coordinatorController = new ZKProcedureCoordinator( - coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME); + coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME, UTIL.getConfiguration()); ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java index e7e2b23..516d850 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java @@ -398,7 +398,7 @@ public class TestZKProcedureControllers { ProcedureMember member, List expected) throws Exception { // start the controller ZKProcedureCoordinator controller = new ZKProcedureCoordinator( - watcher, operationName, CONTROLLER_NODE_NAME); + watcher, operationName, CONTROLLER_NODE_NAME, UTIL.getConfiguration()); controller.start(coordinator); // make a cohort controller for each expected node @@ -435,7 +435,7 @@ public class TestZKProcedureControllers { // start the controller ZKProcedureCoordinator controller = new ZKProcedureCoordinator( - watcher, operationName, CONTROLLER_NODE_NAME); + watcher, operationName, CONTROLLER_NODE_NAME, UTIL.getConfiguration()); controller.start(coordinator); return new Pair<>(controller, cohortControllers);