diff --git a/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 439989c..eeffb96 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -113,7 +113,7 @@ public class SnapshotManager implements Stoppable { private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis"; /** By default, check to see if the snapshot is complete (ms) */ - private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000; + private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000; /** * Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for @@ -131,7 +131,6 @@ public class SnapshotManager implements Stoppable { private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1; private boolean stopped; - private final long wakeFrequency; private final MasterServices master; // Needed by TableEventHandlers private final MasterMetrics metricsMaster; private final ProcedureCoordinator coordinator; @@ -168,16 +167,16 @@ public class SnapshotManager implements Stoppable { // get the configuration for the coordinator Configuration conf = master.getConfiguration(); - this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT); - long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); + long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT); + long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT); // setup the default procedure coordinator String name = master.getServerName().toString(); - ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency); + ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs( master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name); - this.coordinator = new ProcedureCoordinator(comms, tpool); + this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); this.executorService = master.getExecutorService(); resetTempDir(); } @@ -197,8 +196,6 @@ public class SnapshotManager implements Stoppable { this.rootDir = master.getMasterFileSystem().getRootDir(); checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem()); - this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY, - SNAPSHOT_WAKE_MILLIS_DEFAULT); this.coordinator = coordinator; this.executorService = pool; resetTempDir(); @@ -870,6 +867,12 @@ public class SnapshotManager implements Stoppable { for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) { restoreHandler.cancel(why); } + + try { + coordinator.close(); + } catch (IOException e) { + LOG.error("stop ProcedureCoordinator error", e); + } } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java index 48ed967..e8d9abb 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java @@ -97,7 +97,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh this.rootDir = this.master.getMasterFileSystem().getRootDir(); this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir); this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir); - this.monitor = new ForeignExceptionDispatcher(); + this.monitor = new ForeignExceptionDispatcher(snapshot.getName()); // prepare the verify this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, rootDir); @@ -147,6 +147,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh // run the snapshot snapshotRegions(regionsAndLocations); + monitor.rethrowException(); // extract each pair to separate lists Set serverNames = new HashSet(); diff --git a/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index f9c6259..dbeac4f 100644 --- a/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ b/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -51,11 +51,14 @@ import com.google.common.collect.MapMaker; public class ProcedureCoordinator { private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class); + final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000; final static long TIMEOUT_MILLIS_DEFAULT = 60000; final static long WAKE_MILLIS_DEFAULT = 500; private final ProcedureCoordinatorRpcs rpcs; private final ExecutorService pool; + private final long wakeTimeMillis; + private final long timeoutMillis; // Running procedure table. Maps procedure name to running procedure reference private final ConcurrentMap procedures = @@ -71,6 +74,23 @@ public class ProcedureCoordinator { * @param pool Used for executing procedures. */ public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) { + this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT); + } + + /** + * Create and start a ProcedureCoordinator. + * + * The rpc object registers the ProcedureCoordinator and starts any threads in + * this constructor. + * + * @param rpcs + * @param pool Used for executing procedures. + * @param timeoutMillis + */ + public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool, + long timeoutMillis, long wakeTimeMillis) { + this.timeoutMillis = timeoutMillis; + this.wakeTimeMillis = wakeTimeMillis; this.rpcs = rpcs; this.pool = pool; this.rpcs.start(this); @@ -78,10 +98,24 @@ public class ProcedureCoordinator { /** * Default thread pool for the procedure + * + * @param coordName + * @param opThreads the maximum number of threads to allow in the pool + */ + public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) { + return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT); + } + + /** + * Default thread pool for the procedure + * + * @param coordName + * @param opThreads the maximum number of threads to allow in the pool + * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks */ - public static ThreadPoolExecutor defaultPool(String coordName, long keepAliveTime, int opThreads, - long wakeFrequency) { - return new ThreadPoolExecutor(1, opThreads, keepAliveTime, TimeUnit.SECONDS, + public static ThreadPoolExecutor defaultPool(String coordName, int opThreads, + long keepAliveMillis) { + return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS, new SynchronousQueue(), new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool")); } @@ -194,7 +228,7 @@ public class ProcedureCoordinator { Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, List expectedMembers) { // build the procedure - return new Procedure(this, fed, WAKE_MILLIS_DEFAULT, TIMEOUT_MILLIS_DEFAULT, + return new Procedure(this, fed, wakeTimeMillis, timeoutMillis, procName, procArgs, expectedMembers); } diff --git a/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java index 876f5c4..ffc38e9 100644 --- a/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java +++ b/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java @@ -51,6 +51,8 @@ import com.google.common.collect.MapMaker; public class ProcedureMember implements Closeable { private static final Log LOG = LogFactory.getLog(ProcedureMember.class); + final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000; + private final SubprocedureFactory builder; private final ProcedureMemberRpcs rpcs; @@ -72,9 +74,26 @@ public class ProcedureMember implements Closeable { this.builder = factory; } - public static ThreadPoolExecutor defaultPool(long wakeFrequency, long keepAlive, - int procThreads, String memberName) { - return new ThreadPoolExecutor(1, procThreads, keepAlive, TimeUnit.SECONDS, + /** + * Default thread pool for the procedure + * + * @param memberName + * @param procThreads the maximum number of threads to allow in the pool + */ + public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) { + return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT); + } + + /** + * Default thread pool for the procedure + * + * @param memberName + * @param procThreads the maximum number of threads to allow in the pool + * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks + */ + public static ThreadPoolExecutor defaultPool(String memberName, int procThreads, + long keepAliveMillis) { + return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS, new SynchronousQueue(), new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool")); } @@ -85,7 +104,7 @@ public class ProcedureMember implements Closeable { * @return reference to the Procedure member's rpcs object */ ProcedureMemberRpcs getRpcs() { - return rpcs; + return rpcs; } diff --git a/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java index ea669a0..43787aa 100644 --- a/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java +++ b/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java @@ -198,7 +198,6 @@ abstract public class Subprocedure implements Callable { } else { msg = "Subprocedure '" + barrierName + "' failed!"; } - LOG.error(msg , e); cancel(msg, e); LOG.debug("Subprocedure '" + barrierName + "' running cleanup."); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java index 019bf97..60e23df 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java @@ -124,13 +124,12 @@ public class RegionServerSnapshotManager { // read in the snapshot request configuration properties Configuration conf = rss.getConfiguration(); - long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT); long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT); // create the actual snapshot procedure member - ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis, keepAlive, opThreads, - rss.getServerName().toString()); + ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), + opThreads, keepAlive); this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder()); } @@ -192,7 +191,7 @@ public class RegionServerSnapshotManager { LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " + snapshot.getTable()); - ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(); + ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName()); Configuration conf = rss.getConfiguration(); long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); @@ -356,7 +355,6 @@ public class RegionServerSnapshotManager { } // evict remaining tasks and futures from taskPool. - LOG.debug(taskPool); while (!futures.isEmpty()) { // block to remove cancelled futures; LOG.warn("Removing cancelled elements from taskPool"); diff --git a/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java b/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java index f48aa23..d0b62b2 100644 --- a/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java +++ b/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java @@ -86,7 +86,7 @@ public class TestProcedureCoordinator { } private ProcedureCoordinator buildNewCoordinator() { - ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY); + ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE); return spy(new ProcedureCoordinator(controller, pool)); } diff --git a/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java b/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java index 5af2e40..29ca89c 100644 --- a/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java +++ b/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java @@ -88,7 +88,7 @@ public class TestProcedureMember { */ private ProcedureMember buildCohortMember() { String name = "node"; - ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name); + ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE); return new ProcedureMember(mockMemberComms, pool, mockBuilder); } @@ -98,7 +98,7 @@ public class TestProcedureMember { private void buildCohortMemberPair() throws IOException { dispatcher = new ForeignExceptionDispatcher(); String name = "node"; - ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name); + ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE); member = new ProcedureMember(mockMemberComms, pool, mockBuilder); when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception Subprocedure subproc = new EmptySubprocedure(member, dispatcher); diff --git a/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java b/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java index fe555ad..36e0855 100644 --- a/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java +++ b/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java @@ -127,7 +127,7 @@ public class TestZKProcedure { // start running the controller ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs( coordZkw, opDescription, COORDINATOR_NODE_NAME); - ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY); + ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) { @Override public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, @@ -145,7 +145,7 @@ public class TestZKProcedure { for (String member : members) { ZooKeeperWatcher watcher = newZooKeeperWatcher(); ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription); - ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member); + ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory); procMembers.add(new Pair(procMember, comms)); comms.start(member, procMember); @@ -209,7 +209,7 @@ public class TestZKProcedure { ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher(); ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs( coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME); - ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY); + ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool)); // start a member for each node @@ -219,7 +219,7 @@ public class TestZKProcedure { for (String member : expected) { ZooKeeperWatcher watcher = newZooKeeperWatcher(); ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription); - ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member); + ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory); members.add(new Pair(mem, controller)); controller.start(member, mem);