From d0f83f5e6c367e7a6519481391c0aef27007de3b Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Fri, 13 Jul 2012 22:41:22 +0000 Subject: [PATCH] [HBASE-6388] [89-fb] parallelize close and avoid deleting HLog, unless successful. Author: aaiyer Summary: parallelizing close to reduce the restart time Ensure that we (i) delete HLogs and (ii) inform master only if all the regions were successfully closed. If a region was unable to flush, deleting HLogs might have resulted in a data loss. Test Plan: deploy to dev cluster and check the times taken to close regions Reviewers: pkhemani Reviewed By: pkhemani Differential Revision: https://phabricator.fb.com/D511446 git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/0.89-fb@1361429 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/regionserver/HRegionServer.java | 132 ++++++++++++++------ 1 files changed, 92 insertions(+), 40 deletions(-) diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 157b7f9..442198c 100755 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -46,10 +46,15 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -781,35 +786,45 @@ public class HRegionServer implements HRegionInterface, } LOG.info("aborting server at: " + this.serverInfo.getServerName()); } else { - ArrayList closedRegions = closeAllRegions(); - try { - if (this.hlog != null) { - hlog.closeAndDelete(); - } - } catch (Throwable e) { - LOG.error("Close and delete failed", - RemoteExceptionHandler.checkThrowable(e)); - } - try { - HMsg[] exitMsg = new HMsg[closedRegions.size() + 1]; - if (restartRequested) { - exitMsg[0] = REPORT_RESTARTING; - } else { - exitMsg[0] = REPORT_EXITING; - } - // Tell the master what regions we are/were serving - int i = 1; - for (HRegion region: closedRegions) { - exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, - region.getRegionInfo()); + Collection regionsToClose = this.onlineRegions.values(); + ArrayList regionsClosed = closeAllRegions(); + if (regionsToClose.size() == regionsClosed.size()) { + try { + if (this.hlog != null) { + hlog.closeAndDelete(); + } + } catch (Throwable e) { + LOG.error("Close and delete failed", + RemoteExceptionHandler.checkThrowable(e)); } + try { + HMsg[] exitMsg = new HMsg[regionsClosed.size() + 1]; + if (restartRequested) { + exitMsg[0] = REPORT_RESTARTING; + } else { + exitMsg[0] = REPORT_EXITING; + } + // Tell the master what regions we are/were serving + int i = 1; + for (HRegion region: regionsClosed) { + exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, + region.getRegionInfo()); + } - LOG.info("telling master that region server is shutting down at: " + - serverInfo.getServerName()); - hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null); - } catch (Throwable e) { - LOG.warn("Failed to send exiting message to master: ", - RemoteExceptionHandler.checkThrowable(e)); + LOG.info("telling master that region server is shutting down at: " + + serverInfo.getServerName()); + hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null); + } catch (Throwable e) { + LOG.warn("Failed to send exiting message to master: ", + RemoteExceptionHandler.checkThrowable(e)); + } + } + else { + // if we don't inform the master, then the master is going to detect the expired + // znode and cause log splitting. We need this for the region that we failed to + // close (in case there were unflushed edits). + LOG.info("Failed to close all regions" + + " -- skip informing master that we are shutting down "); } LOG.info("stopping server at: " + this.serverInfo.getServerName()); } @@ -2020,7 +2035,8 @@ public class HRegionServer implements HRegionInterface, } } - /** Called either when the master tells us to restart or from stop() */ + /** Called either when the master tells us to restart or from stop() + * @throws Throwable */ ArrayList closeAllRegions() { ArrayList regionsToClose = new ArrayList(); this.lock.writeLock().lock(); @@ -2030,26 +2046,62 @@ public class HRegionServer implements HRegionInterface, } finally { this.lock.writeLock().unlock(); } - // Close any outstanding scanners. Means they'll get an UnknownScanner + + // First, close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. for (Map.Entry e: this.scanners.entrySet()) { try { e.getValue().close(); - } catch (IOException ioe) { - LOG.warn("Closing scanner " + e.getKey(), ioe); + } catch (Exception ioe) { + LOG.warn("Closing scanner " , ioe); } } - for (HRegion region: regionsToClose) { - if (LOG.isDebugEnabled()) { - LOG.debug("closing region " + Bytes.toString(region.getRegionName())); - } + + // Then, we close the regions + ExecutorService closingPoolExecutor = + new ThreadPoolExecutor(1, Integer.MAX_VALUE, + 60, TimeUnit.SECONDS, + new SynchronousQueue(), + new DaemonThreadFactory("regionserver-closing-")); + + List> futures = + new ArrayList>(regionsToClose.size()); + + for (int i = 0; i < regionsToClose.size(); i++ ) { + futures.add(closingPoolExecutor.submit(createRegionCloseCallable(regionsToClose.get(i)))); + } + + ArrayList regionsClosed = new ArrayList(); + for (int i = 0; i < futures.size(); i++ ) { + Future future = futures.get(i); try { - region.close(abortRequested); - } catch (Throwable e) { - cleanup(e, "Error closing " + Bytes.toString(region.getRegionName())); + future.get(); + // add to regionsClosed only if we don't see an exception. + regionsClosed.add(regionsToClose.get(i)); + } catch (Throwable e1) { + if (e1 instanceof ExecutionException) e1 = e1.getCause(); + LOG.error("Error closingRegion " + regionsToClose.get(i), e1); } } - return regionsToClose; + + return regionsClosed; + } + + private Callable createRegionCloseCallable(final HRegion region) { + return new Callable() { + public Object call() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("closing region " + Bytes.toString(region.getRegionName())); + } + try { + region.close(abortRequested); + } catch (IOException e) { + cleanup(e, "Error closing " + Bytes.toString(region.getRegionName())); + throw e; + } + return null; + } + }; } /* -- 1.7.0.4