From 1929837905568c4710095989f5acf80a73b90384 Mon Sep 17 00:00:00 2001 From: darionyaphet Date: Tue, 28 Jun 2016 22:26:56 +0800 Subject: [PATCH] HBASE-16136 : Check component is alive when closing RegionServer --- .../org/apache/hadoop/hbase/ScheduledChore.java | 7 ++++ .../RegionServerProcedureManagerHost.java | 7 ++++ .../hbase/quotas/RegionServerQuotaManager.java | 7 ++++ .../hbase/regionserver/CompactSplitThread.java | 7 ++++ .../hadoop/hbase/regionserver/HRegionServer.java | 39 +++++++++++++++++----- .../hbase/regionserver/HeapMemoryManager.java | 8 ++++- 6 files changed, 65 insertions(+), 10 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java index 422ca1a..42e94e5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java @@ -81,6 +81,7 @@ public abstract class ScheduledChore implements Runnable { * command can cause many chores to stop together. */ private final Stoppable stopper; + private boolean isAlive; interface ChoreServicer { /** @@ -348,6 +349,7 @@ public abstract class ScheduledChore implements Runnable { */ protected boolean initialChore() { // Default does nothing + this.isAlive = true; return true; } @@ -355,6 +357,11 @@ public abstract class ScheduledChore implements Runnable { * Override to run cleanup tasks when the Chore encounters an error and must stop running */ protected synchronized void cleanup() { + this.isAlive = false; + } + + public boolean isAlive() { + return isAlive; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java index 0f4ea64..7802957 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/RegionServerProcedureManagerHost.java @@ -38,6 +38,7 @@ public class RegionServerProcedureManagerHost extends private static final Log LOG = LogFactory .getLog(RegionServerProcedureManagerHost.class); + private boolean isAlive; public void initialize(RegionServerServices rss) throws KeeperException { for (RegionServerProcedureManager proc : procedures) { @@ -53,6 +54,7 @@ public class RegionServerProcedureManagerHost extends proc.start(); LOG.debug("Procedure " + proc.getProcedureSignature() + " is started"); } + this.isAlive = true; } public void stop(boolean force) { @@ -64,6 +66,11 @@ public class RegionServerProcedureManagerHost extends + " cleanly", e); } } + this.isAlive = false; + } + + public boolean isAlive() { + return isAlive; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java index 0a63c13..e6cc1b0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java @@ -52,6 +52,7 @@ public class RegionServerQuotaManager { private final RegionServerServices rsServices; private QuotaCache quotaCache = null; + private boolean isAlive; public RegionServerQuotaManager(final RegionServerServices rsServices) { this.rsServices = rsServices; @@ -68,12 +69,18 @@ public class RegionServerQuotaManager { // Initialize quota cache quotaCache = new QuotaCache(rsServices); quotaCache.start(); + this.isAlive = true; } public void stop() { if (isQuotaEnabled()) { quotaCache.stop("shutdown"); } + this.isAlive = false; + } + + public boolean isAlive() { + return isAlive; } public boolean isQuotaEnabled() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index c1f82b9..b019b6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -90,6 +90,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi private final ThreadPoolExecutor mergePool; private volatile ThroughputController compactionThroughputController; + private boolean isAlive; /** * Splitting should not take place if the total number of regions exceed this. @@ -167,6 +168,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi // compaction throughput controller this.compactionThroughputController = CompactionThroughputControllerFactory.create(server, conf); + this.isAlive = true; } @Override @@ -393,6 +395,11 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi mergePool.shutdown(); longCompactions.shutdown(); shortCompactions.shutdown(); + this.isAlive = false; + } + + public boolean isAlive() { + return this.isAlive; } private void waitFor(ThreadPoolExecutor t, String name) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 1c1000e..5fc64e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1032,22 +1032,43 @@ public class HRegionServer extends HasThread implements // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? If OOME could have exited already - if (this.hMemManager != null) this.hMemManager.stop(); - if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); - if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); - if (this.compactionChecker != null) this.compactionChecker.cancel(true); - if (this.healthCheckChore != null) this.healthCheckChore.cancel(true); - if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true); - if (this.storefileRefresher != null) this.storefileRefresher.cancel(true); + if (this.hMemManager != null && this.hMemManager.isAlive()) { + this.hMemManager.stop(); + } + + if (this.cacheFlusher != null && this.cacheFlusher.isAlive()) { + this.cacheFlusher.interruptIfNecessary(); + } + + if (this.compactSplitThread != null && this.compactSplitThread.isAlive()) { + this.compactSplitThread.interruptIfNecessary(); + } + + if (this.compactionChecker != null && this.compactionChecker.isAlive()) { + this.compactionChecker.cancel(true); + } + + if (this.healthCheckChore != null && this.healthCheckChore.isAlive()) { + this.healthCheckChore.cancel(true); + } + + if (this.nonceManagerChore != null && this.nonceManagerChore.isAlive()) { + this.nonceManagerChore.cancel(true); + } + + if (this.storefileRefresher != null && this.storefileRefresher.isAlive()) { + this.storefileRefresher.cancel(true); + } + sendShutdownInterrupt(); // Stop the quota manager - if (rsQuotaManager != null) { + if (rsQuotaManager != null && this.rsQuotaManager.isAlive()) { rsQuotaManager.stop(); } // Stop the snapshot and other procedure handlers, forcefully killing all running tasks - if (rspmHost != null) { + if (rspmHost != null && rspmHost.isAlive()) { rspmHost.stop(this.abortRequested || this.killed); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java index 77a9186..4a773b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java @@ -84,6 +84,7 @@ public class HeapMemoryManager { private final float heapOccupancyLowWatermark; private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax(); + private boolean isAlive; public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher, Server server, RegionServerAccounting regionServerAccounting) { @@ -194,6 +195,7 @@ public class HeapMemoryManager { if (tunerOn) { // Register HeapMemoryTuner as a memstore flush listener memStoreFlusher.registerFlushRequestListener(heapMemTunerChore); + this.isAlive = true; } } @@ -201,7 +203,7 @@ public class HeapMemoryManager { // The thread is Daemon. Just interrupting the ongoing process. LOG.info("Stoping HeapMemoryTuner chore."); this.heapMemTunerChore.cancel(true); - + this.isAlive = false; } // Used by the test cases. @@ -209,6 +211,10 @@ public class HeapMemoryManager { return this.tunerOn; } + public boolean isAlive() { + return this.isAlive; + } + /** * @return heap occupancy percentage, 0 <= n <= 1 */ -- 2.9.0