Index: hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java (revision 1456113) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java (working copy) @@ -88,7 +88,7 @@ final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher()); final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs( - watcher, "testSimple", COHORT_NODE_NAME); + watcher, "testSimple", COHORT_NODE_NAME, UTIL.getConfiguration()); // mock out cohort member callbacks final ProcedureMember member = Mockito @@ -380,14 +380,14 @@ // start the controller ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs( watcher, operationName, CONTROLLER_NODE_NAME); - controller.start(coordinator); + controller.start(coordinator, UTIL.getConfiguration()); // make a cohort controller for each expected node List cohortControllers = new ArrayList(); for (String nodeName : expected) { ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs( - watcher, operationName, nodeName); + watcher, operationName, nodeName, UTIL.getConfiguration()); cc.start(member); cohortControllers.add(cc); } @@ -412,7 +412,7 @@ List cohortControllers = new ArrayList(); for (String nodeName : expected) { ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs( - watcher, operationName, nodeName); + watcher, operationName, nodeName, UTIL.getConfiguration()); cc.start(member); cohortControllers.add(cc); } @@ -420,7 +420,7 @@ // start the controller ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs( watcher, operationName, CONTROLLER_NODE_NAME); - controller.start(coordinator); + controller.start(coordinator, UTIL.getConfiguration()); return new Pair>( controller, cohortControllers); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (revision 1456113) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (working copy) @@ -40,6 +40,7 @@ import java.util.List; import java.util.concurrent.ThreadPoolExecutor; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; @@ -87,7 +88,7 @@ private ProcedureCoordinator buildNewCoordinator() { ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY); - return spy(new ProcedureCoordinator(controller, pool)); + return spy(new ProcedureCoordinator(controller, pool, new Configuration())); } /** Index: hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (revision 1456113) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (working copy) @@ -128,8 +128,10 @@ // start running the controller ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs( coordZkw, opDescription, COORDINATOR_NODE_NAME); - ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY); - ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) { + ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, + POOL_SIZE, WAKE_FREQUENCY); + ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool, + UTIL.getConfiguration()) { @Override public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, List expectedMembers) { @@ -145,7 +147,8 @@ // start each member for (String member : members) { ZooKeeperWatcher watcher = newZooKeeperWatcher(); - ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member); + ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member, + UTIL.getConfiguration()); ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member); ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory); procMembers.add(new Pair(procMember, comms)); @@ -210,16 +213,20 @@ ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher(); ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs( coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME); - ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY); - ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool)); + ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, + POOL_SIZE, WAKE_FREQUENCY); + ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool, + UTIL.getConfiguration())); // start a member for each node SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class); - List> members = new ArrayList>( + List> members = new ArrayList>( expected.size()); for (String member : expected) { ZooKeeperWatcher watcher = newZooKeeperWatcher(); - ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member); + ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member, + UTIL.getConfiguration()); ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member); ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory); members.add(new Pair(mem, controller)); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (revision 1456113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (working copy) @@ -120,11 +120,11 @@ this.rss = rss; ZooKeeperWatcher zkw = rss.getZooKeeper(); String nodeName = rss.getServerName().toString(); - this.memberRpcs = new ZKProcedureMemberRpcs(zkw, - SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, nodeName); // read in the snapshot request configuration properties Configuration conf = rss.getConfiguration(); + this.memberRpcs = new ZKProcedureMemberRpcs(zkw, + SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, nodeName, conf); long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT); long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java (revision 1456113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinatorRpcs.java (working copy) @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.errorhandling.ForeignException; /** @@ -37,9 +38,10 @@ /** * Initialize and start threads necessary to connect an implementation's rpc mechanisms. * @param listener + * @param conf * @return true if succeed, false if encountered initialization errors. */ - public boolean start(final ProcedureCoordinator listener); + public boolean start(final ProcedureCoordinator listener, Configuration conf); /** * Notify the members that the coordinator has aborted the procedure and that it should release Index: hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java (revision 1456113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java (working copy) @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -154,9 +155,11 @@ /** * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about. + * @param coordinator + * @param conf * @return true if succeed, false if encountered initialization errors. */ - final public boolean start(final ProcedureCoordinator coordinator) { + final public boolean start(final ProcedureCoordinator coordinator, Configuration conf) { if (this.coordinator != null) { throw new IllegalStateException( "ZKProcedureCoordinator already started and already has listener installed"); @@ -164,7 +167,7 @@ this.coordinator = coordinator; try { - this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) { + this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName, conf) { @Override public void nodeCreated(String path) { if (!isInProcedurePath(path)) return; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java (revision 1456113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureUtil.java (working copy) @@ -19,13 +19,17 @@ import java.io.Closeable; import java.io.IOException; +import java.util.LinkedList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -65,6 +69,7 @@ protected final String abortZnode; protected final String memberName; + protected final Configuration conf; /** * Top-level watcher/controller for procedures across the cluster. @@ -75,12 +80,14 @@ * {@link #close()} * @param procDescription name of the znode describing the procedure to run * @param memberName name of the member from which we are interacting with running procedures + * @param conf * @throws KeeperException when the procedure znodes cannot be created */ public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription, - String memberName) throws KeeperException { + String memberName, Configuration conf) throws KeeperException { super(watcher); this.memberName = memberName; + this.conf = conf; // make sure we are listening for events watcher.registerListener(this); // setup paths for the zknodes used in procedures @@ -269,6 +276,12 @@ // If the coordinator was shutdown mid-procedure, then we are going to lose // an procedure that was previously started by cleaning out all the previous state. Its much // harder to figure out how to keep an procedure going and the subject of HBASE-5487. + /* + if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, false)) { + LinkedList ops = new LinkedList(); + ops.add(ZKUtilOp.deleteNodeFailSilent(path)); + ZKUtil.multiOrSequential(watcher, ops, true); + }*/ ZKUtil.deleteChildrenRecursively(watcher, acquiredZnode); ZKUtil.deleteChildrenRecursively(watcher, reachedZnode); ZKUtil.deleteChildrenRecursively(watcher, abortZnode); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java (revision 1456113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java (working copy) @@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; @@ -69,11 +70,13 @@ * * @param rpcs * @param pool Used for executing procedures. + * @param conf */ - public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) { + public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool, + Configuration conf) { this.rpcs = rpcs; this.pool = pool; - this.rpcs.start(this); + this.rpcs.start(this, conf); } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java (revision 1456113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java (working copy) @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -67,11 +68,12 @@ * {@link #close()}. * @param procType name of the znode describing the procedure type * @param memberName name of the member to join the procedure + * @param conf * @throws KeeperException if we can't reach zookeeper */ public ZKProcedureMemberRpcs(ZooKeeperWatcher watcher, - String procType, String memberName) throws KeeperException { - this.zkController = new ZKProcedureUtil(watcher, procType, memberName) { + String procType, String memberName, Configuration conf) throws KeeperException { + this.zkController = new ZKProcedureUtil(watcher, procType, memberName, conf) { @Override public void nodeCreated(String path) { if (!isInProcedurePath(path)) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java (revision 1456113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java (working copy) @@ -154,7 +154,7 @@ ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name); - this.coordinator = new ProcedureCoordinator(comms, tpool); + this.coordinator = new ProcedureCoordinator(comms, tpool, conf); this.rootDir = master.getMasterFileSystem().getRootDir(); this.executorService = master.getExecutorService(); resetTempDir();