Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java	(revision 601882)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java	(working copy)
@@ -81,6 +81,8 @@
   // Chore threads need to know about the hosting class.
   protected final AtomicBoolean stopRequested = new AtomicBoolean(false);

+  protected final AtomicBoolean quiesced = new AtomicBoolean(false);
+
   // Go down hard.  Used if file system becomes unavailable and also in
   // debugging and unit tests.
   protected volatile boolean abortRequested;
@@ -655,6 +657,7 @@
    * load/unload instructions.
    */
  public void run() {
+    boolean quiesceRequested = false;
    try {
      init(reportForDuty());
      long lastMsg = 0;
@@ -685,6 +688,16 @@
          HMsg msgs[] =
            this.hbaseMaster.regionServerReport(serverInfo, outboundArray);
          lastMsg = System.currentTimeMillis();
+
+          if (this.quiesced.get() && onlineRegions.size() == 0) {
+            // We've just told the master we're exiting because we aren't
+            // serving any regions. So set the stop bit and exit.
+            LOG.info("Server quiesced and not serving any regions. " +
+              "Starting shutdown");
+            stopRequested.set(true);
+            continue;
+          }
+
          // Queue up the HMaster's instruction stream for processing
          boolean restart = false;
          for(int i = 0; i < msgs.length && !stopRequested.get() &&
@@ -692,9 +705,7 @@
            switch(msgs[i].getMsg()) {

            case HMsg.MSG_CALL_SERVER_STARTUP:
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Got call server startup message");
-              }
+              LOG.info("Got call server startup message");
              // We the MSG_CALL_SERVER_STARTUP on startup but we can also
              // get it when the master is panicing because for instance
              // the HDFS has been yanked out from under it.  Be wary of
@@ -728,11 +739,22 @@
              break;

            case HMsg.MSG_REGIONSERVER_STOP:
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Got regionserver stop message");
-              }
+              LOG.info("Got regionserver stop message");
              stopRequested.set(true);
              break;
+
+            case HMsg.MSG_REGIONSERVER_QUIESCE:
+              if (!quiesceRequested) {
+                LOG.info("Got quiesce server message");
+                try {
+                  toDo.put(new ToDoEntry(msgs[i]));
+                } catch (InterruptedException e) {
+                  throw new RuntimeException("Putting into msgQueue was " +
+                    "interrupted.", e);
+                }
+                quiesceRequested = true;
+              }
+              break;

            default:
              if (fsOk) {
@@ -1104,6 +1126,10 @@
        try {
          LOG.info(e.msg.toString());
          switch(e.msg.getMsg()) {
+
+          case HMsg.MSG_REGIONSERVER_QUIESCE:
+            closeUserRegions();
+            break;

          case HMsg.MSG_REGION_OPEN:
            // Open a region
@@ -1152,12 +1178,19 @@
    }
  }

-  void openRegion(final HRegionInfo regionInfo) throws IOException {
+  void openRegion(final HRegionInfo regionInfo) {
    HRegion region = onlineRegions.get(regionInfo.getRegionName());
    if(region == null) {
-      region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
-        this.log, FileSystem.get(conf), conf, regionInfo, null,
-        this.cacheFlusher);
+      try {
+        region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
+            this.log, FileSystem.get(conf), conf, regionInfo, null,
+            this.cacheFlusher);
+
+      } catch (IOException e) {
+        LOG.error("error opening region " + regionInfo.getRegionName(), e);
+        reportClose(region);
+        return;
+      }
      this.lock.writeLock().lock();
      try {
        this.log.setSequenceNumber(region.getMinSequenceId());
@@ -1211,6 +1244,45 @@
    return regionsToClose;
  }

+  /** Called as the first stage of cluster shutdown. */
+  void closeUserRegions() {
+    ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
+    this.lock.writeLock().lock();
+    try {
+      synchronized (onlineRegions) {
+        for (Iterator<Map.Entry<Text, HRegion>> i =
+            onlineRegions.entrySet().iterator();
+          i.hasNext();) {
+          Map.Entry<Text, HRegion> e = i.next();
+          HRegion r = e.getValue();
+          if (!r.getRegionInfo().isMetaRegion()) {
+            regionsToClose.add(r);
+            i.remove();
+          }
+        }
+      }
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+    for(HRegion region: regionsToClose) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("closing region " + region.getRegionName());
+      }
+      try {
+        region.close(false);
+      } catch (IOException e) {
+        LOG.error("error closing region " + region.getRegionName(),
+          RemoteExceptionHandler.checkIOException(e));
+      }
+    }
+    this.quiesced.set(true);
+    if (onlineRegions.size() == 0) {
+      outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_EXITING));
+    } else {
+      outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_QUIESCED));
+    }
+  }
+
  //
  // HRegionInterface
  //
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java	(revision 601882)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java	(working copy)
@@ -52,7 +52,8 @@
          HColumnDescriptor.CompressionType.NONE, false, Integer.MAX_VALUE, null));
-
+  private boolean rootregion;
+  private boolean metaregion;
  private Text name;
  // TODO: Does this need to be a treemap?  Can it be a HashMap?
  private final TreeMap<Text, HColumnDescriptor> families;
