commit b9b91f3c1abc27be884e4506fa036e518fef42f0 Author: Todd Lipcon Date: Mon Sep 12 16:09:42 2011 -0700 HBASE-4367. Add HasThread class diff --git src/main/java/org/apache/hadoop/hbase/Chore.java src/main/java/org/apache/hadoop/hbase/Chore.java index df1514a..38f476b 100644 --- src/main/java/org/apache/hadoop/hbase/Chore.java +++ src/main/java/org/apache/hadoop/hbase/Chore.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Sleeper; /** @@ -33,7 +34,7 @@ import org.apache.hadoop.hbase.util.Sleeper; *

Don't subclass Chore if the task relies on being woken up for something to * do, such as an entry being added to a queue, etc. */ -public abstract class Chore extends Thread { +public abstract class Chore extends HasThread { private final Log LOG = LogFactory.getLog(this.getClass()); private final Sleeper sleeper; protected final Stoppable stopper; diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index da0d90d..49e29ab 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.util.StringUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -546,7 +547,7 @@ public class LruBlockCache implements BlockCache, HeapSize { * * Thread is triggered into action by {@link LruBlockCache#runEviction()} */ - private static class EvictionThread extends Thread { + private static class EvictionThread extends HasThread { private WeakReference cache; public EvictionThread(LruBlockCache cache) { diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java index 1611349..5d273f0 100644 --- src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java +++ src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary; import org.apache.hadoop.hbase.io.hfile.CacheStats; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; @@ -307,7 +308,7 @@ public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize /* * Statistics thread. Periodically prints the cache statistics to the log. */ - static class StatisticsThread extends Thread { + static class StatisticsThread extends HasThread { SlabCache ourcache; public StatisticsThread(SlabCache slabCache) { diff --git src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index c0170b4..703c158 100644 --- src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -188,7 +188,7 @@ public class AssignmentManager extends ZooKeeperListener { conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000), master, conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000)); - Threads.setDaemonThreadRunning(timeoutMonitor, + Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), master.getServerName() + ".timeoutMonitor"); this.zkTable = new ZKTable(this.master.getZooKeeper()); this.maximumAssignmentAttempts = diff --git src/main/java/org/apache/hadoop/hbase/master/HMaster.java src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cde36e1..7290c26 100644 --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.InfoServer; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Sleeper; @@ -115,7 +116,7 @@ import org.apache.zookeeper.Watcher; * @see HMasterRegionInterface * @see Watcher */ -public class HMaster extends Thread +public class HMaster extends HasThread implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { private static final Log LOG = LogFactory.getLog(HMaster.class.getName()); @@ -181,7 +182,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { // If 'true', the balancer is 'on'. If 'false', the balancer will not run. private volatile boolean balanceSwitch = true; - private Thread catalogJanitorChore; + private CatalogJanitor catalogJanitorChore; private LogCleaner logCleaner; private MasterCoprocessorHost cpHost; @@ -482,8 +483,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { // been assigned. status.setStatus("Starting balancer and catalog janitor"); this.balancerChore = getAndStartBalancerChore(this); - this.catalogJanitorChore = - Threads.setDaemonThreadRunning(new CatalogJanitor(this, this)); + this.catalogJanitorChore = new CatalogJanitor(this, this); + Threads.setDaemonThreadRunning(catalogJanitorChore.getThread()); status.markComplete("Initialization successful"); LOG.info("Master has completed initialization"); @@ -668,7 +669,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { new LogCleaner(conf.getInt("hbase.master.cleaner.interval", 60 * 1000), this, conf, getMasterFileSystem().getFileSystem(), getMasterFileSystem().getOldLogDir()); - Threads.setDaemonThreadRunning(logCleaner, n + ".oldLogCleaner"); + Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner"); // Put up info server. int port = this.conf.getInt("hbase.master.info.port", 60010); @@ -718,7 +719,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { master.balance(); } }; - return Threads.setDaemonThreadRunning(chore); + return Threads.setDaemonThreadRunning(chore.getThread()); } private void stopChores() { diff --git src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 4a0c6d8..8746449 100644 --- src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -168,7 +168,7 @@ public class SplitLogManager extends ZooKeeperListener { } public void finishInitialization() { - Threads.setDaemonThreadRunning(timeoutMonitor, serverName + + Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName + ".splitLogManagerTimeoutMonitor"); // Watcher can be null during tests with Mock'd servers. if (this.watcher != null) { diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index cd809ba..46b90d7 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1326,10 +1326,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.service.startExecutorService(ExecutorType.RS_CLOSE_META, conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); - Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler); - Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", + Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", handler); + Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher", handler); - Threads.setDaemonThreadRunning(this.compactionChecker, n + + Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker", handler); // Leases is not a Thread. Internally it runs a daemon thread. If it gets @@ -1531,9 +1531,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, * have already been called. */ protected void join() { - Threads.shutdown(this.compactionChecker); - Threads.shutdown(this.cacheFlusher); - Threads.shutdown(this.hlogRoller); + Threads.shutdown(this.compactionChecker.getThread()); + Threads.shutdown(this.cacheFlusher.getThread()); + Threads.shutdown(this.hlogRoller.getThread()); if (this.compactSplitThread != null) { this.compactSplitThread.join(); } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java index 8756efb..c518521 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.HasThread; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -50,7 +51,7 @@ import java.io.IOException; * can be interrupted when there is something to do, rather than the Chore * sleep time which is invariant. */ -public class Leases extends Thread { +public class Leases extends HasThread { private static final Log LOG = LogFactory.getLog(Leases.class.getName()); private final int leasePeriod; private final int leaseCheckFrequency; diff --git src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 65a5c36..36f0ae9 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HasThread; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,7 +41,7 @@ import java.util.concurrent.locks.ReentrantLock; * can be interrupted when there is something to do, rather than the Chore * sleep time which is invariant. */ -class LogRoller extends Thread implements WALActionsListener { +class LogRoller extends HasThread implements WALActionsListener { static final Log LOG = LogFactory.getLog(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); private final AtomicBoolean rollLog = new AtomicBoolean(false); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 8388420..0433efe 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; @@ -55,7 +56,7 @@ import com.google.common.base.Preconditions; * * @see FlushRequester */ -class MemStoreFlusher extends Thread implements FlushRequester { +class MemStoreFlusher extends HasThread implements FlushRequester { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); // These two data members go together. Any entry in the one must have // a corresponding entry in the other. diff --git src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index abafe5e..64f7f92 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -367,7 +368,7 @@ public class SplitTransaction { * Open daughter region in its own thread. * If we fail, abort this hosting server. */ - class DaughterOpener extends Thread { + class DaughterOpener extends HasThread { private final RegionServerServices services; private final Server server; private final HRegion r; diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 72412be..03c6bb2 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; @@ -400,7 +401,7 @@ public class HLog implements Syncable { this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); logSyncerThread = new LogSyncer(this.optionalFlushInterval); - Threads.setDaemonThreadRunning(logSyncerThread, + Threads.setDaemonThreadRunning(logSyncerThread.getThread(), Thread.currentThread().getName() + ".logSyncer"); coprocessorHost = new WALCoprocessorHost(this, conf); } @@ -1021,7 +1022,7 @@ public class HLog implements Syncable { * This thread is responsible to call syncFs and buffer up the writers while * it happens. */ - class LogSyncer extends Thread { + class LogSyncer extends HasThread { private final long optionalFlushInterval; diff --git src/main/java/org/apache/hadoop/hbase/util/HasThread.java src/main/java/org/apache/hadoop/hbase/util/HasThread.java new file mode 100644 index 0000000..076604f --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/util/HasThread.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.lang.Thread.UncaughtExceptionHandler; + +/** + * Abstract class which contains a Thread and delegates the common Thread + * methods to that instance. + * + * The purpose of this class is to workaround Sun JVM bug #6915621, in which + * something internal to the JDK uses Thread.currentThread() as a monitor + * lock. This can produce deadlocks like HBASE-4367, HBASE-4101, etc. + */ +public abstract class HasThread implements Runnable { + private final Thread thread; + + public HasThread() { + this.thread = new Thread(this); + } + + public HasThread(String name) { + this.thread = new Thread(this, name); + } + + public Thread getThread() { + return thread; + } + + public abstract void run(); + + //// Begin delegation to Thread + + public final String getName() { + return thread.getName(); + } + + public void interrupt() { + thread.interrupt(); + } + + public final boolean isAlive() { + return thread.isAlive(); + } + + public boolean isInterrupted() { + return thread.isInterrupted(); + } + + public final void setDaemon(boolean on) { + thread.setDaemon(on); + } + + public final void setName(String name) { + thread.setName(name); + } + + public final void setPriority(int newPriority) { + thread.setPriority(newPriority); + } + + public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) { + thread.setUncaughtExceptionHandler(eh); + } + + public void start() { + thread.start(); + } + + public final void join() throws InterruptedException { + thread.join(); + } + + public final void join(long millis, int nanos) throws InterruptedException { + thread.join(millis, nanos); + } + + public final void join(long millis) throws InterruptedException { + thread.join(millis); + } + //// End delegation to Thread +} diff --git src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java index 5bd94fb..75945ec 100644 --- src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java +++ src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java @@ -234,7 +234,7 @@ public class JVMClusterUtil { // The below has been replaced to debug sometime hangs on end of // tests. // this.master.join(): - Threads.threadDumpingIsAlive(t.master); + Threads.threadDumpingIsAlive(t.master.getThread()); } catch(InterruptedException e) { // continue } diff --git src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java index c18cddb..55013ff 100644 --- src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java +++ src/test/java/org/apache/hadoop/hbase/HBaseClusterTestCase.java @@ -208,7 +208,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { threadDumpingJoin(t); } } - threadDumpingJoin(this.cluster.getMaster()); + threadDumpingJoin(this.cluster.getMaster().getThread()); } protected void threadDumpingJoin(final Thread t) {