From 34995c22f54e87b9d6ba66541cc979580a2baeac Mon Sep 17 00:00:00 2001 From: Jonathan Lawlor Date: Wed, 14 Jan 2015 11:04:25 -0800 Subject: [PATCH] HBASE 6778: Deprecate Chore; its a thread per task when we should have one thread to do all tasks --- .../hadoop/hbase/client/ConnectionManager.java | 25 +- .../java/org/apache/hadoop/hbase/AuthUtil.java | 12 +- .../java/org/apache/hadoop/hbase/ChoreService.java | 364 +++++++++ .../org/apache/hadoop/hbase/ScheduledChore.java | 331 ++++++++ .../org/apache/hadoop/hbase/TestChoreService.java | 905 +++++++++++++++++++++ .../org/apache/hadoop/hbase/HealthCheckChore.java | 4 +- .../apache/hadoop/hbase/master/CatalogJanitor.java | 9 +- .../hbase/master/ClusterStatusPublisher.java | 37 +- .../org/apache/hadoop/hbase/master/HMaster.java | 27 +- .../apache/hadoop/hbase/master/MasterServices.java | 8 +- .../hadoop/hbase/master/SplitLogManager.java | 23 +- .../hbase/master/balancer/BalancerChore.java | 13 +- .../hbase/master/balancer/ClusterStatusChore.java | 13 +- .../hadoop/hbase/master/cleaner/CleanerChore.java | 6 +- .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 9 +- .../hadoop/hbase/regionserver/HRegionServer.java | 113 ++- .../hbase/regionserver/HeapMemoryManager.java | 25 +- .../hbase/regionserver/RegionServerServices.java | 12 +- .../hbase/regionserver/ServerNonceManager.java | 15 +- .../regionserver/StorefileRefresherChore.java | 8 +- .../java/org/apache/hadoop/hbase/tool/Canary.java | 11 +- .../apache/hadoop/hbase/util/ConnectionCache.java | 12 +- .../hadoop/hbase/MockRegionServerServices.java | 7 +- .../hadoop/hbase/backup/TestHFileArchiving.java | 13 +- .../example/TestZooKeeperTableArchiveClient.java | 7 +- .../hadoop/hbase/master/MockRegionServer.java | 8 +- .../hadoop/hbase/master/TestCatalogJanitor.java | 15 +- .../hadoop/hbase/master/TestTableLockManager.java | 16 +- .../hbase/master/cleaner/TestHFileCleaner.java | 1 - .../hbase/master/cleaner/TestHFileLinkCleaner.java | 9 +- .../regionserver/TestEndToEndSplitTransaction.java | 11 +- .../hbase/regionserver/TestHeapMemoryManager.java | 19 +- .../hbase/regionserver/TestServerNonceManager.java | 13 +- 33 files changed, 1874 insertions(+), 227 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 358ef3e..4edc6f0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionTooBusyException; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; @@ -575,8 +576,8 @@ final class ConnectionManager { private final Object masterAndZKLock = new Object(); private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE; - private final DelayedClosing delayedClosing = - DelayedClosing.createAndStart(this); + private final ChoreService choreService = new ChoreService("ConnectionManager"); + private final DelayedClosing delayedClosing = DelayedClosing.createAndStart(this, choreService); // thread executor shared by all HTableInterface instances created // by this connection @@ -1631,28 +1632,31 @@ final class ConnectionManager { * {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is * managed by the release functions and the variable {@link #keepAlive} */ - private static final class DelayedClosing extends Chore implements Stoppable { + private static final class DelayedClosing extends ScheduledChore implements Stoppable { private HConnectionImplementation hci; Stoppable stoppable; private DelayedClosing( HConnectionImplementation hci, Stoppable stoppable){ - super( - "ZooKeeperWatcher and Master delayed closing for connection "+hci, - 60*1000, // We check every minutes - stoppable); + super("ZooKeeperWatcher and Master delayed closing for connection " + hci, stoppable, + 60 * 1000); + this.hci = hci; this.stoppable = stoppable; } - static DelayedClosing createAndStart(HConnectionImplementation hci){ + static DelayedClosing createAndStart(HConnectionImplementation hci, ChoreService service) { Stoppable stoppable = new Stoppable() { private volatile boolean isStopped = false; @Override public void stop(String why) { isStopped = true;} @Override public boolean isStopped() {return isStopped;} }; - return new DelayedClosing(hci, stoppable); + DelayedClosing delayedClosing = new DelayedClosing(hci, stoppable); + // TODO: uncomment below if you want to actually start the chore. For some reason, the chore + // was being created but never started + // service.scheduleChore(delayedClosing); + return delayedClosing; } protected void closeMasterProtocol(MasterServiceState protocolState) { @@ -2386,6 +2390,7 @@ final class ConnectionManager { return; } delayedClosing.stop("Closing connection"); + choreService.shutdown(); closeMaster(); shutdownBatchPool(); this.closed = true; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java index 282b5e3..f597935 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.net.DNS; import org.apache.hadoop.security.UserGroupInformation; @@ -47,12 +46,12 @@ public class AuthUtil { /** * Checks if security is enabled and if so, launches chore for refreshing kerberos ticket. */ - public static void launchAuthChore(Configuration conf) throws IOException { + public static ScheduledChore getAuthChore(Configuration conf) throws IOException { UserProvider userProvider = UserProvider.instantiate(conf); // login the principal (if using secure Hadoop) boolean securityEnabled = userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled(); - if (!securityEnabled) return; + if (!securityEnabled) return null; String host = null; try { host = Strings.domainNamePointerToHostName(DNS.getDefaultHost( @@ -87,7 +86,8 @@ public class AuthUtil { // e.g. 5min tgt * 0.8 = 4min refresh so interval is better be way less than 1min final int CHECK_TGT_INTERVAL = 30 * 1000; // 30sec - Chore refreshCredentials = new Chore("RefreshCredentials", CHECK_TGT_INTERVAL, stoppable) { + ScheduledChore refreshCredentials = + new ScheduledChore("RefreshCredentials", stoppable, CHECK_TGT_INTERVAL) { @Override protected void chore() { try { @@ -97,7 +97,7 @@ public class AuthUtil { } } }; - // Start the chore for refreshing credentials - Threads.setDaemonThreadRunning(refreshCredentials.getThread()); + + return refreshCredentials; } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java new file mode 100644 index 0000000..af7d95d --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java @@ -0,0 +1,364 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run + * periodically while sharing threads. The ChoreService is backed by a + * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the + * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the + * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads. + *

+ * The ChoreService provides the ability to schedule, cancel, and trigger instances of + * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of + * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling + * load and whether or not the scheduled chores are executing on time. As more chores are scheduled, + * there may be a need to increase the number of threads if it is noticed that chores are no longer + * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is + * made to reduce the number of running threads to see if chores can still meet their start times + * with a smaller thread pool. + *

+ * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}. + * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly. + */ +@InterfaceAudience.Private +public class ChoreService implements ChoreServicer { + private final Log LOG = LogFactory.getLog(this.getClass()); + + /** The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor */ + public final static int MIN_CORE_POOL_SIZE = 1; + + /** + * This thread pool is used to schedule all of the Chores + */ + private final ScheduledThreadPoolExecutor scheduler; + + /** + * Maps chores to their futures. Futures are used to control a chore's schedule + */ + private final HashMap> scheduledChores; + + /** + * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the + * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to + * increase the core pool size by 1 (otherwise a single long running chore whose execution is + * longer than its period would be able to spawn too many threads). + */ + private final HashMap choresMissingStartTime; + + /** + * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the + * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is + * running on. The prefix is useful because it allows us to monitor how the thread pool of a + * particular service changes over time VIA thread dumps. + */ + private final String coreThreadPoolPrefix; + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + */ + public ChoreService(final String coreThreadPoolPrefix) { + this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE); + } + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor to + * during initialization. The default size is 1, but specifying a larger size may be + * beneficial if you know that 1 thread will not be enough. + */ + public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) { + this.coreThreadPoolPrefix = coreThreadPoolPrefix; + if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE; + scheduler = + new ScheduledThreadPoolExecutor(corePoolSize, new ChoreServiceThreadFactory(coreThreadPoolPrefix)); + scheduler.setRemoveOnCancelPolicy(true); + scheduledChores = new HashMap>(); + choresMissingStartTime = new HashMap(); + } + + /** + * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads + * spawned by this service + */ + public static ChoreService getInstance(final String coreThreadPoolPrefix) { + return new ChoreService(coreThreadPoolPrefix); + } + + /** + * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService + * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled + * with a single ChoreService instance). + * @return true when the chore was successfully scheduled. false when the scheduling failed + * (typically occurs when a chore is scheduled during shutdown of service) + */ + public synchronized boolean scheduleChore(ScheduledChore chore) { + if (chore == null) return false; + + try { + ScheduledFuture future = + scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(), + chore.getTimeUnit()); + chore.setChoreServicer(this); + scheduledChores.put(chore, future); + return true; + } catch (Exception exception) { + LOG.info("Could not successfully schedule chore: " + chore.getName()); + return false; + } + } + + /** + * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService + * yet then this call is equivalent to a call to scheduleChore. + */ + private synchronized void rescheduleChore(ScheduledChore chore) { + if (chore == null) return; + + if (scheduledChores.containsKey(chore)) { + ScheduledFuture future = scheduledChores.get(chore); + future.cancel(false); + } + scheduleChore(chore); + } + + @Override + public synchronized void cancelChore(ScheduledChore chore) { + cancelChore(chore, false); + } + + @Override + public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) { + if (chore != null && scheduledChores.containsKey(chore)) { + ScheduledFuture future = scheduledChores.get(chore); + future.cancel(mayInterruptIfRunning); + scheduledChores.remove(chore); + + // Removing a chore that was missing its start time means it may be possible + // to reduce the number of threads + if (choresMissingStartTime.containsKey(chore)) { + choresMissingStartTime.remove(chore); + requestCorePoolDecrease(); + } + } + } + + @Override + public synchronized boolean isChoreScheduled(ScheduledChore chore) { + return chore != null && scheduledChores.containsKey(chore) + && !scheduledChores.get(chore).isDone(); + } + + @Override + public synchronized boolean triggerNow(ScheduledChore chore) { + if (chore == null) { + return false; + } else { + rescheduleChore(chore); + return true; + } + } + + /** + * @return number of chores that this service currently has scheduled + */ + int getNumberOfScheduledChores() { + return scheduledChores.size(); + } + + /** + * @return number of chores that this service currently has scheduled that are missing their + * scheduled start time + */ + int getNumberOfChoresMissingStartTime() { + return choresMissingStartTime.size(); + } + + /** + * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor + */ + int getCorePoolSize() { + return scheduler.getCorePoolSize(); + } + + /** + * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are + * daemon threads, and thus, don't prevent the JVM from shutting down + */ + static class ChoreServiceThreadFactory implements ThreadFactory { + private final String threadPrefix; + private final static String THREAD_NAME_SUFFIX = "_ChoreServiceThread_"; + private AtomicInteger threadNumber = new AtomicInteger(1); + + public ChoreServiceThreadFactory(final String threadPrefix) { + this.threadPrefix = threadPrefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = + new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement()); + thread.setDaemon(true); + return thread; + } + } + + /** + * Represents a request to increase the number of core pool threads. Typically a request + * originates from the fact that the current core pool size is not sufficient to service all of + * the currently running Chores + * @return true when the request to increase the core pool size succeeds + */ + private synchronized boolean requestCorePoolIncrease() { + // There is no point in creating more threads than scheduledChores.size since scheduled runs + // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced + // amongst occurrences of the same chore). + if (scheduler.getCorePoolSize() < scheduledChores.size()) { + scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1); + printChoreServiceDetails("requestCorePoolIncrease"); + return true; + } + return false; + } + + /** + * Represents a request to decrease the number of core pool threads. Typically a request + * originates from the fact that the current core pool size is more than sufficient to service the + * running Chores. + */ + private synchronized void requestCorePoolDecrease() { + if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) { + scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1); + printChoreServiceDetails("requestCorePoolDecrease"); + } + } + + @Override + public synchronized void onChoreMissedStartTime(ScheduledChore chore) { + if (chore == null || !scheduledChores.containsKey(chore)) return; + + // If the chore has not caused an increase in the size of the core thread pool then request an + // increase. This allows each chore missing its start time to increase the core pool size by + // at most 1. + if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) { + choresMissingStartTime.put(chore, requestCorePoolIncrease()); + } + + // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If + // the chore is NOT rescheduled, future executions of this chore will be delayed more and + // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates + // idle threads to chores based on how delayed they are. + rescheduleChore(chore); + printChoreDetails("onChoreMissedStartTime", chore); + } + + /** + * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores + * in the middle of execution will be interrupted and shutdown. This service will be unusable + * after this method has been called (i.e. future scheduling attempts will fail). + */ + public void shutdown() { + List ongoing = scheduler.shutdownNow(); + LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + ongoing + + " on shutdown"); + cancelAllChores(true); + scheduledChores.clear(); + choresMissingStartTime.clear(); + } + + /** + * @return true when the service is shutdown and thus cannot be used anymore + */ + public boolean isShutdown() { + return scheduler.isShutdown(); + } + + /** + * @return true when the service is shutdown and all threads have terminated + */ + public boolean isTerminated() { + return scheduler.isTerminated(); + } + + private void cancelAllChores(final boolean mayInterruptIfRunning) { + ArrayList choresToCancel = new ArrayList(); + // Build list of chores to cancel so we can iterate through a set that won't change + // as chores are cancelled. If we tried to cancel each chore while iterating through + // keySet the results would be undefined because the keySet would be changing + for (ScheduledChore chore : scheduledChores.keySet()) { + choresToCancel.add(chore); + } + for (ScheduledChore chore : choresToCancel) { + chore.cancel(mayInterruptIfRunning); + } + choresToCancel.clear(); + } + + /** + * Prints a summary of important details about the chore. Used for debugging purposes + */ + private void printChoreDetails(final String header, ScheduledChore chore) { + LinkedHashMap output = new LinkedHashMap(); + output.put(header, ""); + output.put("Chore name: ", chore.getName()); + output.put("Chore period: ", Integer.toString(chore.getPeriod())); + output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns())); + + for (Entry entry : output.entrySet()) { + LOG.trace(entry.getKey() + entry.getValue()); + } + + LOG.trace("\n"); + } + + /** + * Prints a summary of important details about the service. Used for debugging purposes + */ + private void printChoreServiceDetails(final String header) { + LinkedHashMap output = new LinkedHashMap(); + output.put(header, ""); + output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize())); + output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores())); + output.put("ChoreService missingStartTimeCount: ", + Integer.toString(getNumberOfChoresMissingStartTime())); + + for (Entry entry : output.entrySet()) { + LOG.trace(entry.getKey() + entry.getValue()); + } + + LOG.trace("\n"); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java new file mode 100644 index 0000000..3f17f21 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java @@ -0,0 +1,331 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +/** + * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once + * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The + * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for + * access to the threads in the core thread pool. If an unhandled exception occurs, the chore + * cancellation is logged. Implementers should consider whether or not the Chore will be able to + * execute within the defined period. It is bad practice to define a ScheduledChore whose execution + * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s + * thread pool. + *

+ * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as + * an entry being added to a queue, etc. + */ +@InterfaceAudience.Private +public abstract class ScheduledChore implements Runnable { + private final Log LOG = LogFactory.getLog(this.getClass()); + + private final String name; + + /** + * Default values for scheduling parameters should they be excluded during construction + */ + private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + private final static long DEFAULT_INITIAL_DELAY = 0; + + /** + * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically + */ + private final int period; + private final TimeUnit timeUnit; + private final long initialDelay; + + /** + * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is + * not scheduled. + */ + private ChoreServicer choreServicer; + + /** + * Variables that encapsulate the meaningful state information + */ + private long timeOfLastRun = -1; + private long timeOfThisRun = -1; + private boolean initialChoreComplete = false; + + /** + * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been + * stopped, it will cancel itself. This is particularly useful in the case where a single stopper + * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)} + * command can cause many chores to stop together. + */ + private final Stoppable stopper; + + /** + * + */ + interface ChoreServicer { + /** + * Cancel any ongoing schedules that this chore has with the implementer of this interface. + */ + public void cancelChore(ScheduledChore chore); + public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning); + + /** + * @return true when the chore is scheduled with the implementer of this interface + */ + public boolean isChoreScheduled(ScheduledChore chore); + + /** + * This method tries to execute the chore immediately. If the chore is executing at the time of + * this call, the chore will begin another execution as soon as the current execution finishes + *

+ * If the chore is not scheduled with a ChoreService, this call will fail. + * @return false when the chore could not be triggered immediately + */ + public boolean triggerNow(ScheduledChore chore); + + /** + * A callback that tells the implementer of this interface that one of the scheduled chores is + * missing its start time. The implication of a chore missing its start time is that the + * service's current means of scheduling may not be sufficient to handle the number of ongoing + * chores (the other explanation is that the chore's execution time is greater than its + * scheduled period). The service should try to increase its concurrency when this callback is + * received. + * @param chore The chore that missed its start time + */ + public void onChoreMissedStartTime(ScheduledChore chore); + } + + /** + * This constructor is for test only. It allows us to create an object and to call chore() on it. + */ + protected ScheduledChore() { + this.name = null; + this.stopper = null; + this.period = 0; + this.initialDelay = DEFAULT_INITIAL_DELAY; + this.timeUnit = DEFAULT_TIME_UNIT; + } + + /** + * @param name Name assigned to Chore. Useful for identification amongst chores of the same type + * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup + * @param period Period with which this Chore repeats execution when scheduled. + */ + public ScheduledChore(final String name, Stoppable stopper, final int period) { + this(name, stopper, period, DEFAULT_INITIAL_DELAY); + } + + /** + * @param name Name assigned to Chore. Useful for identification amongst chores of the same type + * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup + * @param period Period with which this Chore repeats execution when scheduled. + * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A + * value of 0 means the chore will begin to execute immediately. Negative delays are + * invalid and will be corrected to a value of 0. + */ + public ScheduledChore(final String name, Stoppable stopper, final int period, + final long initialDelay) { + this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT); + } + + /** + * @param name Name assigned to Chore. Useful for identification amongst chores of the same type + * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup + * @param period Period with which this Chore repeats execution when scheduled. + * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A + * value of 0 means the chore will begin to execute immediately. Negative delays are + * invalid and will be corrected to a value of 0. + * @param unit The unit that is used to measure period and initialDelay + */ + public ScheduledChore(final String name, Stoppable stopper, final int period, + final long initialDelay, final TimeUnit unit) { + this.name = name; + this.stopper = stopper; + this.period = period; + this.initialDelay = initialDelay < 0 ? 0 : initialDelay; + this.timeUnit = unit; + } + + void resetState() { + timeOfLastRun = -1; + timeOfThisRun = -1; + initialChoreComplete = false; + } + + /** + * @see java.lang.Thread#run() + */ + @Override + public synchronized void run() { + timeOfLastRun = timeOfThisRun; + timeOfThisRun = System.currentTimeMillis(); + if (missedStartTime() && choreServicer != null) { + choreServicer.onChoreMissedStartTime(this); + LOG.info("Chore: " + getName() + " missed its start time"); + } else if (stopper.isStopped()) { + cancel(); + cleanup(); + LOG.info("Chore: " + getName() + " was stopped"); + } else { + try { + if (!initialChoreComplete) { + initialChoreComplete = initialChore(); + } else { + chore(); + } + } catch (Throwable t) { + LOG.fatal(getName() + " error", t); + cancel(); + cleanup(); + } + } + } + + /** + * @return How long has it been since this chore last run. Useful for checking if the chore has + * missed its scheduled start time by too large of a margin + */ + long getTimeBetweenRuns() { + return timeOfThisRun - timeOfLastRun; + } + + /** + * @return true when the time between runs exceeds the acceptable threshold + */ + private boolean missedStartTime() { + return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun) + && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns(); + } + + private double getMaximumAllowedTimeBetweenRuns() { + // Threshold used to determine if the Chore's current run started too late + return 1.5 * period; + } + + private boolean isValidTime(final long time) { + return time > 0 && time <= System.currentTimeMillis(); + } + + /** + * @return false when the Chore is not currently scheduled with a ChoreService + */ + public boolean triggerNow() { + if (choreServicer != null) { + return choreServicer.triggerNow(this); + } else { + return false; + } + } + + void setChoreServicer(ChoreServicer service) { + // Chores should only ever be scheduled with a single ChoreService. If the choreServicer + // is changing, cancel any existing schedules of this chore. + if (choreServicer != null && choreServicer != service) { + choreServicer.cancelChore(this, false); + } + choreServicer = service; + timeOfThisRun = System.currentTimeMillis(); + } + + public void cancel() { + cancel(false); + } + + public void cancel(boolean mayInterruptIfRunning) { + if (choreServicer != null) choreServicer.cancelChore(this, mayInterruptIfRunning); + + choreServicer = null; + } + + public String getName() { + return name; + } + + public Stoppable getStopper() { + return stopper; + } + + public int getPeriod() { + return period; + } + + public long getInitialDelay() { + return initialDelay; + } + + public final TimeUnit getTimeUnit() { + return timeUnit; + } + + public boolean isInitialChoreComplete() { + return initialChoreComplete; + } + + @VisibleForTesting + ChoreServicer getChoreServicer() { + return choreServicer; + } + + @VisibleForTesting + long getTimeOfLastRun() { + return timeOfLastRun; + } + + @VisibleForTesting + long getTimeOfThisRun() { + return timeOfThisRun; + } + + /** + * @return true when this Chore is scheduled with a ChoreService + */ + public boolean isScheduled() { + return choreServicer != null && choreServicer.isChoreScheduled(this); + } + + @VisibleForTesting + public void choreForTesting() { + chore(); + } + + /** + * The task to execute on each scheduled execution of the Chore + */ + protected abstract void chore(); + + /** + * Override to run a task before we start looping. + * @return true if initial chore was successful + */ + protected boolean initialChore() { + // Default does nothing + return true; + } + + /** + * Override to run cleanup tasks when the Chore encounters an error and must stop running + */ + protected void cleanup() { + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java new file mode 100644 index 0000000..4697277 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java @@ -0,0 +1,905 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore; +import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestChoreService { + private final Log LOG = LogFactory.getLog(this.getClass()); + private final String TEST_SERVER_NAME = "testServerName"; + + /** + * Tests for the Chore implementation (the one we want to replace with ScheduledChore) + */ + private static class TestHelper { + public static Chore createChore(final String name, int period_millis) { + Stoppable stopper = new Stoppable() { + boolean stopped = false; + + @Override + public void stop(String why) { + stopped = true; + + } + + @Override + public boolean isStopped() { + return stopped; + } + }; + + Chore chore = new Chore(name, 1000, stopper) { + @Override + protected void chore() { + // DO NOTHING + } + }; + + return chore; + } + } + + @Test + public void testExistingChoreImplementationStartStop() throws InterruptedException { + final int initialThreadCount = Thread.activeCount(); + final int period = 1000; + final int sleepTime = 10; + + Chore chore1 = TestHelper.createChore("chore1", period); + Chore chore2 = TestHelper.createChore("chore2", period); + Chore chore3 = TestHelper.createChore("chore3", period); + assertEquals("Thread count mismatch", initialThreadCount, Thread.activeCount()); + + // The current chore implementation creates a thread for each chore to run in separately + // Thus, after starting 3 chores we expect the thread count to go up by 3 + chore1.start(); + chore2.start(); + chore3.start(); + Thread.sleep(sleepTime); + + assertEquals("Thread count mismatch", initialThreadCount + 3, Thread.activeCount()); + + chore1.getStopper().stop(""); + chore2.getStopper().stop(""); + chore3.getStopper().stop(""); + Thread.sleep(period); + + assertEquals("Thread count mismatch", initialThreadCount, Thread.activeCount()); + } + + /** + * Tests for ScheduledChore and ChoreService to ensure proper functionality is enforced. We want + * to make sure that ScheduledChores can do everything Chores can do but with less threads. + */ + /** + * A few ScheduledChore samples that are useful for testing with ChoreService + */ + public static class ScheduledChoreSamples { + /** + * Straight forward stopper implementation that is used by default when one is not provided + */ + public static class SampleStopper implements Stoppable { + private boolean stopped = false; + + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + } + + /** + * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic + * executions + */ + public static class SlowChore extends ScheduledChore { + public SlowChore(String name, int period) { + this(name, new SampleStopper(), period); + } + + public SlowChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected boolean initialChore() { + try { + Thread.sleep(getPeriod() * 2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return true; + } + + @Override + protected void chore() { + try { + Thread.sleep(getPeriod() * 2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + /** + * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests + */ + public static class DoNothingChore extends ScheduledChore { + public DoNothingChore(String name, int period) { + super(name, new SampleStopper(), period); + } + + public DoNothingChore(String name, Stoppable stopper, int period) { + super(name, stopper, period); + } + + @Override + protected void chore() { + // DO NOTHING + } + + } + + public static class SleepingChore extends ScheduledChore { + private int sleepTime; + + public SleepingChore(String name, int chorePeriod, int sleepTime) { + this(name, new SampleStopper(), chorePeriod, sleepTime); + } + + public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) { + super(name, stopper, period); + this.sleepTime = sleepTime; + } + + @Override + protected boolean initialChore() { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return true; + } + + @Override + protected void chore() { + try { + Thread.sleep(sleepTime); + } catch (Exception e) { + System.err.println(e.getStackTrace()); + } + } + } + + public static class CountingChore extends ScheduledChore { + private int countOfChoreCalls; + private boolean outputOnTicks = false; + + public CountingChore(String name, int period) { + this(name, new SampleStopper(), period); + } + + public CountingChore(String name, Stoppable stopper, int period) { + this(name, stopper, period, false); + } + + public CountingChore(String name, Stoppable stopper, int period, final boolean outputOnTicks) { + super(name, stopper, period); + this.countOfChoreCalls = 0; + this.outputOnTicks = outputOnTicks; + } + + @Override + protected boolean initialChore() { + countOfChoreCalls++; + if (outputOnTicks) outputTickCount(); + return true; + } + + @Override + protected void chore() { + countOfChoreCalls++; + if (outputOnTicks) outputTickCount(); + } + + private void outputTickCount() { + System.out.println("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls); + } + + public int getCountOfChoreCalls() { + return countOfChoreCalls; + } + + public boolean isOutputtingOnTicks() { + return outputOnTicks; + } + + public void setOutputOnTicks(boolean o) { + outputOnTicks = o; + } + } + + /** + * A Chore that will try to execute the initial chore a few times before succeeding. Once the + * initial chore is complete the chore cancels itself + */ + public static class FailInitialChore extends ScheduledChore { + private int numberOfFailures; + private int failureThreshold; + + /** + * @param failThreshold Number of times the Chore fails when trying to execute initialChore + * before succeeding. + */ + public FailInitialChore(String name, int period, int failThreshold) { + this(name, new SampleStopper(), period, failThreshold); + } + + public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) { + super(name, stopper, period); + numberOfFailures = 0; + failureThreshold = failThreshold; + } + + @Override + protected boolean initialChore() { + if (numberOfFailures < failureThreshold) { + numberOfFailures++; + return false; + } else { + return true; + } + } + + @Override + protected void chore() { + assertTrue(numberOfFailures == failureThreshold); + cancel(false); + } + + } + } + + @Test + public void testInitialChorePrecedence() throws InterruptedException { + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + + final int period = 100; + final int failureThreshold = 5; + ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold); + service.scheduleChore(chore); + + int loopCount = 0; + boolean brokeOutOfLoop = false; + + while (!chore.isInitialChoreComplete() && chore.isScheduled()) { + Thread.sleep(failureThreshold * period); + loopCount++; + if (loopCount > 3) { + brokeOutOfLoop = true; + break; + } + } + + assertFalse(brokeOutOfLoop); + } + + @Test + public void testCancelChore() throws InterruptedException { + final int period = 100; + ScheduledChore chore1 = new DoNothingChore("chore1", period); + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + + service.scheduleChore(chore1); + assertTrue(chore1.isScheduled()); + + chore1.cancel(true); + assertFalse(chore1.isScheduled()); + assertTrue(service.getNumberOfScheduledChores() == 0); + } + + @Test + public void testScheduledChoreConstruction() { + final String NAME = "chore"; + final int PERIOD = 100; + final long VALID_DELAY = 0; + final long INVALID_DELAY = -100; + final TimeUnit UNIT = TimeUnit.NANOSECONDS; + + ScheduledChore chore1 = + new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) { + @Override + protected void chore() { + // DO NOTHING + } + }; + + assertEquals("Name construction failed", chore1.getName(), NAME); + assertEquals("Period construction failed", chore1.getPeriod(), PERIOD); + assertEquals("Initial Delay construction failed", chore1.getInitialDelay(), VALID_DELAY); + assertEquals("TimeUnit construction failed", chore1.getTimeUnit(), UNIT); + + ScheduledChore invalidDelayChore = + new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) { + @Override + protected void chore() { + // DO NOTHING + } + }; + + assertEquals("Initial Delay should be set to 0 when invalid", 0, + invalidDelayChore.getInitialDelay()); + } + + @Test + public void testChoreServiceConstruction() { + final int corePoolSize = 10; + final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE; + + ChoreService customInit = new ChoreService(TEST_SERVER_NAME, corePoolSize); + assertEquals(corePoolSize, customInit.getCorePoolSize()); + + ChoreService defaultInit = new ChoreService(TEST_SERVER_NAME); + assertEquals(defaultCorePoolSize, defaultInit.getCorePoolSize()); + + ChoreService invalidInit = new ChoreService(TEST_SERVER_NAME, -10); + assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize()); + } + + @Test + public void testFrequencyOfChores() throws InterruptedException { + final int period = 100; + // Small delta that acts as time buffer (allowing chores to complete if running slowly) + final int delta = 5; + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + CountingChore chore = new CountingChore("countingChore", period); + service.scheduleChore(chore); + + Thread.sleep(10 * period + delta); + assertTrue(chore.getCountOfChoreCalls() == 11); + + Thread.sleep(10 * period); + assertTrue(chore.getCountOfChoreCalls() == 21); + } + + @Test + public void testForceTrigger() throws InterruptedException { + final int period = 100; + final int delta = 5; + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + CountingChore chore = new CountingChore("countingChore", period); + service.scheduleChore(chore); + Thread.sleep(10 * period + delta); + + assertTrue(chore.getCountOfChoreCalls() == 11); + + // Force five runs of the chore to occur, sleeping between triggers to ensure the + // chore has time to run + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + chore.triggerNow(); + Thread.sleep(delta); + + assertTrue(chore.getCountOfChoreCalls() == 16); + + Thread.sleep(10 * period + delta); + + assertTrue(chore.getCountOfChoreCalls() == 26); + } + + @Test + public void testCorePoolIncrease() throws InterruptedException { + final int initialCorePoolSize = 3; + ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize); + assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize, + service.getCorePoolSize()); + + final int slowChorePeriod = 100; + SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod); + + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", 3, service.getCorePoolSize()); + + SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod); + service.scheduleChore(slowChore4); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 4, + service.getCorePoolSize()); + + SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod); + service.scheduleChore(slowChore5); + + Thread.sleep(slowChorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", 5, + service.getCorePoolSize()); + } + + @Test + public void testCorePoolDecrease() throws InterruptedException { + final int initialCorePoolSize = 3; + ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize); + final int chorePeriod = 10; + + // Slow chores always miss their start time and thus the core pool size should be at least as + // large as the number of running slow chores + SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod); + SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod); + SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod); + + service.scheduleChore(slowChore1); + service.scheduleChore(slowChore2); + service.scheduleChore(slowChore3); + + Thread.sleep(chorePeriod * 10); + assertEquals("Should not create more pools than scheduled chores", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + + SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod); + service.scheduleChore(slowChore4); + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + + SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod); + service.scheduleChore(slowChore5); + Thread.sleep(chorePeriod * 10); + assertEquals("Chores are missing their start time. Should expand core pool size", + service.getNumberOfScheduledChores(), service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 5); + + slowChore5.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 4); + + slowChore4.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 3); + + slowChore3.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + + slowChore2.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 1); + + slowChore1.cancel(); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 0); + + slowChore1.resetState(); + service.scheduleChore(slowChore1); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 1); + + slowChore2.resetState(); + service.scheduleChore(slowChore2); + Thread.sleep(chorePeriod * 10); + assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()), + service.getCorePoolSize()); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + + DoNothingChore fastChore1 = new DoNothingChore("fastChore1", chorePeriod); + service.scheduleChore(fastChore1); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should increase", 3, service.getCorePoolSize()); + + DoNothingChore fastChore2 = new DoNothingChore("fastChore2", chorePeriod); + service.scheduleChore(fastChore2); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should increase", 3, service.getCorePoolSize()); + + DoNothingChore fastChore3 = new DoNothingChore("fastChore3", chorePeriod); + service.scheduleChore(fastChore3); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should not change", 3, service.getCorePoolSize()); + + DoNothingChore fastChore4 = new DoNothingChore("fastChore4", chorePeriod); + service.scheduleChore(fastChore4); + Thread.sleep(chorePeriod * 10); + assertEquals(service.getNumberOfChoresMissingStartTime(), 2); + assertEquals("Should not change", 3, service.getCorePoolSize()); + } + + @Test + public void testNumberOfRunningChores() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 100; + final int sleepTime = 5; + + DoNothingChore dn1 = new DoNothingChore("dn1", period); + DoNothingChore dn2 = new DoNothingChore("dn2", period); + DoNothingChore dn3 = new DoNothingChore("dn3", period); + DoNothingChore dn4 = new DoNothingChore("dn4", period); + DoNothingChore dn5 = new DoNothingChore("dn5", period); + + service.scheduleChore(dn1); + service.scheduleChore(dn2); + service.scheduleChore(dn3); + service.scheduleChore(dn4); + service.scheduleChore(dn5); + + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores()); + + dn1.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores()); + + dn2.cancel(); + dn3.cancel(); + dn4.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores()); + + dn5.cancel(); + Thread.sleep(sleepTime); + assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores()); + } + + @Test + public void testNumberOfChoresMissingStartTime() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 100; + final int sleepTime = 5 * period; + + // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores + // ALWAYS miss their start time since their execution takes longer than their period + SlowChore sc1 = new SlowChore("sc1", period); + SlowChore sc2 = new SlowChore("sc2", period); + SlowChore sc3 = new SlowChore("sc3", period); + SlowChore sc4 = new SlowChore("sc4", period); + SlowChore sc5 = new SlowChore("sc5", period); + + service.scheduleChore(sc1); + service.scheduleChore(sc2); + service.scheduleChore(sc3); + service.scheduleChore(sc4); + service.scheduleChore(sc5); + + Thread.sleep(sleepTime); + assertEquals(5, service.getNumberOfChoresMissingStartTime()); + + sc1.cancel(); + Thread.sleep(sleepTime); + assertEquals(4, service.getNumberOfChoresMissingStartTime()); + + sc2.cancel(); + sc3.cancel(); + sc4.cancel(); + Thread.sleep(sleepTime); + assertEquals(1, service.getNumberOfChoresMissingStartTime()); + + sc5.cancel(); + Thread.sleep(sleepTime); + assertEquals(0, service.getNumberOfChoresMissingStartTime()); + } + + /** + * ChoreServices should never have a core pool size that exceeds the number of chores that have + * been scheduled with the service. For example, if 4 ScheduledChores are scheduled with a + * ChoreService, the number of threads in the ChoreService's core pool should never exceed 4 + */ + @Test + public void testMaximumChoreServiceThreads() throws InterruptedException { + ChoreService service = new ChoreService(TEST_SERVER_NAME); + + final int period = 10; + final int sleepTime = 5 * period; + + // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores + // ALWAYS miss their start time since their execution takes longer than their period. + // Chores that miss their start time will trigger the onChoreMissedStartTime callback + // in the ChoreService. This callback will try to increase the number of core pool + // threads. + SlowChore sc1 = new SlowChore("sc1", period); + SlowChore sc2 = new SlowChore("sc2", period); + SlowChore sc3 = new SlowChore("sc3", period); + SlowChore sc4 = new SlowChore("sc4", period); + SlowChore sc5 = new SlowChore("sc5", period); + + service.scheduleChore(sc1); + service.scheduleChore(sc2); + service.scheduleChore(sc3); + service.scheduleChore(sc4); + service.scheduleChore(sc5); + + Thread.sleep(sleepTime); + assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); + + SlowChore sc6 = new SlowChore("sc6", period); + SlowChore sc7 = new SlowChore("sc7", period); + SlowChore sc8 = new SlowChore("sc8", period); + SlowChore sc9 = new SlowChore("sc9", period); + SlowChore sc10 = new SlowChore("sc10", period); + + service.scheduleChore(sc6); + service.scheduleChore(sc7); + service.scheduleChore(sc8); + service.scheduleChore(sc9); + service.scheduleChore(sc10); + + Thread.sleep(sleepTime); + assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores()); + } + + @Test + public void testScheduledChoreReset() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore chore = new DoNothingChore("sampleChore", period); + + // TRUE + assertTrue(!chore.isInitialChoreComplete()); + assertTrue(chore.getTimeOfLastRun() == -1); + assertTrue(chore.getTimeOfThisRun() == -1); + + service.scheduleChore(chore); + Thread.sleep(5 * period); + + // FALSE + assertFalse(!chore.isInitialChoreComplete()); + assertFalse(chore.getTimeOfLastRun() == -1); + assertFalse(chore.getTimeOfThisRun() == -1); + + chore.resetState(); + + // TRUE + assertTrue(!chore.isInitialChoreComplete()); + assertTrue(chore.getTimeOfLastRun() == -1); + assertTrue(chore.getTimeOfThisRun() == -1); + } + + @Test + public void testChangingChoreServices() throws InterruptedException { + final int period = 100; + final int sleepTime = 10; + ChoreService service1 = new ChoreService(TEST_SERVER_NAME); + ChoreService service2 = new ChoreService(TEST_SERVER_NAME); + ScheduledChore chore = new DoNothingChore("sample", period); + + assertFalse(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertTrue(chore.getChoreServicer() == null); + + service1.scheduleChore(chore); + Thread.sleep(sleepTime); + assertTrue(chore.isScheduled()); + assertTrue(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertFalse(chore.getChoreServicer() == null); + + service2.scheduleChore(chore); + Thread.sleep(sleepTime); + assertTrue(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertTrue(service2.isChoreScheduled(chore)); + assertFalse(chore.getChoreServicer() == null); + + chore.cancel(); + assertFalse(chore.isScheduled()); + assertFalse(service1.isChoreScheduled(chore)); + assertFalse(service2.isChoreScheduled(chore)); + assertTrue(chore.getChoreServicer() == null); + } + + @Test + public void testTriggerNowFailsWhenNotScheduled() throws InterruptedException { + final int period = 100; + // Small sleep time buffer to allow CountingChore to complete + final int sleep = 5; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + CountingChore chore = new CountingChore("dn", period); + + assertFalse(chore.triggerNow()); + assertTrue(chore.getCountOfChoreCalls() == 0); + + service.scheduleChore(chore); + Thread.sleep(sleep); + assertEquals(1, chore.getCountOfChoreCalls()); + Thread.sleep(period); + assertEquals(2, chore.getCountOfChoreCalls()); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertTrue(chore.triggerNow()); + Thread.sleep(sleep); + assertEquals(5, chore.getCountOfChoreCalls()); + } + + @Test + public void testStopperForScheduledChores() throws InterruptedException { + ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME); + Stoppable stopperForGroup1 = new SampleStopper(); + Stoppable stopperForGroup2 = new SampleStopper(); + final int period = 100; + final int delta = 10; + + ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period); + ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period); + ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period); + + ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period); + ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period); + ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period); + + service.scheduleChore(chore1_group1); + service.scheduleChore(chore2_group1); + service.scheduleChore(chore3_group1); + service.scheduleChore(chore1_group2); + service.scheduleChore(chore2_group2); + service.scheduleChore(chore3_group2); + + Thread.sleep(delta); + Thread.sleep(10 * period); + assertTrue(chore1_group1.isScheduled()); + assertTrue(chore2_group1.isScheduled()); + assertTrue(chore3_group1.isScheduled()); + assertTrue(chore1_group2.isScheduled()); + assertTrue(chore2_group2.isScheduled()); + assertTrue(chore3_group2.isScheduled()); + + stopperForGroup1.stop("test stopping group 1"); + Thread.sleep(period); + assertFalse(chore1_group1.isScheduled()); + assertFalse(chore2_group1.isScheduled()); + assertFalse(chore3_group1.isScheduled()); + assertTrue(chore1_group2.isScheduled()); + assertTrue(chore2_group2.isScheduled()); + assertTrue(chore3_group2.isScheduled()); + + stopperForGroup2.stop("test stopping group 2"); + Thread.sleep(period); + assertFalse(chore1_group1.isScheduled()); + assertFalse(chore2_group1.isScheduled()); + assertFalse(chore3_group1.isScheduled()); + assertFalse(chore1_group2.isScheduled()); + assertFalse(chore2_group2.isScheduled()); + assertFalse(chore3_group2.isScheduled()); + } + + @Test + public void testShutdownCancelsScheduledChores() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore successChore1 = new DoNothingChore("sc1", period); + ScheduledChore successChore2 = new DoNothingChore("sc2", period); + ScheduledChore successChore3 = new DoNothingChore("sc3", period); + + assertTrue(service.scheduleChore(successChore1)); + assertTrue(successChore1.isScheduled()); + assertTrue(service.scheduleChore(successChore2)); + assertTrue(successChore2.isScheduled()); + assertTrue(service.scheduleChore(successChore3)); + assertTrue(successChore3.isScheduled()); + + service.shutdown(); + + assertFalse(successChore1.isScheduled()); + assertFalse(successChore2.isScheduled()); + assertFalse(successChore3.isScheduled()); + } + + @Test + public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException { + final int period = 100; + final int sleep = 5 * period; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep); + ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep); + ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep); + + assertTrue(service.scheduleChore(slowChore1)); + assertTrue(service.scheduleChore(slowChore2)); + assertTrue(service.scheduleChore(slowChore3)); + + Thread.sleep(sleep / 2); + service.shutdown(); + + assertFalse(slowChore1.isScheduled()); + assertFalse(slowChore2.isScheduled()); + assertFalse(slowChore3.isScheduled()); + assertTrue(service.isShutdown()); + + Thread.sleep(5); + assertTrue(service.isTerminated()); + } + + @Test + public void testShutdownRejectsNewSchedules() throws InterruptedException { + final int period = 100; + ChoreService service = new ChoreService(TEST_SERVER_NAME); + ScheduledChore successChore1 = new DoNothingChore("sc1", period); + ScheduledChore successChore2 = new DoNothingChore("sc2", period); + ScheduledChore successChore3 = new DoNothingChore("sc3", period); + ScheduledChore failChore1 = new DoNothingChore("fc1", period); + ScheduledChore failChore2 = new DoNothingChore("fc2", period); + ScheduledChore failChore3 = new DoNothingChore("fc3", period); + + assertTrue(service.scheduleChore(successChore1)); + assertTrue(successChore1.isScheduled()); + assertTrue(service.scheduleChore(successChore2)); + assertTrue(successChore2.isScheduled()); + assertTrue(service.scheduleChore(successChore3)); + assertTrue(successChore3.isScheduled()); + + service.shutdown(); + + assertFalse(service.scheduleChore(failChore1)); + assertFalse(failChore1.isScheduled()); + assertFalse(service.scheduleChore(failChore2)); + assertFalse(failChore2.isScheduled()); + assertFalse(service.scheduleChore(failChore3)); + assertFalse(failChore3.isScheduled()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java index 8d65c66..e729ec8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java @@ -28,7 +28,7 @@ import org.apache.hadoop.util.StringUtils; /** * The Class HealthCheckChore for running health checker regularly. */ - public class HealthCheckChore extends Chore { +public class HealthCheckChore extends ScheduledChore { private static Log LOG = LogFactory.getLog(HealthCheckChore.class); private HealthChecker healthChecker; private Configuration config; @@ -38,7 +38,7 @@ import org.apache.hadoop.util.StringUtils; private long startWindow; public HealthCheckChore(int sleepTime, Stoppable stopper, Configuration conf) { - super("HealthChecker", sleepTime, stopper); + super("HealthChecker", stopper, sleepTime); LOG.info("Health Check Chore runs every " + StringUtils.formatTime(sleepTime)); this.config = conf; String healthCheckScript = this.config.get(HConstants.HEALTH_SCRIPT_LOC); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index 9f71b90..9d18c98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -31,12 +31,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.HFileArchiver; @@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Triple; * table on a period looking for unused regions to garbage collect. */ @InterfaceAudience.Private -public class CatalogJanitor extends Chore { +public class CatalogJanitor extends ScheduledChore { private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName()); private final Server server; private final MasterServices services; @@ -66,9 +66,8 @@ public class CatalogJanitor extends Chore { private final Connection connection; CatalogJanitor(final Server server, final MasterServices services) { - super("CatalogJanitor-" + server.getServerName().toShortString(), - server.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000), - server); + super("CatalogJanitor-" + server.getServerName().toShortString(), server, server + .getConfiguration().getInt("hbase.catalogjanitor.interval", 300000)); this.server = server; this.services = services; this.connection = server.getConnection(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java index 6e7024c..e90aae6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java @@ -35,22 +35,7 @@ import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.codec.MessageToMessageEncoder; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import io.netty.util.internal.StringUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ExceptionUtil; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.VersionInfo; import java.io.Closeable; import java.io.IOException; @@ -67,6 +52,22 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.VersionInfo; + /** * Class to publish the cluster status to the client. This allows them to know immediately @@ -75,7 +76,7 @@ import java.util.concurrent.ConcurrentMap; * on the client the different timeouts, as the dead servers will be detected separately. */ @InterfaceAudience.Private -public class ClusterStatusPublisher extends Chore { +public class ClusterStatusPublisher extends ScheduledChore { /** * The implementation class used to publish the status. Default is null (no publish). * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the @@ -115,8 +116,8 @@ public class ClusterStatusPublisher extends Chore { public ClusterStatusPublisher(HMaster master, Configuration conf, Class publisherClass) throws IOException { - super("HBase clusterStatusPublisher for " + master.getName(), - conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master); + super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt( + STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD)); this.master = master; this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 596308f..3a23778 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -375,7 +375,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { " is not set - not publishing status"); } else { clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass); - Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread()); + getChoreService().scheduleChore(clusterStatusPublisherChore); } } activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); @@ -715,11 +715,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // been assigned. status.setStatus("Starting balancer and catalog janitor"); this.clusterStatusChore = new ClusterStatusChore(this, balancer); - Threads.setDaemonThreadRunning(clusterStatusChore.getThread()); + getChoreService().scheduleChore(clusterStatusChore); this.balancerChore = new BalancerChore(this); - Threads.setDaemonThreadRunning(balancerChore.getThread()); + getChoreService().scheduleChore(balancerChore); this.catalogJanitorChore = new CatalogJanitor(this, this); - Threads.setDaemonThreadRunning(catalogJanitorChore.getThread()); + getChoreService().scheduleChore(catalogJanitorChore); status.setStatus("Starting namespace manager"); initNamespace(); @@ -956,16 +956,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server { new LogCleaner(cleanerInterval, this, conf, getMasterFileSystem().getFileSystem(), getMasterFileSystem().getOldLogDir()); - Threads.setDaemonThreadRunning(logCleaner.getThread(), - getServerName().toShortString() + ".oldLogCleaner"); + getChoreService().scheduleChore(logCleaner); //start the hfile archive cleaner thread Path archiveDir = HFileArchiveUtil.getArchivePath(conf); this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem() .getFileSystem(), archiveDir); - Threads.setDaemonThreadRunning(hfileCleaner.getThread(), - getServerName().toShortString() + ".archivedHFileCleaner"); - + getChoreService().scheduleChore(hfileCleaner); serviceStarted = true; if (LOG.isTraceEnabled()) { LOG.trace("Started service threads"); @@ -994,8 +991,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server { LOG.debug("Stopping service threads"); } // Clean up and close up shop - if (this.logCleaner!= null) this.logCleaner.interrupt(); - if (this.hfileCleaner != null) this.hfileCleaner.interrupt(); + if (this.logCleaner != null) this.logCleaner.cancel(true); + if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.serverManager != null) this.serverManager.stop(); @@ -1006,16 +1003,16 @@ public class HMaster extends HRegionServer implements MasterServices, Server { private void stopChores() { if (this.balancerChore != null) { - this.balancerChore.interrupt(); + this.balancerChore.cancel(true); } if (this.clusterStatusChore != null) { - this.clusterStatusChore.interrupt(); + this.clusterStatusChore.cancel(true); } if (this.catalogJanitorChore != null) { - this.catalogJanitorChore.interrupt(); + this.catalogJanitorChore.cancel(true); } if (this.clusterStatusPublisherChore != null){ - clusterStatusPublisherChore.interrupt(); + clusterStatusPublisherChore.cancel(true); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 7733256..cd0ac59 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; @@ -62,6 +63,11 @@ public interface MasterServices extends Server { ExecutorService getExecutorService(); /** + * @return Master's instance of {@link ChoreService} + */ + ChoreService getChoreService(); + + /** * @return Master's instance of {@link TableLockManager} */ TableLockManager getTableLockManager(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index bc798cd..8a7a362 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -39,30 +39,31 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import com.google.common.annotations.VisibleForTesting; @@ -103,6 +104,7 @@ public class SplitLogManager { private final Stoppable stopper; private final Configuration conf; + private final ChoreService choreService; public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min @@ -139,6 +141,7 @@ public class SplitLogManager { this.server = server; this.conf = conf; this.stopper = stopper; + this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_"); if (server.getCoordinatedStateManager() != null) { SplitLogManagerCoordination coordination = ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) @@ -155,8 +158,7 @@ public class SplitLogManager { this.timeoutMonitor = new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper); - Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName - + ".splitLogManagerTimeoutMonitor"); + choreService.scheduleChore(timeoutMonitor); } private FileStatus[] getFileList(List logDirs, PathFilter filter) throws IOException { @@ -529,8 +531,11 @@ public class SplitLogManager { } public void stop() { + if (choreService != null) { + choreService.shutdown(); + } if (timeoutMonitor != null) { - timeoutMonitor.interrupt(); + timeoutMonitor.cancel(true); } } @@ -684,11 +689,11 @@ public class SplitLogManager { /** * Periodically checks all active tasks and resubmits the ones that have timed out */ - private class TimeoutMonitor extends Chore { + private class TimeoutMonitor extends ScheduledChore { private long lastLog = 0; public TimeoutMonitor(final int period, Stoppable stopper) { - super("SplitLogManager Timeout Monitor", period, stopper); + super("SplitLogManager Timeout Monitor", stopper, period); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java index 21fe272..bbbfdf2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerChore.java @@ -18,28 +18,27 @@ package org.apache.hadoop.hbase.master.balancer; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.master.HMaster; -import java.io.IOException; - /** * Chore that will call HMaster.balance{@link org.apache.hadoop.hbase.master.HMaster#balance()} when * needed. */ @InterfaceAudience.Private -public class BalancerChore extends Chore { +public class BalancerChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(BalancerChore.class); private final HMaster master; public BalancerChore(HMaster master) { - super(master.getServerName() + "-BalancerChore", - master.getConfiguration().getInt("hbase.balancer.period", 300000), - master); + super(master.getServerName() + "-BalancerChore", master, master.getConfiguration().getInt( + "hbase.balancer.period", 300000)); this.master = master; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java index 046dfb7..58e5808 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/ClusterStatusChore.java @@ -18,28 +18,27 @@ package org.apache.hadoop.hbase.master.balancer; +import java.io.InterruptedIOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; -import java.io.InterruptedIOException; - /** * Chore that will feed the balancer the cluster status. */ @InterfaceAudience.Private -public class ClusterStatusChore extends Chore { +public class ClusterStatusChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(ClusterStatusChore.class); private final HMaster master; private final LoadBalancer balancer; public ClusterStatusChore(HMaster master, LoadBalancer balancer) { - super(master.getServerName() + "-ClusterStatusChore", - master.getConfiguration().getInt("hbase.balancer.statusPeriod", 60000), - master); + super(master.getServerName() + "-ClusterStatusChore", master, master.getConfiguration().getInt( + "hbase.balancer.statusPeriod", 60000)); this.master = master; this.balancer = balancer; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java index 294131e..05a5a9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.ipc.RemoteException; @@ -41,7 +41,7 @@ import com.google.common.collect.Lists; * Abstract Cleaner that uses a chain of delegates to clean a directory of files * @param Cleaner delegate class that is dynamically loaded from configuration */ -public abstract class CleanerChore extends Chore { +public abstract class CleanerChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName()); @@ -61,7 +61,7 @@ public abstract class CleanerChore extends Chore */ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf, FileSystem fs, Path oldFileDir, String confKey) { - super(name, sleepPeriod, s); + super(name, s, sleepPeriod); this.fs = fs; this.oldFileDir = oldFileDir; this.conf = conf; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 8cd402d..15962d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.annotations.VisibleForTesting; @@ -86,7 +85,7 @@ public class QuotaCache implements Stoppable { Configuration conf = rsServices.getConfiguration(); int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); refreshChore = new QuotaRefresherChore(period, this); - Threads.setDaemonThreadRunning(refreshChore.getThread()); + rsServices.getChoreService().scheduleChore(refreshChore); } @Override @@ -198,11 +197,11 @@ public class QuotaCache implements Stoppable { } // TODO: Remove this once we have the notification bus - private class QuotaRefresherChore extends Chore { + private class QuotaRefresherChore extends ScheduledChore { private long lastUpdate = 0; public QuotaRefresherChore(final int period, final Stoppable stoppable) { - super("QuotaRefresherChore", period, stoppable); + super("QuotaRefresherChore", stoppable, period); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5263a99..df36371 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; import java.lang.reflect.Constructor; import java.net.BindException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -46,7 +47,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.net.InetAddress; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; @@ -58,7 +58,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ClockOutOfSyncException; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HealthCheckChore; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableDescriptors; @@ -76,10 +77,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.conf.ConfigurationManager; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -127,10 +128,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.SpanReceiverHost; @@ -147,6 +145,9 @@ import org.apache.hadoop.hbase.util.JvmPauseMonitor; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -325,15 +326,20 @@ public class HRegionServer extends HasThread implements MetricsRegionServer metricsRegionServer; private SpanReceiverHost spanReceiverHost; + /** + * ChoreService used to schedule tasks that we want to run periodically + */ + private final ChoreService choreService; + /* * Check for compactions requests. */ - Chore compactionChecker; + ScheduledChore compactionChecker; /* * Check for flushes */ - Chore periodicFlusher; + ScheduledChore periodicFlusher; protected volatile WALFactory walFactory; @@ -372,7 +378,7 @@ public class HRegionServer extends HasThread implements private HealthCheckChore healthCheckChore; /** The nonce manager chore. */ - private Chore nonceManagerChore; + private ScheduledChore nonceManagerChore; private Map coprocessorServiceHandlers = Maps.newHashMap(); @@ -550,6 +556,7 @@ public class HRegionServer extends HasThread implements rpcServices.start(); putUpWebUI(); this.walRoller = new LogRoller(this, this); + this.choreService = new ChoreService(getServerName().toString()); } protected void login(UserProvider user, String host) throws IOException { @@ -777,8 +784,8 @@ public class HRegionServer extends HasThread implements movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this); if (this.nonceManager != null) { - // Create the chore that cleans up nonces. - nonceManagerChore = this.nonceManager.createCleanupChore(this); + // Create the scheduled chore that cleans up nonces. + nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); } // Setup the Quota Manager @@ -925,17 +932,10 @@ public class HRegionServer extends HasThread implements if(this.hMemManager != null) this.hMemManager.stop(); if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); - if (this.compactionChecker != null) - this.compactionChecker.interrupt(); - if (this.healthCheckChore != null) { - this.healthCheckChore.interrupt(); - } - if (this.nonceManagerChore != null) { - this.nonceManagerChore.interrupt(); - } - if (this.storefileRefresher != null) { - this.storefileRefresher.interrupt(); - } + if (this.compactionChecker != null) this.compactionChecker.cancel(true); + if (this.healthCheckChore != null) this.healthCheckChore.cancel(true); + if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true); + if (this.storefileRefresher != null) this.storefileRefresher.cancel(true); // Stop the quota manager if (rsQuotaManager != null) { @@ -1296,7 +1296,7 @@ public class HRegionServer extends HasThread implements private void startHeapMemoryManager() { this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this); if (this.hMemManager != null) { - this.hMemManager.start(); + this.hMemManager.start(getChoreService()); } } @@ -1411,7 +1411,7 @@ public class HRegionServer extends HasThread implements /* * Inner class that runs on a long period checking if regions need compaction. */ - private static class CompactionChecker extends Chore { + private static class CompactionChecker extends ScheduledChore { private final HRegionServer instance; private final int majorCompactPriority; private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; @@ -1419,7 +1419,7 @@ public class HRegionServer extends HasThread implements CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { - super("CompactionChecker", sleepTime, h); + super("CompactionChecker", stopper, sleepTime); this.instance = h; LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); @@ -1465,12 +1465,12 @@ public class HRegionServer extends HasThread implements } } - class PeriodicMemstoreFlusher extends Chore { + class PeriodicMemstoreFlusher extends ScheduledChore { final HRegionServer server; final static int RANGE_OF_DELAY = 20000; //millisec final static int MIN_DELAY_TIME = 3000; //millisec public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) { - super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server); + super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval); this.server = server; } @@ -1611,22 +1611,12 @@ public class HRegionServer extends HasThread implements Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); - Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() + - ".compactionChecker", uncaughtExceptionHandler); - Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), getName() + - ".periodicFlusher", uncaughtExceptionHandler); - if (this.healthCheckChore != null) { - Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), getName() + ".healthChecker", - uncaughtExceptionHandler); - } - if (this.nonceManagerChore != null) { - Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), getName() + ".nonceCleaner", - uncaughtExceptionHandler); - } - if (this.storefileRefresher != null) { - Threads.setDaemonThreadRunning(this.storefileRefresher.getThread(), getName() + ".storefileRefresher", - uncaughtExceptionHandler); - } + + if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); + if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); + if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore); + if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore); + if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -1719,8 +1709,8 @@ public class HRegionServer extends HasThread implements // Verify that all threads are alive if (!(leases.isAlive() && cacheFlusher.isAlive() && walRoller.isAlive() - && this.compactionChecker.isAlive() - && this.periodicFlusher.isAlive())) { + && this.compactionChecker.isScheduled() + && this.periodicFlusher.isScheduled())) { stop("One or more threads are no longer alive -- stop"); return false; } @@ -1982,21 +1972,18 @@ public class HRegionServer extends HasThread implements * have already been called. */ protected void stopServiceThreads() { - if (this.nonceManagerChore != null) { - Threads.shutdown(this.nonceManagerChore.getThread()); - } - if (this.compactionChecker != null) { - Threads.shutdown(this.compactionChecker.getThread()); - } - if (this.periodicFlusher != null) { - Threads.shutdown(this.periodicFlusher.getThread()); - } + // clean up the scheduled chores + if (this.choreService != null) choreService.shutdown(); + if (this.nonceManagerChore != null) nonceManagerChore.cancel(true); + if (this.compactionChecker != null) compactionChecker.cancel(true); + if (this.periodicFlusher != null) periodicFlusher.cancel(true); + if (this.healthCheckChore != null) healthCheckChore.cancel(true); + if (this.storefileRefresher != null) storefileRefresher.cancel(true); + if (this.cacheFlusher != null) { this.cacheFlusher.join(); } - if (this.healthCheckChore != null) { - Threads.shutdown(this.healthCheckChore.getThread()); - } + if (this.spanReceiverHost != null) { this.spanReceiverHost.closeReceivers(); } @@ -2022,9 +2009,6 @@ public class HRegionServer extends HasThread implements this.replicationSinkHandler.stopReplicationService(); } } - if (this.storefileRefresher != null) { - Threads.shutdown(this.storefileRefresher.getThread()); - } } /** @@ -2424,6 +2408,11 @@ public class HRegionServer extends HasThread implements } @Override + public ChoreService getChoreService() { + return choreService; + } + + @Override public RegionServerQuotaManager getRegionServerQuotaManager() { return rsQuotaManager; } @@ -2932,13 +2921,13 @@ public class HRegionServer extends HasThread implements /** * Creates a Chore thread to clean the moved region cache. */ - protected static class MovedRegionsCleaner extends Chore implements Stoppable { + protected static class MovedRegionsCleaner extends ScheduledChore implements Stoppable { private HRegionServer regionServer; Stoppable stoppable; private MovedRegionsCleaner( HRegionServer regionServer, Stoppable stoppable){ - super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable); + super("MovedRegionsCleaner for region " + regionServer, stoppable, TIMEOUT_REGION_MOVED); this.regionServer = regionServer; this.stoppable = stoppable; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index 112634e..43deb58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -26,16 +26,16 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.annotations.VisibleForTesting; @@ -183,11 +183,11 @@ public class HeapMemoryManager { return true; } - public void start() { - LOG.info("Starting HeapMemoryTuner chore."); - this.heapMemTunerChore = new HeapMemoryTunerChore(); - Threads.setDaemonThreadRunning(heapMemTunerChore.getThread()); - if (tunerOn) { + public void start(ChoreService service) { + LOG.info("Starting HeapMemoryTuner chore."); + this.heapMemTunerChore = new HeapMemoryTunerChore(); + service.scheduleChore(heapMemTunerChore); + if (tunerOn) { // Register HeapMemoryTuner as a memstore flush listener memStoreFlusher.registerFlushRequestListener(heapMemTunerChore); } @@ -196,7 +196,8 @@ public class HeapMemoryManager { public void stop() { // The thread is Daemon. Just interrupting the ongoing process. LOG.info("Stoping HeapMemoryTuner chore."); - this.heapMemTunerChore.interrupt(); + this.heapMemTunerChore.cancel(true); + } // Used by the test cases. @@ -211,7 +212,7 @@ public class HeapMemoryManager { return this.heapOccupancyPercent; } - private class HeapMemoryTunerChore extends Chore implements FlushRequestListener { + private class HeapMemoryTunerChore extends ScheduledChore implements FlushRequestListener { private HeapMemoryTuner heapMemTuner; private AtomicLong blockedFlushCount = new AtomicLong(); private AtomicLong unblockedFlushCount = new AtomicLong(); @@ -220,7 +221,7 @@ public class HeapMemoryManager { private boolean alarming = false; public HeapMemoryTunerChore() { - super(server.getServerName() + "-HeapMemoryTunerChore", defaultChorePeriod, server); + super(server.getServerName() + "-HeapMemoryTunerChore", server, defaultChorePeriod); Class tunerKlass = server.getConfiguration().getClass( HBASE_RS_HEAP_MEMORY_TUNER_CLASS, DefaultHeapMemoryTuner.class, HeapMemoryTuner.class); heapMemTuner = ReflectionUtils.newInstance(tunerKlass, server.getConfiguration()); @@ -239,7 +240,7 @@ public class HeapMemoryManager { " is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")"); alarming = true; } - getSleeper().skipSleepCycle(); + triggerNow(); try { // Need to sleep ourselves since we've told the chore's sleeper // to skip the next sleep cycle. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 3565195..618032a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -18,17 +18,16 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.protobuf.Service; - import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; @@ -37,6 +36,8 @@ import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.wal.WAL; import org.apache.zookeeper.KeeperException; +import com.google.protobuf.Service; + /** * Services provided by {@link HRegionServer} */ @@ -125,6 +126,11 @@ public interface RegionServerServices ExecutorService getExecutorService(); /** + * @return A choreService that can be used to schedule periodic tasks + */ + ChoreService getChoreService(); + + /** * @return set of recovering regions on the hosting region server */ Map getRecoveringRegions(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java index 128cb6f..0d974b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java @@ -18,20 +18,19 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.Date; import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.common.annotations.VisibleForTesting; @@ -247,13 +246,13 @@ public class ServerNonceManager { } /** - * Creates a chore that is used to clean up old nonces. + * Creates a scheduled chore that is used to clean up old nonces. * @param stoppable Stoppable for the chore. - * @return Chore; the chore is not started. + * @return ScheduledChore; the scheduled chore is not started. */ - public Chore createCleanupChore(Stoppable stoppable) { + public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) { // By default, it will run every 6 minutes (30 / 5). - return new Chore("nonceCleaner", deleteNonceGracePeriod / 5, stoppable) { + return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) { @Override protected void chore() { cleanUpOldNonces(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java index 8a7bd64..d0529ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java @@ -25,9 +25,9 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; @@ -42,7 +42,7 @@ import org.apache.hadoop.util.StringUtils; * primary region). */ @InterfaceAudience.Private -public class StorefileRefresherChore extends Chore { +public class StorefileRefresherChore extends ScheduledChore { private static final Log LOG = LogFactory.getLog(StorefileRefresherChore.class); @@ -61,7 +61,7 @@ public class StorefileRefresherChore extends Chore { private Map lastRefreshTimes; // encodedName -> long public StorefileRefresherChore(int period, HRegionServer regionServer, Stoppable stoppable) { - super("StorefileRefresherChore", period, stoppable); + super("StorefileRefresherChore", stoppable, period); this.period = period; this.regionServer = regionServer; this.hfileTtl = this.regionServer.getConfiguration().getLong( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 23673b6..309a1c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -36,12 +36,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -773,8 +775,15 @@ public final class Canary implements Tool { public static void main(String[] args) throws Exception { final Configuration conf = HBaseConfiguration.create(); - AuthUtil.launchAuthChore(conf); + final ChoreService choreService = new ChoreService("CANARY_TOOL"); + final ScheduledChore authChore = AuthUtil.getAuthChore(conf); + if (authChore != null) { + choreService.scheduleChore(authChore); + } + int exitCode = ToolRunner.run(conf, new Canary(), args); + + choreService.shutdown(); System.exit(exitCode); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java index fc4e74c..794c67b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java @@ -23,10 +23,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -53,6 +54,7 @@ public class ConnectionCache { private final UserGroupInformation realUser; private final UserProvider userProvider; private final Configuration conf; + private final ChoreService choreService; private final ThreadLocal effectiveUserNames = new ThreadLocal() { @@ -69,8 +71,8 @@ public class ConnectionCache { @Override public void stop(String why) { isStopped = true;} @Override public boolean isStopped() {return isStopped;} }; - - Chore cleaner = new Chore("ConnectionCleaner", cleanInterval, stoppable) { + this.choreService = new ChoreService("ConnectionCache"); + ScheduledChore cleaner = new ScheduledChore("ConnectionCleaner", stoppable, cleanInterval) { @Override protected void chore() { for (Map.Entry entry: connections.entrySet()) { @@ -93,7 +95,7 @@ public class ConnectionCache { } }; // Start the daemon cleaner chore - Threads.setDaemonThreadRunning(cleaner.getThread()); + choreService.scheduleChore(cleaner); this.realUser = userProvider.getCurrent().getUGI(); this.realUserName = realUser.getShortUserName(); this.userProvider = userProvider; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index e6e98f2..fa156be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -46,8 +46,8 @@ import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -243,6 +243,11 @@ public class MockRegionServerServices implements RegionServerServices { } @Override + public ChoreService getChoreService() { + return null; + } + + @Override public void updateRegionFavoredNodesMapping(String encodedRegionName, List favoredNodes) { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java index 8af6016..903ce0e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java @@ -33,10 +33,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -45,6 +44,8 @@ import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil; @@ -77,7 +78,7 @@ public class TestHFileArchiving { UTIL.startMiniCluster(); // We don't want the cleaner to remove files. The tests do that. - UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().interrupt(); + UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true); } private static void setupConf(Configuration conf) { @@ -351,6 +352,7 @@ public class TestHFileArchiving { @Test public void testCleaningRace() throws Exception { final long TEST_TIME = 20 * 1000; + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); Configuration conf = UTIL.getMiniHBaseCluster().getMaster().getConfiguration(); Path rootDir = UTIL.getDataTestDirOnTestFS("testCleaningRace"); @@ -369,7 +371,7 @@ public class TestHFileArchiving { // The cleaner should be looping without long pauses to reproduce the race condition. HFileCleaner cleaner = new HFileCleaner(1, stoppable, conf, fs, archiveDir); try { - cleaner.start(); + choreService.scheduleChore(cleaner); // Keep creating/archiving new files while the cleaner is running in the other thread long startTime = System.currentTimeMillis(); @@ -404,7 +406,8 @@ public class TestHFileArchiving { } } finally { stoppable.stop("test end"); - cleaner.join(); + cleaner.cancel(true); + choreService.shutdown(); fs.delete(rootDir, true); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index 1757804..772c345 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -212,6 +213,7 @@ public class TestZooKeeperTableArchiveClient { Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); @@ -250,7 +252,7 @@ public class TestZooKeeperTableArchiveClient { // need to be checked) in 'otherTable' and the files (which should be retained) in the 'table' CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3); // run the cleaner - cleaner.start(); + choreService.scheduleChore(cleaner); // wait for the cleaner to check all the files finished.await(); // stop the cleaner @@ -412,8 +414,9 @@ public class TestZooKeeperTableArchiveClient { */ private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop) throws InterruptedException { + final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME"); // run the cleaner - cleaner.start(); + choreService.scheduleChore(cleaner); // wait for the cleaner to check all the files finished.await(); // stop the cleaner diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 0fc33db..2b251d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; @@ -99,8 +100,8 @@ import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -551,6 +552,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override + public ChoreService getChoreService() { + return null; + } + + @Override public void updateRegionFavoredNodesMapping(String encodedRegionName, List favoredNodes) { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index cc501ed..7847b79 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -53,7 +54,6 @@ import org.apache.hadoop.hbase.TableDescriptor; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.TableState; @@ -236,6 +236,11 @@ public class TestCatalogJanitor { } @Override + public ChoreService getChoreService() { + return null; + } + + @Override public MasterFileSystem getMasterFileSystem() { return this.mfs; } @@ -632,7 +637,7 @@ public class TestCatalogJanitor { assertTrue(janitor.cleanParent(parent, regions.get(parent))); services.stop("test finished"); - janitor.join(); + janitor.cancel(true); } /** @@ -700,7 +705,7 @@ public class TestCatalogJanitor { assertEquals(2, janitor.scan()); services.stop("test finished"); - janitor.join(); + janitor.cancel(true); } /** @@ -863,7 +868,7 @@ public class TestCatalogJanitor { FSUtils.delete(fs, rootdir, true); services.stop("Test finished"); server.stop("Test finished"); - janitor.join(); + janitor.cancel(true); } /** @@ -948,7 +953,7 @@ public class TestCatalogJanitor { // cleanup services.stop("Test finished"); server.stop("shutdown"); - janitor.join(); + janitor.cancel(true); } private FileStatus[] addMockStoreFiles(int count, MasterServices services, Path storedir) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java index 54f0691..55b1e1f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java @@ -37,18 +37,18 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -346,9 +346,10 @@ public class TestTableLockManager { int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size(); StoppableImplementation stopper = new StoppableImplementation(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); //alter table every 10 sec - Chore alterThread = new Chore("Alter Chore", 10000, stopper) { + ScheduledChore alterThread = new ScheduledChore("Alter Chore", stopper, 10000) { @Override protected void chore() { Random random = new Random(); @@ -367,7 +368,7 @@ public class TestTableLockManager { }; //split table every 5 sec - Chore splitThread = new Chore("Split thread", 5000, stopper) { + ScheduledChore splitThread = new ScheduledChore("Split thread", stopper, 5000) { @Override public void chore() { try { @@ -392,8 +393,8 @@ public class TestTableLockManager { } }; - alterThread.start(); - splitThread.start(); + choreService.scheduleChore(alterThread); + choreService.scheduleChore(splitThread); TEST_UTIL.waitTableEnabled(tableName); while (true) { List regions = admin.getTableRegions(tableName); @@ -424,6 +425,7 @@ public class TestTableLockManager { } admin.close(); + choreService.shutdown(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index b045c72..f8794fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -157,7 +157,6 @@ public class TestHFileCleaner { LOG.debug("Kept hfiles: " + file.getPath().getName()); } - cleaner.interrupt(); // reset the edge back to the original edge EnvironmentEdgeManager.injectEdge(originalEdge); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java index a004134..81e3aaa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java @@ -27,14 +27,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -122,8 +121,6 @@ public class TestHFileLinkCleaner { } assertFalse("HFile should be deleted", fs.exists(FSUtils.getTableDir(archiveDir, tableName))); assertFalse("Link should be deleted", fs.exists(FSUtils.getTableDir(archiveDir, tableLinkName))); - - cleaner.interrupt(); } private static Path getFamilyDirPath (final Path rootDir, final TableName table, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 96b4342..f29601c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -34,11 +34,12 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; @@ -198,8 +199,9 @@ public class TestEndToEndSplitTransaction { Stoppable stopper = new StoppableImplementation(); RegionSplitter regionSplitter = new RegionSplitter(table); RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME); + final ChoreService choreService = new ChoreService("TEST_SERVER"); - regionChecker.start(); + choreService.scheduleChore(regionChecker); regionSplitter.start(); //wait until the splitter is finished @@ -298,17 +300,16 @@ public class TestEndToEndSplitTransaction { /** * Checks regions using MetaScanner, MetaTableAccessor and HTable methods */ - static class RegionChecker extends Chore { + static class RegionChecker extends ScheduledChore { Connection connection; Configuration conf; TableName tableName; Throwable ex; RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException { - super("RegionChecker", 10, stopper); + super("RegionChecker", stopper, 10); this.conf = conf; this.tableName = tableName; - this.setDaemon(true); this.connection = ConnectionFactory.createConnection(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index c1eeea0..698584d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -27,6 +27,7 @@ import java.lang.management.ManagementFactory; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -111,7 +112,8 @@ public class TestHeapMemoryManager { new RegionServerStub(conf)); long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; - heapMemoryManager.start(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false); @@ -151,7 +153,8 @@ public class TestHeapMemoryManager { new RegionServerStub(conf)); long oldMemstoreHeapSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; - heapMemoryManager.start(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); blockCache.evictBlock(null); blockCache.evictBlock(null); blockCache.evictBlock(null); @@ -186,7 +189,8 @@ public class TestHeapMemoryManager { // Let the system start with default values for memstore heap and block cache size. HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf)); - heapMemoryManager.start(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); // Now we wants to be in write mode. Set bigger memstore size from CustomHeapMemoryTuner CustomHeapMemoryTuner.memstoreSize = 0.78f; CustomHeapMemoryTuner.blockCacheSize = 0.02f; @@ -215,7 +219,8 @@ public class TestHeapMemoryManager { HeapMemoryTuner.class); HeapMemoryManager heapMemoryManager = new HeapMemoryManager(blockCache, memStoreFlusher, new RegionServerStub(conf)); - heapMemoryManager.start(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); CustomHeapMemoryTuner.memstoreSize = 0.78f; CustomHeapMemoryTuner.blockCacheSize = 0.02f; Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up @@ -241,7 +246,8 @@ public class TestHeapMemoryManager { new RegionServerStub(conf)); long oldMemstoreSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; - heapMemoryManager.start(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); CustomHeapMemoryTuner.memstoreSize = 0.7f; CustomHeapMemoryTuner.blockCacheSize = 0.3f; Thread.sleep(1500); @@ -283,7 +289,8 @@ public class TestHeapMemoryManager { conf)); long oldMemstoreSize = memStoreFlusher.memstoreSize; long oldBlockCacheSize = blockCache.maxSize; - heapMemoryManager.start(); + final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + heapMemoryManager.start(choreService); CustomHeapMemoryTuner.memstoreSize = 0.4f; CustomHeapMemoryTuner.blockCacheSize = 0.4f; Thread.sleep(1500); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerNonceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerNonceManager.java index 9b3c6c1..5efc12c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerNonceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerNonceManager.java @@ -19,17 +19,20 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.NO_NONCE; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.Threads; @@ -87,7 +90,7 @@ public class TestServerNonceManager { EnvironmentEdgeManager.injectEdge(edge); try { ServerNonceManager nm = createManager(6); - Chore cleanup = nm.createCleanupChore(Mockito.mock(Stoppable.class)); + ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class)); edge.setValue(1); assertTrue(nm.startOperation(NO_NONCE, 1, createStoppable())); assertTrue(nm.startOperation(NO_NONCE, 2, createStoppable())); @@ -120,7 +123,7 @@ public class TestServerNonceManager { EnvironmentEdgeManager.injectEdge(edge); try { ServerNonceManager nm = createManager(6); - Chore cleanup = nm.createCleanupChore(Mockito.mock(Stoppable.class)); + ScheduledChore cleanup = nm.createCleanupScheduledChore(Mockito.mock(Stoppable.class)); // Add nonces from WAL, including dups. edge.setValue(12); nm.reportOperationFromWal(NO_NONCE, 1, 8); -- 1.9.3 (Apple Git-50)