@@ -69,6 +70,8 @@

  /** Used to construct the table descriptors for root and meta tables */
  private HTableDescriptor(Text name, HColumnDescriptor family) {
+    rootregion = name.equals(HConstants.ROOT_TABLE_NAME);
+    this.metaregion = true;
    this.name = new Text(name);
    this.families = new TreeMap<Text, HColumnDescriptor>();
    families.put(family.getName(), family);
@@ -92,14 +95,31 @@
   * [a-zA-Z_0-9]
   */
  public HTableDescriptor(String name) {
+    this();
    Matcher m = LEGAL_TABLE_NAME.matcher(name);
    if (m == null || !m.matches()) {
      throw new IllegalArgumentException(
          "Table names can only contain 'word characters': i.e. [a-zA-Z_0-9]");
    }
-    this.name = new Text(name);
-    this.families = new TreeMap<Text, HColumnDescriptor>();
+    this.name.set(name);
+    this.rootregion = false;
+    this.metaregion = false;
  }
+
+  /** @return true if this is the root region */
+  public boolean isRootRegion() {
+    return rootregion;
+  }
+
+  /** @return true if table is the meta table */
+  public boolean isMetaTable() {
+    return metaregion && !rootregion;
+  }
+
+  /** @return true if this is a meta region (part of the root or meta tables) */
+  public boolean isMetaRegion() {
+    return metaregion;
+  }

  /** @return name of table */
  public Text getName() {
@@ -165,6 +185,8 @@

  /** {@inheritDoc} */
  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(rootregion);
+    out.writeBoolean(metaregion);
    name.write(out);
    out.writeInt(families.size());
    for(Iterator<HColumnDescriptor> it = families.values().iterator();
@@ -175,6 +197,8 @@

  /** {@inheritDoc} */
  public void readFields(DataInput in) throws IOException {
+    this.rootregion = in.readBoolean();
+    this.metaregion = in.readBoolean();
    this.name.readFields(in);
    int numCols = in.readInt();
    families.clear();
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java	(revision 601882)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java	(working copy)
@@ -31,8 +31,6 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -74,6 +72,7 @@
  HMasterRegionInterface {

  static final Log LOG = LogFactory.getLog(HMaster.class.getName());
+  static final Long ZERO_L = Long.valueOf(0L);

  /** {@inheritDoc} */
  public long getProtocolVersion(String protocol,
@@ -93,6 +92,8 @@
  // started here in HMaster rather than have them have to know about the
  // hosting class
  volatile AtomicBoolean closed = new AtomicBoolean(true);
+  volatile AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+  volatile AtomicInteger quiescedMetaServers = new AtomicInteger(0);
  volatile boolean fsOk;
  Path dir;
  HBaseConfiguration conf;
@@ -102,9 +103,9 @@
  int numRetries;
  long maxRegionOpenTime;

-  DelayQueue<ProcessServerShutdown> shutdownQueue =
-    new DelayQueue<ProcessServerShutdown>();
-  BlockingQueue<RegionServerOperation> msgQueue =
+  DelayQueue<RegionServerOperation> delayedToDoQueue =
+    new DelayQueue<RegionServerOperation>();
+  BlockingQueue<RegionServerOperation> toDoQueue =
    new LinkedBlockingQueue<RegionServerOperation>();

  int leaseTimeout;
@@ -424,8 +425,7 @@
          || killedRegions.contains(info.getRegionName()) // queued for offline
          || regionsToDelete.contains(info.getRegionName())) { // queued for delete

-        unassignedRegions.remove(info.getRegionName());
-        assignAttempts.remove(info.getRegionName());
+        unassignedRegions.remove(info);
        return;
      }
      HServerInfo storedInfo = null;
@@ -458,7 +458,7 @@
      if (!deadServer &&
          ((storedInfo != null && storedInfo.getStartCode() != startCode) ||
          (storedInfo == null &&
-              !unassignedRegions.containsKey(info.getRegionName()) &&
+              !unassignedRegions.containsKey(info) &&
              !pendingRegions.contains(info.getRegionName())
          )
        )
      ) {
@@ -495,8 +495,7 @@
        }
      }
      // Now get the region assigned
-      unassignedRegions.put(info.getRegionName(), info);
-      assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
+      unassignedRegions.put(info, ZERO_L);
    }
  }
 }
@@ -818,8 +817,9 @@
    new ConcurrentHashMap();

  /**
-   * The 'unassignedRegions' table maps from a region name to a HRegionInfo
-   * record, which includes the region's table, its id, and its start/end keys.
+   * The 'unassignedRegions' table maps from an HRegionInfo to a timestamp that
+   * indicates the last time we *tried* to assign the region to a RegionServer.
+   * If the timestamp is out of date, then we can try to reassign it.
   *
   * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
   * set of all known valid regions.
@@ -827,17 +827,10 @@
   *
   * Items are removed from this list when a region server reports in that
   * the region has been deployed.
   */
-  final SortedMap<Text, HRegionInfo> unassignedRegions =
-    Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
+  final SortedMap<HRegionInfo, Long> unassignedRegions =
+    Collections.synchronizedSortedMap(new TreeMap<HRegionInfo, Long>());

  /**
-   * The 'assignAttempts' table maps from regions to a timestamp that indicates
-   * the last time we *tried* to assign the region to a RegionServer. If the
-   * timestamp is out of date, then we can try to reassign it.
-   */
-  final Map<Text, Long> assignAttempts = new ConcurrentHashMap<Text, Long>();
-
-  /**
   * Regions that have been assigned, and the server has reported that it has
   * started serving it, but that we have not yet recorded in the meta table.
   */
@@ -978,10 +971,7 @@
   */
  void unassignRootRegion() {
    this.rootRegionLocation.set(null);
-    this.unassignedRegions.put(HRegionInfo.rootRegionInfo.getRegionName(),
-      HRegionInfo.rootRegionInfo);
-    this.assignAttempts.put(HRegionInfo.rootRegionInfo.getRegionName(),
-      Long.valueOf(0L));
+    this.unassignedRegions.put(HRegionInfo.rootRegionInfo, ZERO_L);
  }

  /**
@@ -1030,7 +1020,11 @@
   * @return Location of the -ROOT- region.
   */
  public HServerAddress getRootRegionLocation() {
-    return this.rootRegionLocation.get();
+    HServerAddress rootServer = null;
+    if (!shutdownRequested.get() && !closed.get()) {
+      rootServer = this.rootRegionLocation.get();
+    }
+    return rootServer;
  }

  /**
@@ -1054,11 +1048,11 @@
        if (rootRegionLocation.get() != null) {
          // We can't process server shutdowns unless the root region is online
-          op = this.shutdownQueue.poll();
+          op = this.delayedToDoQueue.poll();
        }
        if (op == null ) {
          try {
-            op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+            op = toDoQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          } catch (InterruptedException e) {
            // continue
          }
@@ -1077,7 +1071,7 @@
            // for the missing meta region(s) to come back online, but since it
            // is waiting, it cannot process the meta region online operation it
            // is waiting for. So put this operation back on the queue for now.
-            if (msgQueue.size() == 0) {
+            if (toDoQueue.size() == 0) {
              // The queue is currently empty so wait for a while to see if what
              // we need comes in first
              sleeper.sleep();
@@ -1086,9 +1080,10 @@
            if (LOG.isDebugEnabled()) {
              LOG.debug("Put " + op.toString() + " back on queue");
            }
-            msgQueue.put(op);
+            toDoQueue.put(op);
          } catch (InterruptedException e) {
-            throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            throw new RuntimeException(
+                "Putting into toDoQueue was interrupted.", e);
          }
        }
      } catch (Exception ex) {
@@ -1106,9 +1101,10 @@
        }
        LOG.warn("Processing pending operations: " + op.toString(), ex);
        try {
-          msgQueue.put(op);
+          toDoQueue.put(op);
        } catch (InterruptedException e) {
-          throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+          throw new RuntimeException(
+              "Putting into toDoQueue was interrupted.", e);
        }
      }
    }
@@ -1255,7 +1251,7 @@
        if (root != null && root.equals(storedInfo.getServerAddress())) {
          unassignRootRegion();
        }
-        shutdownQueue.put(new ProcessServerShutdown(storedInfo));
+        delayedToDoQueue.put(new ProcessServerShutdown(storedInfo));
      }

      // record new server
@@ -1302,48 +1298,70 @@
    throws IOException {
    String serverName = serverInfo.getServerAddress().toString().trim();
    long serverLabel = getServerLabel(serverName);
-    if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
-      synchronized (serversToServerInfo) {
-        try {
-          // HRegionServer is shutting down. Cancel the server's lease.
-          // Note that canceling the server's lease takes care of updating
-          // serversToServerInfo, etc.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Region server " + serverName +
-              ": MSG_REPORT_EXITING -- cancelling lease");
-          }
+//    if (LOG.isDebugEnabled()) {
+//      LOG.debug("received heartbeat from " + serverName);
+//    }
+    if (msgs.length > 0) {
+      if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
+        synchronized (serversToServerInfo) {
+          try {
+            // HRegionServer is shutting down. Cancel the server's lease.
+            // Note that canceling the server's lease takes care of updating
+            // serversToServerInfo, etc.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Region server " + serverName +
+                ": MSG_REPORT_EXITING -- cancelling lease");
+            }

-          if (cancelLease(serverName, serverLabel)) {
-            // Only process the exit message if the server still has a lease.
-            // Otherwise we could end up processing the server exit twice.
-            LOG.info("Region server " + serverName +
-              ": MSG_REPORT_EXITING -- lease cancelled");
-            // Get all the regions the server was serving reassigned
-            // (if we are not shutting down).
-            if (!closed.get()) {
-              for (int i = 1; i < msgs.length; i++) {
-                HRegionInfo info = msgs[i].getRegionInfo();
-                if (info.getTableDesc().getName().equals(ROOT_TABLE_NAME)) {
-                  rootRegionLocation.set(null);
-                } else if (info.getTableDesc().getName().equals(META_TABLE_NAME)) {
-                  onlineMetaRegions.remove(info.getStartKey());
+            if (cancelLease(serverName, serverLabel)) {
+              // Only process the exit message if the server still has a lease.
+              // Otherwise we could end up processing the server exit twice.
+              LOG.info("Region server " + serverName +
+                ": MSG_REPORT_EXITING -- lease cancelled");
+              // Get all the regions the server was serving reassigned
+              // (if we are not shutting down).
+              if (!closed.get()) {
+                for (int i = 1; i < msgs.length; i++) {
+                  HRegionInfo info = msgs[i].getRegionInfo();
+                  if (info.isRootRegion()) {
+                    rootRegionLocation.set(null);
+                  } else if (info.isMetaTable()) {
+                    onlineMetaRegions.remove(info.getStartKey());
+                  }
+
+                  this.unassignedRegions.put(info, ZERO_L);
                }
-
-                this.unassignedRegions.put(info.getRegionName(), info);
-                this.assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
              }
            }
+
+            // We don't need to return anything to the server because it isn't
+            // going to do any more work.
+            return new HMsg[0];
+          } finally {
+            serversToServerInfo.notifyAll();
          }
-
-          // We don't need to return anything to the server because it isn't
-          // going to do any more work.
-          return new HMsg[0];
-        } finally {
-          serversToServerInfo.notifyAll();
        }
+      } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
+        LOG.info("Region server " + serverName + " quiesced");
+        if(quiescedMetaServers.incrementAndGet() == serversToServerInfo.size()) {
+          // If the only servers we know about are meta servers, then we can
+          // proceed with shutdown
+          LOG.info("All user tables quiesced. Proceeding with shutdown");
+          closed.set(true);
+          synchronized(toDoQueue) {
+            toDoQueue.clear();                    // Empty the queue
+            delayedToDoQueue.clear();             // Empty shut down queue
+            toDoQueue.notifyAll();                // Wake main thread
+          }
+        }
      }
    }
+    if (shutdownRequested.get() && !closed.get()) {
+      // Tell the server to stop serving any user regions
+      return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
+    }
+
    if (closed.get()) {
      // Tell server to shut down if we are shutting down.  This should
      // happen after check of MSG_REPORT_EXITING above, since region server
@@ -1476,62 +1494,86 @@
      switch (incomingMsgs[i].getMsg()) {

      case HMsg.MSG_REPORT_PROCESS_OPEN:
-        synchronized (this.assignAttempts) {
+        synchronized (unassignedRegions) {
          // Region server has acknowledged request to open region.
-          // Extend region open time by 1/2 max region open time.
-          assignAttempts.put(region.getRegionName(),
-            Long.valueOf(assignAttempts.get(
-              region.getRegionName()).longValue() +
-              (this.maxRegionOpenTime / 2)));
+          // Extend region open time by max region open time.
+          unassignedRegions.put(region,
+            System.currentTimeMillis() + this.maxRegionOpenTime);
        }
        break;

      case HMsg.MSG_REPORT_OPEN:
-        HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName());
-
-        if (regionInfo == null) {
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("region server " + info.getServerAddress().toString()
-                + " should not have opened region " + region.getRegionName());
+        boolean duplicateAssignment = false;
+        synchronized (unassignedRegions) {
+          if (unassignedRegions.remove(region) == null) {
+            if (region.getRegionName().compareTo(
+                HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
+              // Root region
+              HServerAddress rootServer = rootRegionLocation.get();
+              if (rootServer != null) {
+                if (rootServer.toString().compareTo(serverName) == 0) {
+                  // A duplicate open report from the correct server
+                  break;
+                }
+                // We received an open report on the root region, but it is
+                // assigned to a different server
+                duplicateAssignment = true;
+              }
+            } else {
+              // Not root region. If it is not a pending region, then we are
+              // going to treat it as a duplicate assignment
+              if (pendingRegions.contains(region.getRegionName())) {
+                // A duplicate report from the correct server
+                break;
+              }
+              // Although we can't tell for certain if this is a duplicate
+              // report from the correct server, we are going to treat it
+              // as such
+              duplicateAssignment = true;
+            }
          }
+          if (duplicateAssignment) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("region server " + info.getServerAddress().toString()
+                  + " should not have opened region " + region.getRegionName());
+            }

-          // This Region should not have been opened.
-          // Ask the server to shut it down, but don't report it as closed.
-          // Otherwise the HMaster will think the Region was closed on purpose,
-          // and then try to reopen it elsewhere; that's not what we want.
+            // This Region should not have been opened.
+            // Ask the server to shut it down, but don't report it as closed.
+            // Otherwise the HMaster will think the Region was closed on purpose,
+            // and then try to reopen it elsewhere; that's not what we want.

-          returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));
+            returnMsgs.add(
+              new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));

-        } else {
-          LOG.info(info.getServerAddress().toString() + " serving " +
-              region.getRegionName());
-
-          if (region.getRegionName().compareTo(
-              HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
-            // Store the Root Region location (in memory)
-            synchronized (rootRegionLocation) {
-              this.rootRegionLocation.set(
-                  new HServerAddress(info.getServerAddress()));
-              this.rootRegionLocation.notifyAll();
-            }
          } else {
-            // Note that the table has been assigned and is waiting for the meta
-            // table to be updated.
+            LOG.info(info.getServerAddress().toString() + " serving " +
+                region.getRegionName());

-            pendingRegions.add(region.getRegionName());
+            if (region.getRegionName().compareTo(
+                HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
+              // Store the Root Region location (in memory)
+              synchronized (rootRegionLocation) {
+                this.rootRegionLocation.set(
+                    new HServerAddress(info.getServerAddress()));
+                this.rootRegionLocation.notifyAll();
+              }
+            } else {
+              // Note that the table has been assigned and is waiting for the
+              // meta table to be updated.

-            // Queue up an update to note the region location.
+              pendingRegions.add(region.getRegionName());

-            try {
-              msgQueue.put(new ProcessRegionOpen(info, region));
-            } catch (InterruptedException e) {
-              throw new RuntimeException("Putting into msgQueue was interrupted.", e);
-            }
-          }
-          // Remove from unassigned list so we don't assign it to someone else
-          this.unassignedRegions.remove(region.getRegionName());
-          this.assignAttempts.remove(region.getRegionName());
+              // Queue up an update to note the region location.
+
+              try {
+                toDoQueue.put(new ProcessRegionOpen(info, region));
+              } catch (InterruptedException e) {
+                throw new RuntimeException(
+                    "Putting into toDoQueue was interrupted.", e);
+              }
+            }
+          }
        }
        break;
@@ -1563,15 +1605,15 @@
          // could create a race with the pending close if it gets
          // reassigned before the close is processed.

-          unassignedRegions.remove(region.getRegionName());
-          assignAttempts.remove(region.getRegionName());
+          unassignedRegions.remove(region);

          try {
-            msgQueue.put(new ProcessRegionClose(region, reassignRegion,
+            toDoQueue.put(new ProcessRegionClose(region, reassignRegion,
                deleteRegion));
          } catch (InterruptedException e) {
-            throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+            throw new RuntimeException(
+                "Putting into toDoQueue was interrupted.", e);
          }
        }
        break;
@@ -1580,12 +1622,10 @@
        // A region has split.

        HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
-        unassignedRegions.put(newRegionA.getRegionName(), newRegionA);
-        assignAttempts.put(newRegionA.getRegionName(), Long.valueOf(0L));
+        unassignedRegions.put(newRegionA, ZERO_L);

        HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
-        unassignedRegions.put(newRegionB.getRegionName(), newRegionB);
-        assignAttempts.put(newRegionB.getRegionName(), Long.valueOf(0L));
+        unassignedRegions.put(newRegionB, ZERO_L);

        LOG.info("region " + region.getRegionName() +
          " split. New regions are: " + newRegionA.getRegionName() + ", " +
@@ -1631,15 +1671,22 @@
  private void assignRegions(HServerInfo info, String serverName,
    ArrayList<HMsg> returnMsgs) {

-    synchronized (this.assignAttempts) {
+    synchronized (this.unassignedRegions) {

      // We need to hold a lock on assign attempts while we figure out what to
      // do so that multiple threads do not execute this method in parallel
      // resulting in assigning the same region to multiple servers.

      long now = System.currentTimeMillis();
-      Set<Text> regionsToAssign = new HashSet<Text>();
-      for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
+      Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>();
+      for (Map.Entry<HRegionInfo, Long> e: this.unassignedRegions.entrySet()) {
+        HRegionInfo i = e.getKey();
+        if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
+            !i.isMetaRegion()) {
+          // Can't assign user regions until all meta regions have been assigned
+          // and are on-line
+          continue;
+        }
        long diff = now - e.getValue().longValue();
        if (diff > this.maxRegionOpenTime) {
          regionsToAssign.add(e.getKey());
@@ -1720,11 +1767,10 @@
      }

      now = System.currentTimeMillis();
-      for (Text regionName: regionsToAssign) {
-        HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
-        LOG.info("assigning region " + regionName + " to server " +
-          serverName);
-        this.assignAttempts.put(regionName, Long.valueOf(now));
+      for (HRegionInfo regionInfo: regionsToAssign) {
+        LOG.info("assigning region " + regionInfo.getRegionName() +
+          " to server " + serverName);
+        this.unassignedRegions.put(regionInfo, Long.valueOf(now));
        returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
        if (--nregions <= 0) {
          break;
@@ -1773,14 +1819,13 @@
   * @param serverName
   * @param returnMsgs
   */
-  private void assignRegionsToOneServer(final Set<Text> regionsToAssign,
+  private void assignRegionsToOneServer(final Set<HRegionInfo> regionsToAssign,
    final String serverName, final ArrayList<HMsg> returnMsgs) {
    long now = System.currentTimeMillis();
-    for (Text regionName: regionsToAssign) {
-      HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
-      LOG.info("assigning region " + regionName + " to the only server " +
-        serverName);
-      this.assignAttempts.put(regionName, Long.valueOf(now));
+    for (HRegionInfo regionInfo: regionsToAssign) {
+      LOG.info("assigning region " + regionInfo.getRegionName() +
+        " to the only server " + serverName);
+      this.unassignedRegions.put(regionInfo, Long.valueOf(now));
      returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
    }
  }
@@ -1789,10 +1834,63 @@
   * Some internal classes to manage msg-passing and region server operations
   */

-  private abstract class RegionServerOperation {
-    RegionServerOperation() {}
+  private abstract class RegionServerOperation implements Delayed {
+    private long expire;

-    abstract boolean process() throws IOException;
+    protected RegionServerOperation() {
+      // Set the future time at which we expect to be released from the
+      // DelayQueue we're inserted in on lease expiration.
+      this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+    }
+
+    /** {@inheritDoc} */
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(this.expire - System.currentTimeMillis(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    /** {@inheritDoc} */
+    public int compareTo(Delayed o) {
+      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
+          - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
+    }
+
+    protected void requeue() {
+      this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+      delayedToDoQueue.put(this);
+    }
+
+    protected boolean rootAvailable() {
+      boolean available = true;
+      if (rootRegionLocation.get() == null) {
+        available = false;
+        requeue();
+      }
+      return available;
+    }
+
+    protected boolean metaTableAvailable() {
+      boolean available = true;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("numberOfMetaRegions: " + numberOfMetaRegions.get() +
+            ", onlineMetaRegions.size(): " + onlineMetaRegions.size());
+      }
+      if (numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+        // We can't proceed because not all of the meta regions are online.
+        // We can't block either because that would prevent the meta region
+        // online message from being processed. In order to prevent spinning
+        // in the run queue, put this request on the delay queue to give
+        // other threads the opportunity to get the meta regions on-line.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Requeuing because not all meta regions are online");
+        }
+        available = false;
+        requeue();
+      }
+      return available;
+    }
+
+    protected abstract boolean process() throws IOException;
  }

  /**
@@ -1800,15 +1898,13 @@
   * The region server's log file needs to be split up for each region it was
   * serving, and the regions need to get reassigned.
   */
-  private class ProcessServerShutdown extends RegionServerOperation
-    implements Delayed {
-    private long expire;
+  private class ProcessServerShutdown extends RegionServerOperation {
    private HServerAddress deadServer;
    private String deadServerName;
    private Path oldLogDir;
-    private transient boolean logSplit;
-    private transient boolean rootChecked;
-    private transient boolean rootRescanned;
+    private boolean logSplit;
+    private boolean rootChecked;
+    private boolean rootRescanned;

    private class ToDoEntry {
      boolean deleteRegion;
@@ -1824,7 +1920,10 @@
      }
    }

-    ProcessServerShutdown(HServerInfo serverInfo) {
+    /**
+     * @param serverInfo
+     */
+    public ProcessServerShutdown(HServerInfo serverInfo) {
      super();
      this.deadServer = serverInfo.getServerAddress();
      this.deadServerName = this.deadServer.toString();
@@ -1838,24 +1937,9 @@
      dirName.append("_");
      dirName.append(deadServer.getPort());
      this.oldLogDir = new Path(dir, dirName.toString());
-      // Set the future time at which we expect to be released from the
-      // DelayQueue we're inserted in on lease expiration.
-      this.expire = System.currentTimeMillis() + leaseTimeout / 2;
    }

    /** {@inheritDoc} */
-    public long getDelay(TimeUnit unit) {
-      return unit.convert(this.expire - System.currentTimeMillis(),
-        TimeUnit.MILLISECONDS);
-    }
-
-    /** {@inheritDoc} */
-    public int compareTo(Delayed o) {
-      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
-        - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
-    }
-
-    /** {@inheritDoc} */
    @Override
    public String toString() {
      return "ProcessServerShutdown of " + this.deadServer.toString();
@@ -1866,7 +1950,7 @@
        Text regionName) throws IOException {

      ArrayList<ToDoEntry> toDoList = new ArrayList<ToDoEntry>();
-      TreeMap<Text, HRegionInfo> regions = new TreeMap<Text, HRegionInfo>();
+      HashSet<HRegionInfo> regions = new HashSet<HRegionInfo>();

      try {
        while (true) {
@@ -1958,8 +2042,7 @@
          if (regionsToKill.containsKey(info.getRegionName())) {
            regionsToKill.remove(info.getRegionName());
            killList.put(deadServerName, regionsToKill);
-            unassignedRegions.remove(info.getRegionName());
-            assignAttempts.remove(info.getRegionName());
+            unassignedRegions.remove(info);
            synchronized (regionsToDelete) {
              if (regionsToDelete.contains(info.getRegionName())) {
                // Delete this region
@@ -1974,7 +2057,7 @@
          } else {
            // Get region reassigned
-            regions.put(info.getRegionName(), info);
+            regions.add(info);

            // If it was pending, remove.
            // Otherwise will obstruct its getting reassigned.
@@ -2008,16 +2091,13 @@
      }

      // Get regions reassigned
-      for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
-        Text region = e.getKey();
-        HRegionInfo regionInfo = e.getValue();
-        unassignedRegions.put(region, regionInfo);
-        assignAttempts.put(region, Long.valueOf(0L));
+      for (HRegionInfo info: regions) {
+        unassignedRegions.put(info, ZERO_L);
      }
    }

    @Override
-    boolean process() throws IOException {
+    protected boolean process() throws IOException {
      LOG.info("process shutdown of server " + deadServer + ": logSplit: " +
        this.logSplit + ", rootChecked: " + this.rootChecked +
        ", rootRescanned: " + this.rootRescanned + ", numberOfMetaRegions: " +
@@ -2040,9 +2120,11 @@
      }

      if (!rootChecked) {
-        boolean rootRegionUnavailable = false;
-        if (rootRegionLocation.get() == null) {
-          rootRegionUnavailable = true;
+        if (!rootAvailable()) {
+          // Return true so that worker does not put this request back on the
+          // toDoQueue.
+          // rootAvailable() has already put it on the delayedToDoQueue
+          return true;

        } else if (deadServer.equals(rootRegionLocation.get())) {
          // We should never get here because whenever an object of this type
@@ -2050,20 +2132,14 @@
          // and unassignRootRegion() is called then. However, in the
          // unlikely event that we do end up here, let's do the right thing.
          unassignRootRegion();
-          rootRegionUnavailable = true;
-        }
-        if (rootRegionUnavailable) {
          // We can't do anything until the root region is on-line, put
          // us back on the delay queue. Reset the future time at which
          // we expect to be released from the DelayQueue we're inserted
          // in on lease expiration.
-          this.expire = System.currentTimeMillis() + leaseTimeout / 2;
-          shutdownQueue.put(this);
-
-          // Return true so run() does not put us back on the msgQueue
+          requeue();
+          // Return true so run() does not put us back on the toDoQueue
          return true;
        }
-
        rootChecked = true;
      }

      if (!rootRescanned) {
@@ -2114,27 +2190,14 @@
        }
        rootRescanned = true;
      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("numberOfMetaRegions: " + numberOfMetaRegions.get() +
-          ", onlineMetaRegions.size(): " + onlineMetaRegions.size());
-      }
-      if (numberOfMetaRegions.get() != onlineMetaRegions.size()) {
-        // We can't proceed because not all of the meta regions are online.
-        // We can't block either because that would prevent the meta region
-        // online message from being processed. In order to prevent spinning
-        // in the run queue, put this request on the delay queue to give
-        // other threads the opportunity to get the meta regions on-line.
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-            "Requeuing shutdown because not all meta regions are online");
-        }
-        this.expire = System.currentTimeMillis() + leaseTimeout / 2;
-        shutdownQueue.put(this);
-
-        // Return true so run() does not put us back on the msgQueue
+
+      if (!metaTableAvailable()) {
+        // We can't proceed because not all meta regions are online.
+        // metaAvailable() has put this request on the delayedToDoQueue
+        // Return true so that worker does not put this on the toDoQueue
        return true;
      }
+
      for (int tries = 0; tries < numRetries; tries++) {
        try {
          if (closed.get()) {
@@ -2181,33 +2244,97 @@
  }

  /**
+   * Abstract class that performs common operations for
+   * @see #ProcessRegionClose and @see #ProcessRegionOpen
+   */
+  private abstract class ProcessRegionStatusChange
+    extends RegionServerOperation {
+
+    protected final boolean isMetaTable;
+    protected final HRegionInfo regionInfo;
+    private MetaRegion metaRegion;
+    protected Text metaRegionName;
+
+    /**
+     * @param regionInfo
+     */
+    public ProcessRegionStatusChange(HRegionInfo regionInfo) {
+      super();
+      this.regionInfo = regionInfo;
+      this.isMetaTable = regionInfo.isMetaTable();
+      this.metaRegion = null;
+      this.metaRegionName = null;
+    }
+
+    protected boolean metaRegionAvailable() {
+      boolean available = true;
+      if (isMetaTable) {
+        // This operation is for the meta table
+        if (!rootAvailable()) {
+          // But we can't proceed unless the root region is available
+          available = false;
+        }
+      } else if (!rootScanned) {
+        // The root region has not been scanned so we can't proceed.
+        // Put the operation on the delayedToDoQueue
+        requeue();
+        available = false;
+      } else {
+        if (!metaTableAvailable()) {
+          // For normal tables the meta table must be available
+          available = false;
+        }
+      }
+      return available;
+    }
+
+    protected HRegionInterface getMetaServer() throws IOException {
+      if (this.isMetaTable) {
+        this.metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
+      } else {
+        if (this.metaRegion == null) {
+          synchronized (onlineMetaRegions) {
+            metaRegion = onlineMetaRegions.size() == 1 ?
+              onlineMetaRegions.get(onlineMetaRegions.firstKey()) :
+                onlineMetaRegions.containsKey(regionInfo.getRegionName()) ?
+                  onlineMetaRegions.get(regionInfo.getRegionName()) :
+                    onlineMetaRegions.get(onlineMetaRegions.headMap(
+                      regionInfo.getRegionName()).lastKey());
+          }
+          this.metaRegionName = metaRegion.getRegionName();
+        }
+      }
+
+      HServerAddress server = null;
+      if (isMetaTable) {
+        server = rootRegionLocation.get();
+
+      } else {
+        server = metaRegion.getServer();
+      }
+      return connection.getHRegionConnection(server);
+    }
+
+  }
+  /**
   * ProcessRegionClose is instantiated when a region server reports that it
   * has closed a region.
   */
-  private class ProcessRegionClose extends RegionServerOperation {
-    private HRegionInfo regionInfo;
+  private class ProcessRegionClose extends ProcessRegionStatusChange {
    private boolean reassignRegion;
    private boolean deleteRegion;
-    private boolean rootRegion;

-    ProcessRegionClose(HRegionInfo regionInfo, boolean reassignRegion,
+    /**
+     * @param regionInfo
+     * @param reassignRegion
+     * @param deleteRegion
+     */
+    public ProcessRegionClose(HRegionInfo regionInfo, boolean reassignRegion,
      boolean deleteRegion) {
-      super();
-
-      this.regionInfo = regionInfo;
+      super(regionInfo);
      this.reassignRegion = reassignRegion;
      this.deleteRegion = deleteRegion;
-
-      // If the region closing down is a meta region then we need to update
-      // the ROOT table
-
-      if (this.regionInfo.getTableDesc().getName().equals(META_TABLE_NAME)) {
-        this.rootRegion = true;
-
-      } else {
-        this.rootRegion = false;
-      }
    }

    /** {@inheritDoc} */
@@ -2217,59 +2344,29 @@
    }

    @Override
-    boolean process() throws IOException {
+    protected boolean process() throws IOException {
      for (int tries = 0; tries < numRetries; tries++) {
        if (closed.get()) {
          return true;
        }
        LOG.info("region closed: " + regionInfo.getRegionName());

-        // Mark the Region as unavailable in the appropriate meta table
-
-        Text metaRegionName;
-        HRegionInterface server;
-        if (rootRegion) {
-          if (rootRegionLocation.get() == null || !rootScanned) {
-            // We can't proceed until the root region is online and has been
-            // scanned
-            return false;
-          }
-          metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
-          server = connection.getHRegionConnection(rootRegionLocation.get());
+        if (isMetaTable) {
+          // Region is part of the meta table. Remove it from onlineMetaRegions
          onlineMetaRegions.remove(regionInfo.getStartKey());
+        }

-        } else {
-          if (!rootScanned ||
-              numberOfMetaRegions.get() != onlineMetaRegions.size()) {
-
-            // We can't proceed because not all of the meta regions are online.
-            // We can't block either because that would prevent the meta region
-            // online message from being processed. So return false to have this
-            // operation requeued.
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Requeuing close because rootScanned=" +
-                rootScanned + ", numberOfMetaRegions=" +
-                numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" +
-                onlineMetaRegions.size());
-            }
-            return false;
-          }
+        // Mark the Region as unavailable in the appropriate meta table

-          MetaRegion r = null;
-          synchronized (onlineMetaRegions) {
-            if (onlineMetaRegions.containsKey(regionInfo.getRegionName())) {
-              r = onlineMetaRegions.get(regionInfo.getRegionName());
-
-            } else {
-              r = onlineMetaRegions.get(onlineMetaRegions.headMap(
-                regionInfo.getRegionName()).lastKey());
-            }
-          }
-          metaRegionName = r.getRegionName();
-          server = connection.getHRegionConnection(r.getServer());
+        if (!metaRegionAvailable()) {
+          // We can't proceed unless the meta region we are going to update
+          // is online. metaRegionAvailable() has put this operation on the
+          // delayedToDoQueue, so return true so the operation is not put
+          // back on the toDoQueue
+          return true;
        }

+        HRegionInterface server = getMetaServer();
        try {
          BatchUpdate b = new BatchUpdate(rand.nextLong());
          long lockid = b.startUpdate(regionInfo.getRegionName());
@@ -2298,8 +2395,7 @@
        if (reassignRegion) {
          LOG.info("reassign region: " + regionInfo.getRegionName());

-          unassignedRegions.put(regionInfo.getRegionName(), regionInfo);
-          assignAttempts.put(regionInfo.getRegionName(), Long.valueOf(0L));
+          unassignedRegions.put(regionInfo, ZERO_L);

        } else if (deleteRegion) {
          try {
@@ -2320,19 +2416,18 @@
   * serving a region. This applies to all meta and user regions except the
   * root region which is handled specially.
   */
-  private class ProcessRegionOpen extends RegionServerOperation {
-    private final boolean rootRegion;
-    private final HRegionInfo region;
+  private class ProcessRegionOpen extends ProcessRegionStatusChange {
    private final HServerAddress serverAddress;
    private final byte [] startCode;

-    ProcessRegionOpen(HServerInfo info, HRegionInfo region)
+    /**
+     * @param info
+     * @param regionInfo
+     * @throws IOException
+     */
+    public ProcessRegionOpen(HServerInfo info, HRegionInfo regionInfo)
    throws IOException {
-      // If true, the region which just came on-line is a META region.
-      // We need to look in the ROOT region for its information. Otherwise,
-      // its just an ordinary region. Look for it in the META table.
-      this.rootRegion = region.getTableDesc().getName().equals(META_TABLE_NAME);
-      this.region = region;
+      super(regionInfo);
      this.serverAddress = info.getServerAddress();
      this.startCode = Writables.longToBytes(info.getStartCode());
    }
@@ -2344,72 +2439,40 @@
    }

    @Override
-    boolean process() throws IOException {
+    protected boolean process() throws IOException {
      for (int tries = 0; tries < numRetries; tries++) {
        if (closed.get()) {
          return true;
        }
-        LOG.info(region.toString() + " open on " +
+        LOG.info(regionInfo.toString() + " open on " +
            this.serverAddress.toString());

+        if (!metaRegionAvailable()) {
+          // We can't proceed unless the meta region we are going to update
+          // is online. metaRegionAvailable() has put this operation on the
+          // delayedToDoQueue, so return true so the operation is not put
+          // back on the toDoQueue
+          return true;
+        }
+
        // Register the newly-available Region's location.
-        Text metaRegionName;
-        HRegionInterface server;
-        if (this.rootRegion) {
-          if (rootRegionLocation.get() == null || !rootScanned) {
-            // We can't proceed until root region is online and scanned
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("root region: " +
-                ((rootRegionLocation.get() != null)?
-                  rootRegionLocation.get().toString(): "null") +
-                ", rootScanned: " + rootScanned);
-            }
-            return false;
-          }
-          metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
-          server = connection.getHRegionConnection(rootRegionLocation.get());
-        } else {
-          if (!rootScanned ||
-              numberOfMetaRegions.get() != onlineMetaRegions.size()) {
-            // We can't proceed because not all of the meta regions are online.
-            // We can't block either because that would prevent the meta region
-            // online message from being processed. So return false to have this
-            // operation requeued.
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Requeuing open because rootScanned: " +
-                rootScanned + ", numberOfMetaRegions: " +
-                numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " +
-                onlineMetaRegions.size());
-            }
-            return false;
-          }
-
-          MetaRegion r = null;
-          synchronized (onlineMetaRegions) {
-            r = onlineMetaRegions.containsKey(region.getRegionName()) ?
-              onlineMetaRegions.get(region.getRegionName()) :
-              onlineMetaRegions.get(onlineMetaRegions.headMap(
-                region.getRegionName()).lastKey());
-          }
-          metaRegionName = r.getRegionName();
-          server = connection.getHRegionConnection(r.getServer());
-        }
-        LOG.info("updating row " + region.getRegionName() + " in table " +
+        HRegionInterface server = getMetaServer();
+        LOG.info("updating row " + regionInfo.getRegionName() + " in table " +
          metaRegionName + " with startcode " +
          Writables.bytesToLong(this.startCode) + " and server "+
          serverAddress.toString());
        try {
          BatchUpdate b = new BatchUpdate(rand.nextLong());
-          long lockid = b.startUpdate(region.getRegionName());
+          long lockid = b.startUpdate(regionInfo.getRegionName());
          b.put(lockid, COL_SERVER,
            Writables.stringToBytes(serverAddress.toString()));
          b.put(lockid, COL_STARTCODE, startCode);
          server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
-          if (region.getTableDesc().getName().equals(META_TABLE_NAME)) {
+          if (isMetaTable) {
            // It's a meta region.
            MetaRegion m = new MetaRegion(this.serverAddress,
-              this.region.getRegionName(), this.region.getStartKey());
+              this.regionInfo.getRegionName(), this.regionInfo.getStartKey());
            if (!initialMetaScanComplete) {
              // Put it on the queue to be scanned for the first time.
              try {
@@ -2422,11 +2485,11 @@
          } else {
            // Add it to the online meta regions
            LOG.debug("Adding to onlineMetaRegions: " + m.toString());
-            onlineMetaRegions.put(this.region.getStartKey(), m);
+            onlineMetaRegions.put(this.regionInfo.getStartKey(), m);
          }
        }
        // If updated successfully, remove from pending list.
-        pendingRegions.remove(region.getRegionName());
+        pendingRegions.remove(regionInfo.getRegionName());
        break;
      } catch (IOException e) {
        if (tries == numRetries - 1) {
@@ -2449,19 +2512,8 @@

  /** {@inheritDoc} */
  public void shutdown() {
-    TimerTask tt = new TimerTask() {
-      @Override
-      public void run() {
-        closed.set(true);
-        synchronized(msgQueue) {
-          msgQueue.clear();        // Empty the queue
-          shutdownQueue.clear();   // Empty shut down queue
-          msgQueue.notifyAll();    // Wake main thread
-        }
-      }
-    };
-    Timer t = new Timer(getName() + "-Shutdown");
-    t.schedule(tt, 10);
+    LOG.info("Cluster shutdown requested. Starting to quiesce servers");
+    this.shutdownRequested.set(true);
  }

  /** {@inheritDoc} */
@@ -2563,8 +2615,7 @@

      // 5. Get it assigned to a server

-      this.unassignedRegions.put(regionName, info);
-      this.assignAttempts.put(regionName, Long.valueOf(0L));
+      this.unassignedRegions.put(info, ZERO_L);

    } finally {
      tableInCreation.remove(newRegion.getTableDesc().getName());
@@ -2838,14 +2889,12 @@
        }
        if (online) {
          // Bring offline regions on-line
-          if (!unassignedRegions.containsKey(i.getRegionName())) {
-            unassignedRegions.put(i.getRegionName(), i);
-            assignAttempts.put(i.getRegionName(), Long.valueOf(0L));
+          if (!unassignedRegions.containsKey(i)) {
+            unassignedRegions.put(i, ZERO_L);
          }

        } else {
          // Prevent region from getting assigned.
-          unassignedRegions.remove(i.getRegionName());
-          assignAttempts.remove(i.getRegionName());
+          unassignedRegions.remove(i);
        }
      }
@@ -3069,7 +3118,7 @@
      // here because the new server will start serving the root region before
      // the ProcessServerShutdown operation has a chance to split the log file.
      if (info != null) {
-        shutdownQueue.put(new ProcessServerShutdown(info));
+        delayedToDoQueue.put(new ProcessServerShutdown(info));
      }
    }
  }
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java	(revision 601882)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java	(working copy)
@@ -193,6 +193,21 @@
    return tableDesc;
  }

+  /** @return true if this is the root region */
+  public boolean isRootRegion() {
+    return this.tableDesc.isRootRegion();
+  }
+
+  /** @return true if this is the meta table */
+  public boolean isMetaTable() {
+    return this.tableDesc.isMetaTable();
+  }
+
+  /** @return true if this region is a meta region */
+  public boolean isMetaRegion() {
+    return this.tableDesc.isMetaRegion();
+  }
+
  /**
   * @return True if has been split and has daughters.
   */
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java	(revision 601882)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java	(working copy)
@@ -45,6 +45,9 @@

  /** Stop serving the specified region and don't report back that it's closed */
  public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6;
+
+  /** Stop serving user regions */
+  public static final byte MSG_REGIONSERVER_QUIESCE = 7;

  // Messages sent from the region server to the master

@@ -69,9 +72,12 @@
   * region server is shutting down
   *
   * note that this message is followed by MSG_REPORT_CLOSE messages for each
-   * region the region server was serving.
+   * region the region server was serving, unless it was told to quiesce.
   */
  public static final byte MSG_REPORT_EXITING = 104;
+
+  /** region server has closed all user regions but is still serving meta regions */
+  public static final byte MSG_REPORT_QUIESCED = 105;

  byte msg;
  HRegionInfo info;
@@ -145,6 +151,10 @@
      message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : ");
      break;

+    case MSG_REGIONSERVER_QUIESCE:
+      message.append("MSG_REGIONSERVER_QUIESCE : ");
+      break;
+
    case MSG_REPORT_PROCESS_OPEN:
      message.append("MSG_REPORT_PROCESS_OPEN : ");
      break;
@@ -165,6 +175,10 @@
      message.append("MSG_REPORT_EXITING : ");
      break;

+    case MSG_REPORT_QUIESCED:
+      message.append("MSG_REPORT_QUIESCED : ");
+      break;
+
    default:
      message.append("unknown message code (");
      message.append(msg);