Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java (working copy) @@ -488,8 +488,9 @@ * @throws IllegalArgumentException - if the table name is reserved */ protected void checkReservedTableName(Text tableName) { - if(tableName.equals(ROOT_TABLE_NAME) - || tableName.equals(META_TABLE_NAME)) { + if(tableName.charAt(0) == '-' || + tableName.charAt(0) == '.' || + tableName.find(",") != -1) { throw new IllegalArgumentException(tableName + " is a reserved table name"); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -92,15 +92,17 @@ // Do we ever need to know all the information that we are storing? /** The root table's name. */ - static final Text ROOT_TABLE_NAME = new Text("--ROOT--"); + static final Text ROOT_TABLE_NAME = new Text("-ROOT-"); /** The META table's name. */ - static final Text META_TABLE_NAME = new Text("--META--"); + static final Text META_TABLE_NAME = new Text(".META."); // Defines for the column names used in both ROOT and META HBase 'meta' tables. 
- /** The ROOT and META column family */ + /** The ROOT and META column family (string) */ static final String COLUMN_FAMILY_STR = "info:"; + + /** The ROOT and META column family (Text) */ static final Text COLUMN_FAMILY = new Text(COLUMN_FAMILY_STR); /** Array of meta column names */ Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -28,6 +28,7 @@ import java.io.*; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; /** * HLog stores all the edits to the HStore. @@ -73,14 +74,14 @@ SequenceFile.Writer writer; TreeMap outputfiles = new TreeMap(); - boolean insideCacheFlush = false; + volatile boolean insideCacheFlush = false; TreeMap regionToLastFlush = new TreeMap(); - boolean closed = false; - transient long logSeqNum = 0; + volatile boolean closed = false; + volatile long logSeqNum = 0; long filenum = 0; - transient int numEntries = 0; + AtomicInteger numEntries = new AtomicInteger(0); Integer rollLock = new Integer(0); @@ -125,7 +126,7 @@ logWriters.put(regionName, w); } if (LOG.isDebugEnabled()) { - LOG.debug("Edit " + key.toString()); + LOG.debug("Edit " + key.toString() + "=" + val.toString()); } w.append(key, val); } @@ -173,6 +174,16 @@ fs.mkdirs(dir); rollWriter(); } + + synchronized void setSequenceNumber(long newvalue) { + if (newvalue > logSeqNum) { + if (LOG.isDebugEnabled()) { + LOG.debug("changing sequence number from " + logSeqNum + " to " + + newvalue); + } + logSeqNum = newvalue; + } + } /** * Roll the log writer. That is, start writing log messages to a new file. 
@@ -266,7 +277,7 @@ } fs.delete(p); } - this.numEntries = 0; + this.numEntries.set(0); } } @@ -343,7 +354,7 @@ new HLogKey(regionName, tableName, row, seqNum[counter++]); HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp); writer.append(logKey, logEdit); - numEntries++; + numEntries.getAndIncrement(); } } @@ -349,7 +360,7 @@ /** @return How many items have been added to the log */ int getNumEntries() { - return numEntries; + return numEntries.get(); } /** @@ -418,7 +429,7 @@ writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), new HLogEdit(HLog.METACOLUMN, COMPLETE_CACHEFLUSH.get(), System.currentTimeMillis())); - numEntries++; + numEntries.getAndIncrement(); // Remember the most-recent flush for each region. // This is used to delete obsolete log files. Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java (working copy) @@ -72,8 +72,14 @@ /** {@inheritDoc} */ @Override public String toString() { - return getColumn().toString() + " " + this.getTimestamp() + " " + - new String(getVal()).trim(); + String value = ""; + try { + value = new String(getVal(), HConstants.UTF8_ENCODING); + + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("UTF8 encoding not present?", e); + } + return "(" + getColumn().toString() + "/" + getTimestamp() + "/" + value + ")"; } // Writable Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (working copy) @@ -84,7 +84,7 @@ */ @Override public String toString() { - return tablename + "," + regionName + "," 
+ row + "," + logSeqNum; + return tablename + "/" + regionName + "/" + row + "/" + logSeqNum; } /** Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -31,7 +31,6 @@ import java.util.Random; import java.util.Set; import java.util.SortedMap; -import java.util.SortedSet; import java.util.Timer; import java.util.TimerTask; import java.util.TreeMap; @@ -40,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,18 +61,18 @@ * There is only one HMaster for a single HBase deployment. */ public class HMaster implements HConstants, HMasterInterface, - HMasterRegionInterface, Runnable { +HMasterRegionInterface, Runnable { - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public long getProtocolVersion(String protocol, - @SuppressWarnings("unused") long clientVersion) - throws IOException { + @SuppressWarnings("unused") long clientVersion) throws IOException { + if (protocol.equals(HMasterInterface.class.getName())) { return HMasterInterface.versionID; + } else if (protocol.equals(HMasterRegionInterface.class.getName())) { return HMasterRegionInterface.versionID; + } else { throw new IOException("Unknown protocol to name node: " + protocol); } @@ -79,7 +79,7 @@ } static final Log LOG = LogFactory.getLog(HMaster.class.getName()); - + volatile boolean closed; Path dir; Configuration conf; @@ -88,25 +88,18 @@ long threadWakeFrequency; int numRetries; long maxRegionOpenTime; - + BlockingQueue msgQueue; - + private Leases serverLeases; private Server server; private HServerAddress address; 
- + HConnection connection; - + long metaRescanInterval; - - volatile HServerAddress rootRegionLocation; - - /** - * Columns in the 'meta' ROOT and META tables. - */ - static final Text METACOLUMNS[] = { - COLUMN_FAMILY - }; + + final AtomicReference rootRegionLocation; /** * Base HRegion scanner class. Holds utilty common to ROOT and @@ -156,13 +149,12 @@ * once. */ abstract class BaseScanner implements Runnable { - private final Text FIRST_ROW = new Text(); protected boolean rootRegion; protected final Text tableName; - + protected abstract void initialScan(); protected abstract void maintenanceScan(); - + BaseScanner(final Text tableName) { super(); this.tableName = tableName; @@ -168,10 +160,8 @@ this.tableName = tableName; this.rootRegion = tableName.equals(ROOT_TABLE_NAME); } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public void run() { initialScan(); while (!closed) { @@ -194,8 +184,10 @@ long scannerId = -1L; LOG.info(Thread.currentThread().getName() + " scanning meta region " + region.regionName + " on " + region.server.toString()); + // Array to hold list of split parents found. Scan adds to list. After // scan we go check if parents can be removed. 
+ Map> splitParents = new HashMap>(); try { @@ -200,8 +192,10 @@ new HashMap>(); try { regionServer = connection.getHRegionConnection(region.server); - scannerId = regionServer.openScanner(region.regionName, METACOLUMNS, - FIRST_ROW, System.currentTimeMillis(), null); + scannerId = + regionServer.openScanner(region.regionName, COLUMN_FAMILY_ARRAY, + EMPTY_START_ROW, System.currentTimeMillis(), null); + int numberOfRegionsFound = 0; while (true) { TreeMap results = new TreeMap(); @@ -209,13 +203,16 @@ if (values.length == 0) { break; } + for (int i = 0; i < values.length; i++) { results.put(values[i].getKey().getColumn(), values[i].getData()); } + HRegionInfo info = HRegion.getRegionInfo(results); String serverName = HRegion.getServerName(results); long startCode = HRegion.getStartCode(results); - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug(Thread.currentThread().getName() + " scanner: " + Long.valueOf(scannerId) + " regioninfo: {" + info.toString() + "}, server: " + serverName + ", startCode: " + startCode); @@ -223,6 +220,7 @@ // Note Region has been assigned. checkAssigned(info, serverName, startCode); + if (isSplitParent(info)) { splitParents.put(info, results); } @@ -228,12 +226,14 @@ } numberOfRegionsFound += 1; } - if(rootRegion) { + if (rootRegion) { numberOfMetaRegions.set(numberOfRegionsFound); } + } catch (IOException e) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + if (e instanceof UnknownScannerException) { // Reset scannerId so we do not try closing a scanner the other side // has lost account of: prevents duplicated stack trace out of the @@ -242,10 +242,11 @@ } } throw e; + } finally { try { if (scannerId != -1L && regionServer != null) { - regionServer.close(scannerId); + regionServer.close(scannerId); } } catch (IOException e) { if (e instanceof RemoteException) { @@ -254,15 +255,17 @@ LOG.error("Closing scanner", e); } } - // Scan is finished. 
Take a look at split parents to see if any - // we can clean up. + + // Scan is finished. Take a look at split parents to see if any we can clean up. + if (splitParents.size() > 0) { for (Map.Entry> e: - splitParents.entrySet()) { + splitParents.entrySet()) { + TreeMap results = e.getValue(); cleanupSplits(e.getKey(), - HRegion.getSplit(results, HRegion.COL_SPLITA), - HRegion.getSplit(results, HRegion.COL_SPLITB)); + HRegion.getSplit(results, HRegion.COL_SPLITA), + HRegion.getSplit(results, HRegion.COL_SPLITB)); } } LOG.info(Thread.currentThread().getName() + " scan of meta region " + @@ -268,10 +271,12 @@ LOG.info(Thread.currentThread().getName() + " scan of meta region " + region.regionName + " complete"); } - + private boolean isSplitParent(final HRegionInfo info) { boolean result = false; + // Skip if not a split region. + if (!info.isSplit()) { return result; } @@ -280,7 +285,7 @@ } return true; } - + /* * @param info * @param splitA @@ -290,52 +295,62 @@ * @throws IOException */ private boolean cleanupSplits(final HRegionInfo info, - final HRegionInfo splitA, final HRegionInfo splitB) + final HRegionInfo splitA, final HRegionInfo splitB) throws IOException { + boolean result = false; if (LOG.isDebugEnabled()) { LOG.debug("Checking " + info.getRegionName() + " to see if daughter " + - "splits still hold references"); + "splits still hold references"); } boolean noReferencesA = splitA == null; boolean noReferencesB = splitB == null; + if (!noReferencesA) { - noReferencesA = hasReferences(info.getRegionName(), splitA, - HRegion.COL_SPLITA); + noReferencesA = + hasReferences(info.getRegionName(), splitA, HRegion.COL_SPLITA); } if (!noReferencesB) { - noReferencesB = hasReferences(info.getRegionName(), splitB, - HRegion.COL_SPLITB); + noReferencesB = + hasReferences(info.getRegionName(), splitB, HRegion.COL_SPLITB); } if (!(noReferencesA && noReferencesB)) { + // No references. Remove this item from table and deleted region on // disk. 
+ LOG.info("Deleting region " + info.getRegionName() + - " because daughter splits no longer hold references"); + " because daughter splits no longer hold references"); + HRegion.deleteRegion(fs, dir, info.getRegionName()); HRegion.removeRegionFromMETA(conf, this.tableName, - info.getRegionName()); + info.getRegionName()); + result = true; } + if (LOG.isDebugEnabled()) { LOG.debug("Done checking " + info.getRegionName() + ": splitA: " + - noReferencesA + ", splitB: "+ noReferencesB); + noReferencesA + ", splitB: "+ noReferencesB); } return result; } - + protected boolean hasReferences(final Text regionName, - final HRegionInfo split, final Text column) - throws IOException { + final HRegionInfo split, final Text column) throws IOException { + boolean result = HRegion.hasReferences(fs, fs.makeQualified(dir), split); + if (result) { return result; } + if (LOG.isDebugEnabled()) { LOG.debug(split.getRegionName().toString() - +" no longer has references to " + regionName.toString()); + +" no longer has references to " + regionName.toString()); } + HTable t = new HTable(conf, this.tableName); try { HRegion.removeSplitFromMETA(t, regionName, column); @@ -344,45 +359,54 @@ } return result; } - + protected void checkAssigned(final HRegionInfo info, final String serverName, final long startCode) { + // Skip region - if ... 
- if(info.offLine // offline - || killedRegions.contains(info.regionName) // queued for offline - || regionsToDelete.contains(info.regionName)) { // queued for delete + + if(info.offLine // offline + || killedRegions.contains(info.regionName) // queued for offline + || regionsToDelete.contains(info.regionName)) { // queued for delete unassignedRegions.remove(info.regionName); assignAttempts.remove(info.regionName); + if(LOG.isDebugEnabled()) { - LOG.debug("not assigning region: " + info.regionName + - " (offline: " + info.isOffline() + ", split: " + info.isSplit() + - ")"); + LOG.debug("not assigning region: " + info.regionName + " (offline: " + + info.isOffline() + ", split: " + info.isSplit() + ")"); } return; } - + HServerInfo storedInfo = null; - if(serverName != null) { - TreeMap regionsToKill = killList.get(serverName); - if(regionsToKill != null && regionsToKill.containsKey(info.regionName)) { + if (serverName != null) { + Map regionsToKill = killList.get(serverName); + if (regionsToKill != null && + regionsToKill.containsKey(info.regionName)) { + // Skip if region is on kill list + if(LOG.isDebugEnabled()) { - LOG.debug("not assigning region (on kill list): " + - info.regionName); + LOG.debug("not assigning region (on kill list): " + info.regionName); } return; } - storedInfo = serversToServerInfo.get(serverName); + synchronized (serversToServerInfo) { + storedInfo = serversToServerInfo.get(serverName); + } } - if( !(unassignedRegions.containsKey(info.regionName) || - pendingRegions.contains(info.regionName)) + if (!(unassignedRegions.containsKey(info.regionName) || + pendingRegions.contains(info.regionName)) && (storedInfo == null || storedInfo.getStartCode() != startCode)) { + // The current assignment is no good; load the region. 
+ unassignedRegions.put(info.regionName, info); assignAttempts.put(info.regionName, Long.valueOf(0L)); + } else if (LOG.isDebugEnabled()) { - LOG.debug("Finished if " + info.getRegionName() + " is assigned: " + + LOG.debug("Finished if " + info.getRegionName() + " is assigned: " + "unassigned: " + unassignedRegions.containsKey(info.regionName) + ", pending: " + pendingRegions.contains(info.regionName)); } @@ -388,12 +412,10 @@ } } } - + volatile boolean rootScanned; - - /** - * Scanner for the ROOT HRegion. - */ + + /** Scanner for the ROOT HRegion. */ class RootScanner extends BaseScanner { /** Constructor */ public RootScanner() { @@ -403,14 +425,17 @@ private void scanRoot() { int tries = 0; while (!closed && tries < numRetries) { - while(!closed && rootRegionLocation == null) { - // rootRegionLocation will be filled in when we get an 'open region' - // regionServerReport message from the HRegionServer that has been - // allocated the ROOT region below. - try { - Thread.sleep(threadWakeFrequency); - } catch (InterruptedException e) { - // continue + synchronized (rootRegionLocation) { + while(!closed && rootRegionLocation.get() == null) { + // rootRegionLocation will be filled in when we get an 'open region' + // regionServerReport message from the HRegionServer that has been + // allocated the ROOT region below. 
+ + try { + rootRegionLocation.wait(); + } catch (InterruptedException e) { + // continue + } } } if (closed) { @@ -418,15 +443,20 @@ } try { - synchronized(rootScannerLock) { // Don't interrupt us while we're working - scanRegion(new MetaRegion(rootRegionLocation, + // Don't interrupt us while we're working + + synchronized(rootScannerLock) { + scanRegion(new MetaRegion(rootRegionLocation.get(), HGlobals.rootRegionInfo.regionName, null)); } break; + } catch (IOException e) { if (e instanceof RemoteException) { try { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } catch (IOException ex) { e = ex; } @@ -439,6 +469,8 @@ } } if (!closed) { + // sleep before retry + try { Thread.sleep(threadWakeFrequency); } catch (InterruptedException e) { @@ -447,7 +479,7 @@ } } } - + @Override protected void initialScan() { scanRoot(); @@ -453,7 +485,7 @@ scanRoot(); rootScanned = true; } - + @Override protected void maintenanceScan() { scanRoot(); @@ -459,7 +491,7 @@ scanRoot(); } } - + private RootScanner rootScanner; private Thread rootScannerThread; Integer rootScannerLock = new Integer(0); @@ -469,7 +501,7 @@ HServerAddress server; Text regionName; Text startKey; - + MetaRegion(HServerAddress server, Text regionName, Text startKey) { this.server = server; this.regionName = regionName; @@ -476,9 +508,7 @@ this.startKey = startKey; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public boolean equals(Object o) { return this.compareTo(o) == 0; @@ -483,10 +513,8 @@ public boolean equals(Object o) { return this.compareTo(o) == 0; } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ @Override public int hashCode() { int result = this.regionName.hashCode(); @@ -496,12 +524,10 @@ // Comparable - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public int compareTo(Object o) { MetaRegion other = (MetaRegion)o; - + int result = 
this.regionName.compareTo(other.regionName); if(result == 0) { result = this.startKey.compareTo(other.startKey); @@ -508,7 +534,6 @@ } return result; } - } /** Set by root scanner to indicate the number of meta regions */ @@ -513,7 +538,7 @@ /** Set by root scanner to indicate the number of meta regions */ AtomicInteger numberOfMetaRegions; - + /** Work for the meta scanner is queued up here */ BlockingQueue metaRegionsToScan; @@ -522,7 +547,7 @@ /** Set by meta scanner after initial scan */ volatile boolean initialMetaScanComplete; - + /** * MetaScanner META table. * @@ -537,11 +562,11 @@ public MetaScanner() { super(HConstants.META_TABLE_NAME); } - + private void scanOneMetaRegion(MetaRegion region) { int tries = 0; while (!closed && tries < numRetries) { - while (!closed && !rootScanned && rootRegionLocation == null) { + while (!closed && !rootScanned && rootRegionLocation.get() == null) { try { Thread.sleep(threadWakeFrequency); } catch (InterruptedException e) { @@ -553,8 +578,9 @@ } try { + // Don't interrupt us while we're working + synchronized (metaScannerLock) { - // Don't interrupt us while we're working scanRegion(region); onlineMetaRegions.put(region.startKey, region); } @@ -565,6 +591,7 @@ try { e = RemoteExceptionHandler.decodeRemoteException( (RemoteException) e); + } catch (IOException ex) { e = ex; } @@ -577,6 +604,8 @@ } } if (!closed) { + // sleep before retry + try { Thread.sleep(threadWakeFrequency); } catch (InterruptedException e) { @@ -603,7 +632,7 @@ } initialMetaScanComplete = true; } - + @Override protected void maintenanceScan() { ArrayList regions = new ArrayList(); @@ -613,7 +642,7 @@ } metaRegionsScanned(); } - + /** * Called by the meta scanner when it has completed scanning all meta * regions. This wakes up any threads that were waiting for this to happen. @@ -619,7 +648,9 @@ * regions. This wakes up any threads that were waiting for this to happen. 
*/ private synchronized boolean metaRegionsScanned() { - if (!rootScanned || numberOfMetaRegions.get() != onlineMetaRegions.size()) { + if (!rootScanned || + numberOfMetaRegions.get() != onlineMetaRegions.size()) { + return false; } LOG.info("all meta regions scanned"); @@ -626,7 +657,7 @@ notifyAll(); return true; } - + /** * Other threads call this method to wait until all the meta regions have * been scanned. @@ -633,9 +664,9 @@ */ synchronized boolean waitForMetaRegionsOrClose() { while (!closed) { - if (rootScanned - && numberOfMetaRegions.get() == onlineMetaRegions.size()) { - + if (rootScanned && + numberOfMetaRegions.get() == onlineMetaRegions.size()) { + break; } @@ -660,7 +691,7 @@ * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the * set of all known valid regions. */ - SortedMap unassignedRegions; + Map unassignedRegions; /** * The 'assignAttempts' table maps from regions to a timestamp that indicates @@ -667,7 +698,7 @@ * the last time we *tried* to assign the region to a RegionServer. If the * timestamp is out of date, then we can try to reassign it. */ - SortedMap assignAttempts; + Map assignAttempts; /** * Regions that have been assigned, and the server has reported that it has @@ -673,8 +704,8 @@ * Regions that have been assigned, and the server has reported that it has * started serving it, but that we have not yet recorded in the meta table. */ - SortedSet pendingRegions; - + Set pendingRegions; + /** * The 'killList' is a list of regions that are going to be closed, but not * reopened. @@ -679,10 +710,10 @@ * The 'killList' is a list of regions that are going to be closed, but not * reopened. 
*/ - SortedMap> killList; - + Map> killList; + /** 'killedRegions' contains regions that are in the process of being closed */ - SortedSet killedRegions; + Set killedRegions; /** * 'regionsToDelete' contains regions that need to be deleted, but cannot be @@ -688,11 +719,21 @@ * 'regionsToDelete' contains regions that need to be deleted, but cannot be * until the region server closes it */ - SortedSet regionsToDelete; - - /** The map of known server names to server info */ - SortedMap serversToServerInfo = - Collections.synchronizedSortedMap(new TreeMap()); + Set regionsToDelete; + + /** + * The map of known server names to server info + * + * Access to this map and loadToServers and serversToLoad must be synchronized + * on this object + */ + Map serversToServerInfo; + + /** SortedMap server load -> Set of server names */ + SortedMap> loadToServers; + + /** Map of server names -> server load */ + Map serversToLoad; /** Build the HMaster out of a raw configuration item. * @@ -722,7 +763,7 @@ this.rand = new Random(); // Make sure the root directory exists! - + if(! fs.exists(dir)) { fs.mkdirs(dir); } @@ -730,14 +771,17 @@ Path rootRegionDir = HRegion.getRegionDir(dir, HGlobals.rootRegionInfo.regionName); LOG.info("Root region dir: " + rootRegionDir.toString()); - if(! fs.exists(rootRegionDir)) { + + if (!fs.exists(rootRegionDir)) { LOG.info("bootstrap: creating ROOT and first META regions"); try { HRegion root = HRegion.createHRegion(0L, HGlobals.rootTableDesc, - this.dir, this.conf); + this.dir, this.conf); HRegion meta = HRegion.createHRegion(1L, HGlobals.metaTableDesc, this.dir, this.conf); + // Add first region from the META table to the ROOT region. 
+ HRegion.addRegionToMETA(root, meta); root.close(); root.getLog().closeAndDelete(); @@ -743,6 +787,7 @@ root.getLog().closeAndDelete(); meta.close(); meta.getLog().closeAndDelete(); + } catch (IOException e) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); @@ -747,7 +792,7 @@ if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); } - LOG.error("", e); + LOG.error("bootstrap", e); } } @@ -753,31 +798,36 @@ this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.numRetries = conf.getInt("hbase.client.retries.number", 2); - this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); + this.maxRegionOpenTime = + conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); + this.msgQueue = new LinkedBlockingQueue(); + this.serverLeases = new Leases( - conf.getLong("hbase.master.lease.period", 30 * 1000), - conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); + conf.getLong("hbase.master.lease.period", 30 * 1000), + conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); + this.server = RPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); + address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), + false, conf); // The rpc-server port can be ephemeral... 
ensure we have the correct info + this.address = new HServerAddress(server.getListenerAddress()); conf.set(MASTER_ADDRESS, address.toString()); - + this.connection = HConnectionManager.getConnection(conf); - - this.metaRescanInterval - = conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000); + + this.metaRescanInterval = + conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000); // The root region - - this.rootRegionLocation = null; + + this.rootRegionLocation = new AtomicReference(); this.rootScanned = false; this.rootScanner = new RootScanner(); this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner"); - + // Scans the meta table this.numberOfMetaRegions = new AtomicInteger(); @@ -782,12 +832,12 @@ this.numberOfMetaRegions = new AtomicInteger(); this.metaRegionsToScan = new LinkedBlockingQueue(); - + this.onlineMetaRegions = Collections.synchronizedSortedMap(new TreeMap()); - + this.initialMetaScanComplete = false; - + this.metaScanner = new MetaScanner(); this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner"); @@ -792,29 +842,34 @@ this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner"); this.unassignedRegions = - Collections.synchronizedSortedMap(new TreeMap()); - - this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo); - + Collections.synchronizedMap(new HashMap()); + + this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, + HGlobals.rootRegionInfo); + this.assignAttempts = - Collections.synchronizedSortedMap(new TreeMap()); - + Collections.synchronizedMap(new HashMap()); + + this.assignAttempts.put(HGlobals.rootRegionInfo.regionName, + Long.valueOf(0L)); + this.pendingRegions = - Collections.synchronizedSortedSet(new TreeSet()); - - this.assignAttempts.put(HGlobals.rootRegionInfo.regionName, - Long.valueOf(0L)); + Collections.synchronizedSet(new HashSet()); this.killList = - Collections.synchronizedSortedMap( - new TreeMap>()); - + 
Collections.synchronizedMap( + new HashMap>()); + this.killedRegions = - Collections.synchronizedSortedSet(new TreeSet()); - + Collections.synchronizedSet(new HashSet()); + this.regionsToDelete = - Collections.synchronizedSortedSet(new TreeSet()); - + Collections.synchronizedSet(new HashSet()); + + this.serversToServerInfo = new HashMap(); + this.loadToServers = new TreeMap>(); + this.serversToLoad = new HashMap(); + // We're almost open for business this.closed = false; LOG.info("HMaster initialized on " + this.address.toString()); @@ -819,10 +874,8 @@ this.closed = false; LOG.info("HMaster initialized on " + this.address.toString()); } - - /** - * @return HServerAddress of the master server - */ + + /** @return HServerAddress of the master server */ public HServerAddress getMasterAddress() { return address; } @@ -837,7 +890,9 @@ // Start the server last so everything else is running before we start // receiving requests + this.server.start(); + } catch (IOException e) { if (e instanceof RemoteException) { try { @@ -842,11 +897,14 @@ if (e instanceof RemoteException) { try { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } catch (IOException ex) { - LOG.warn("", ex); + LOG.warn("thread start", ex); } } + // Something happened during startup. Shut things down. + this.closed = true; LOG.error("Failed startup", e); } @@ -851,10 +909,14 @@ LOG.error("Failed startup", e); } - // Main processing loop + /* + * Main processing loop + */ + for (PendingOperation op = null; !closed; ) { try { op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { // continue } @@ -865,6 +927,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("Main processing loop: " + op.toString()); } + if (!op.process()) { // Operation would have blocked because not all meta regions are // online. 
This could cause a deadlock, because this thread is waiting @@ -871,9 +934,11 @@ // for the missing meta region(s) to come back online, but since it // is waiting, it cannot process the meta region online operation it // is waiting for. So put this operation back on the queue for now. + if (msgQueue.size() == 0) { // The queue is currently empty so wait for a while to see if what // we need comes in first + try { Thread.sleep(threadWakeFrequency); } catch (InterruptedException e) { @@ -886,16 +951,18 @@ } msgQueue.put(op); } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was " + - "interrupted.", e); + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } + } catch (Exception ex) { if (ex instanceof RemoteException) { try { - ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex); + ex = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) ex); + } catch (IOException e) { - LOG.warn("", e); + LOG.warn("main processing loop: " + op.toString(), e); } } LOG.warn("Processing pending operations: " + op.toString(), ex); @@ -907,7 +974,7 @@ } } letRegionServersShutdown(); - + /* * Clean up and close up shop */ @@ -912,48 +979,39 @@ * Clean up and close up shop */ - // Wake other threads so they notice the close - synchronized(rootScannerLock) { - rootScannerThread.interrupt(); + rootScannerThread.interrupt(); // Wake root scanner } synchronized(metaScannerLock) { - metaScannerThread.interrupt(); + metaScannerThread.interrupt(); // Wake meta scanner } - server.stop(); // Stop server - serverLeases.close(); // Turn off the lease monitor - + server.stop(); // Stop server + serverLeases.close(); // Turn off the lease monitor + // Join up with all threads - + try { - // Wait for the root scanner to finish. - rootScannerThread.join(); + rootScannerThread.join(); // Wait for the root scanner to finish. } catch (Exception iex) { - // Print if ever there is an interrupt (Just for kicks. 
Remove if it - // ever happens). LOG.warn("root scanner", iex); } try { - // Join the thread till it finishes. - metaScannerThread.join(); + metaScannerThread.join(); // Wait for meta scanner to finish. } catch(Exception iex) { - // Print if ever there is an interrupt (Just for kicks. Remove if it - // ever happens). LOG.warn("meta scanner", iex); } try { - // Join until its finished. TODO: Maybe do in parallel in its own thread - // as is done in TaskTracker if its taking a long time to go down. - server.join(); + // TODO: Maybe do in parallel in its own thread as is done in TaskTracker + // if its taking a long time to go down. + + server.join(); // Wait for server to finish. } catch(InterruptedException iex) { - // Print if ever there is an interrupt (Just for kicks. Remove if it - // ever happens). LOG.warn("server", iex); } - + LOG.info("HMaster main thread exiting"); } - + /* * Wait on regionservers to report in * with {@link #regionServerReport(HServerInfo, HMsg[])} so they get notice @@ -962,36 +1020,48 @@ * by remote region servers have expired. 
*/ private void letRegionServersShutdown() { - while (this.serversToServerInfo.size() > 0) { - LOG.info("Waiting on following regionserver(s) to go down (or " + - "region server lease expiration, whichever happens first): " + - this.serversToServerInfo.values()); - try { - Thread.sleep(threadWakeFrequency); - } catch (InterruptedException e) { - // continue + synchronized (serversToServerInfo) { + while (this.serversToServerInfo.size() > 0) { + LOG.info("Waiting on following regionserver(s) to go down (or " + + "region server lease expiration, whichever happens first): " + + this.serversToServerInfo.values()); + try { + serversToServerInfo.wait(threadWakeFrequency); + } catch (InterruptedException e) { + // continue + } } } } - - ////////////////////////////////////////////////////////////////////////////// - // HMasterRegionInterface - ////////////////////////////////////////////////////////////////////////////// - - /** - * {@inheritDoc} + + /* + * HMasterRegionInterface */ + + /** {@inheritDoc} */ @SuppressWarnings("unused") - public void regionServerStartup(HServerInfo serverInfo) - throws IOException { + public void regionServerStartup(HServerInfo serverInfo) throws IOException { String s = serverInfo.getServerAddress().toString().trim(); HServerInfo storedInfo = null; LOG.info("received start message from: " + s); - + // If we get the startup message but there's an old server by that // name, then we can timeout the old one right away and register // the new one. 
- storedInfo = serversToServerInfo.remove(s); + + synchronized (serversToServerInfo) { + storedInfo = serversToServerInfo.remove(s); + HServerLoad load = serversToLoad.remove(s); + + if (load != null) { + Set servers = loadToServers.get(load); + if (servers != null) { + servers.remove(s); + loadToServers.put(load, servers); + } + } + serversToServerInfo.notifyAll(); + } if (storedInfo != null && !closed) { try { msgQueue.put(new PendingServerShutdown(storedInfo)); @@ -1001,16 +1071,26 @@ } // Either way, record the new server - serversToServerInfo.put(s, serverInfo); - if(!closed) { - long serverLabel = getServerLabel(s); - if (LOG.isTraceEnabled()) { - LOG.trace("Created lease for " + serverLabel); + + synchronized (serversToServerInfo) { + HServerLoad load = new HServerLoad(); + serverInfo.setLoad(load); + serversToServerInfo.put(s, serverInfo); + serversToLoad.put(s, load); + Set servers = loadToServers.get(load); + if (servers == null) { + servers = new HashSet(); } + servers.add(s); + loadToServers.put(load, servers); + } + + if (!closed) { + long serverLabel = getServerLabel(s); serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s)); } } - + private long getServerLabel(final String s) { return s.hashCode(); } @@ -1015,22 +1095,25 @@ return s.hashCode(); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { - String s = serverInfo.getServerAddress().toString().trim(); - long serverLabel = getServerLabel(s); + String serverName = serverInfo.getServerAddress().toString().trim(); + long serverLabel = getServerLabel(serverName); + if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { - + // HRegionServer is shutting down. Cancel the server's lease. - if (cancelLease(s, serverLabel)) { + // Note that cancelling the server's lease takes care of updating + // serversToServerInfo, etc. 
+ + if (cancelLease(serverName, serverLabel)) { // Only process the exit message if the server still has a lease. // Otherwise we could end up processing the server exit twice. - LOG.info("Region server " + s + ": MSG_REPORT_EXITING"); - + + LOG.info("Region server " + serverName + ": MSG_REPORT_EXITING"); + // Get all the regions the server was serving reassigned // (if we are not shutting down). @@ -1039,7 +1122,7 @@ HRegionInfo info = msgs[i].getRegionInfo(); if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) { - rootRegionLocation = null; + rootRegionLocation.set(null); } else if (info.tableDesc.getName().equals(META_TABLE_NAME)) { onlineMetaRegions.remove(info.getStartKey()); @@ -1050,7 +1133,7 @@ } } } - + // We don't need to return anything to the server because it isn't // going to do any more work. return new HMsg[0]; @@ -1055,7 +1138,7 @@ // going to do any more work. return new HMsg[0]; } - + if (closed) { // Tell server to shut down if we are shutting down. This should // happen after check of MSG_REPORT_EXITING above, since region server @@ -1060,13 +1143,17 @@ // Tell server to shut down if we are shutting down. This should // happen after check of MSG_REPORT_EXITING above, since region server // will send us one of these messages after it gets MSG_REGIONSERVER_STOP + return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; } - HServerInfo storedInfo = serversToServerInfo.get(s); + HServerInfo storedInfo; + synchronized (serversToServerInfo) { + storedInfo = serversToServerInfo.get(serverName); + } if(storedInfo == null) { if(LOG.isDebugEnabled()) { - LOG.debug("received server report from unknown server: " + s); + LOG.debug("received server report from unknown server: " + serverName); } // The HBaseMaster may have been restarted. @@ -1071,7 +1158,7 @@ // The HBaseMaster may have been restarted. 
// Tell the RegionServer to start over and call regionServerStartup() - + return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)}; } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) { @@ -1086,11 +1173,12 @@ // The answer is to ask A to shut down for good. if (LOG.isDebugEnabled()) { - LOG.debug("region server race condition detected: " + s); + LOG.debug("region server race condition detected: " + serverName); } + cancelLease(serverName, serverLabel); return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; - + } else { // All's well. Renew the server's lease. @@ -1096,43 +1184,93 @@ // All's well. Renew the server's lease. // This will always succeed; otherwise, the fetch of serversToServerInfo // would have failed above. - + serverLeases.renewLease(serverLabel, serverLabel); - // Refresh the info object - serversToServerInfo.put(s, serverInfo); + // Refresh the info object and the load information + + synchronized (serversToServerInfo) { + serversToServerInfo.put(serverName, serverInfo); + + HServerLoad load = serversToLoad.get(serverName); + if (load != null && !load.equals(serverInfo.getLoad())) { + // We have previous information about the load on this server + // and the load on this server has changed - // Next, process messages for this server - return processMsgs(serverInfo, msgs); - } - } + Set servers = loadToServers.get(load); - /** cancel a server's lease */ + // Note that servers should never be null because loadToServers + // and serversToLoad are manipulated in pairs + + servers.remove(serverName); + loadToServers.put(load, servers); + } + + // Set the current load information + + load = serverInfo.getLoad(); + serversToLoad.put(serverName, load); + Set servers = loadToServers.get(load); + if (servers == null) { + servers = new HashSet(); + } + servers.add(serverName); + loadToServers.put(load, servers); + } + + // Next, process messages for this server + return processMsgs(serverInfo, msgs); + } + } + + /** Cancel a server's lease 
and update its load information */ private boolean cancelLease(final String serverName, final long serverLabel) { boolean leaseCancelled = false; - if (serversToServerInfo.remove(serverName) != null) { - // Only cancel lease once. - // This method can be called a couple of times during shutdown. - LOG.info("Cancelling lease for " + serverName); - serverLeases.cancelLease(serverLabel, serverLabel); - leaseCancelled = true; + synchronized (serversToServerInfo) { + HServerInfo info = serversToServerInfo.remove(serverName); + if (info != null) { + // Only cancel lease and update load information once. + // This method can be called a couple of times during shutdown. + + LOG.info("Cancelling lease for " + serverName); + serverLeases.cancelLease(serverLabel, serverLabel); + leaseCancelled = true; + + // update load information + + HServerLoad load = serversToLoad.remove(serverName); + if (load != null) { + Set servers = loadToServers.get(load); + if (servers != null) { + servers.remove(serverName); + loadToServers.put(load, servers); + } + } + } + serversToServerInfo.notifyAll(); } return leaseCancelled; } - - /** Process all the incoming messages from a server that's contacted us. */ - private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException { - ArrayList returnMsgs = new ArrayList(); - - TreeMap regionsToKill = - killList.remove(info.getServerAddress().toString()); + + /** + * Process all the incoming messages from a server that's contacted us. + * + * Note that we never need to update the server's load information because + * that has already been done in regionServerReport. + */ + private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) + throws IOException { + ArrayList returnMsgs = new ArrayList(); + String serverName = info.getServerAddress().toString(); + HashMap regionsToKill = killList.remove(serverName); + // Get reports on what the RegionServer did. 
- - for(int i = 0; i < incomingMsgs.length; i++) { + + for (int i = 0; i < incomingMsgs.length; i++) { HRegionInfo region = incomingMsgs[i].getRegionInfo(); - switch(incomingMsgs[i].getMsg()) { + switch (incomingMsgs[i].getMsg()) { case HMsg.MSG_REPORT_OPEN: HRegionInfo regionInfo = unassignedRegions.get(region.regionName); @@ -1137,9 +1275,9 @@ case HMsg.MSG_REPORT_OPEN: HRegionInfo regionInfo = unassignedRegions.get(region.regionName); - if(regionInfo == null) { + if (regionInfo == null) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("region server " + info.getServerAddress().toString() + " should not have opened region " + region.regionName); } @@ -1156,11 +1294,19 @@ region.regionName); // Remove from unassigned list so we don't assign it to someone else + unassignedRegions.remove(region.regionName); assignAttempts.remove(region.regionName); - if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { + + if (region.regionName.compareTo( + HGlobals.rootRegionInfo.regionName) == 0) { + // Store the Root Region location (in memory) - rootRegionLocation = new HServerAddress(info.getServerAddress()); + + synchronized (rootRegionLocation) { + rootRegionLocation.set(new HServerAddress(info.getServerAddress())); + rootRegionLocation.notifyAll(); + } break; } @@ -1166,9 +1312,11 @@ // Note that the table has been assigned and is waiting for the meta // table to be updated. + pendingRegions.add(region.regionName); - + // Queue up an update to note the region location. 
+ try { msgQueue.put(new PendingOpenReport(info, region)); } catch (InterruptedException e) { @@ -1181,8 +1329,12 @@ LOG.info(info.getServerAddress().toString() + " no longer serving " + region.regionName); - if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region - rootRegionLocation = null; + if (region.regionName.compareTo( + HGlobals.rootRegionInfo.regionName) == 0) { + + // Root region + + rootRegionLocation.set(null); unassignedRegions.put(region.regionName, region); assignAttempts.put(region.regionName, Long.valueOf(0L)); @@ -1190,14 +1342,19 @@ boolean reassignRegion = true; boolean deleteRegion = false; - if(killedRegions.remove(region.regionName)) { + if (killedRegions.remove(region.regionName)) { reassignRegion = false; } - - if(regionsToDelete.remove(region.regionName)) { + + if (regionsToDelete.remove(region.regionName)) { reassignRegion = false; deleteRegion = true; } + + // NOTE: we cannot put the region into unassignedRegions as that + // could create a race with the pending close if it gets + // reassigned before the close is processed. + unassignedRegions.remove(region.regionName); assignAttempts.remove(region.regionName); @@ -1202,15 +1359,12 @@ assignAttempts.remove(region.regionName); try { - msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion)); + msgQueue.put(new PendingCloseReport(region, reassignRegion, + deleteRegion)); + } catch (InterruptedException e) { throw new RuntimeException("Putting into msgQueue was interrupted.", e); } - - // NOTE: we cannot put the region into unassignedRegions as that - // could create a race with the pending close if it gets - // reassigned before the close is processed. - } break; @@ -1216,19 +1370,25 @@ case HMsg.MSG_REPORT_SPLIT: // A region has split. + HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo(); HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo(); + LOG.info("region " + region.regionName + " split. 
New regions are: " - + newRegionA.regionName + ", " + newRegionB.regionName); - if(region.tableDesc.getName().equals(META_TABLE_NAME)) { + + newRegionA.regionName + ", " + newRegionB.regionName); + + if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // A meta region has split. + onlineMetaRegions.remove(region.getStartKey()); onlineMetaRegions.put(newRegionA.getStartKey(), - new MetaRegion(info.getServerAddress(), newRegionA.getRegionName(), - newRegionA.getStartKey())); + new MetaRegion(info.getServerAddress(), + newRegionA.getRegionName(), newRegionA.getStartKey())); + onlineMetaRegions.put(newRegionB.getStartKey(), - new MetaRegion(info.getServerAddress(), newRegionB.getRegionName(), - newRegionB.getStartKey())); + new MetaRegion(info.getServerAddress(), + newRegionB.getRegionName(), newRegionB.getStartKey())); + numberOfMetaRegions.incrementAndGet(); } break; @@ -1241,8 +1401,9 @@ } // Process the kill list - if(regionsToKill != null) { - for(HRegionInfo i: regionsToKill.values()) { + + if (regionsToKill != null) { + for (HRegionInfo i: regionsToKill.values()) { returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i)); killedRegions.add(i.regionName); } @@ -1249,48 +1410,174 @@ } // Figure out what the RegionServer ought to do, and write back. 
- if(unassignedRegions.size() > 0) { - // Open new regions as necessary - int targetForServer = (int) Math.ceil(unassignedRegions.size() - / (1.0 * serversToServerInfo.size())); - int counter = 0; - long now = System.currentTimeMillis(); - for (Text curRegionName: unassignedRegions.keySet()) { - HRegionInfo regionInfo = unassignedRegions.get(curRegionName); - long assignedTime = assignAttempts.get(curRegionName); - if (now - assignedTime > maxRegionOpenTime) { - LOG.info("assigning region " + regionInfo.regionName + " to server " + - info.getServerAddress().toString()); + assignRegions(info, serverName, returnMsgs); + return returnMsgs.toArray(new HMsg[returnMsgs.size()]); + } + /** + * Assigns regions to region servers attempting to balance the load across + * all region servers + * + * @param info + * @param serverName + * @param returnMsgs + */ + private void assignRegions(HServerInfo info, String serverName, + ArrayList returnMsgs) { + + long now = System.currentTimeMillis(); + TreeSet regionsToAssign = new TreeSet(); + for (Map.Entry e: assignAttempts.entrySet()) { + if (now - e.getValue() > maxRegionOpenTime) { + regionsToAssign.add(e.getKey()); + } + } + int nRegionsToAssign = regionsToAssign.size(); + + if (nRegionsToAssign > 0) { + if (serversToServerInfo.size() == 1) { + // Only one server. An unlikely case but still possible. + // Assign all unassigned regions to it. + + for (Text regionName: regionsToAssign) { + HRegionInfo regionInfo = unassignedRegions.get(regionName); + LOG.info("assigning region " + regionName + " to server " + + serverName); + + assignAttempts.put(regionName, Long.valueOf(now)); returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); + } + + } else { + // Multiple servers in play. + // We need to allocate regions only to most lightly loaded servers. 
+ + HServerLoad thisServersLoad = info.getLoad(); + + synchronized (serversToServerInfo) { + SortedMap> lightServers = + loadToServers.headMap(thisServersLoad); + + // How many regions we can assign to more lightly loaded servers? + + int nregions = 0; + for (Map.Entry> e: lightServers.entrySet()) { + HServerLoad lightLoad = + new HServerLoad(e.getKey().getNumberOfRequests(), + e.getKey().getNumberOfRegions()); + + do { + lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1); + nregions += 1; + + } while (lightLoad.compareTo(thisServersLoad) <= 0 && + nregions < nRegionsToAssign); + + nregions *= e.getValue().size(); + + if (nregions >= nRegionsToAssign) { + break; + } + } + + nRegionsToAssign -= nregions; + if (nRegionsToAssign > 0) { + // We still have more regions to assign. See how many we can assign + // before this server becomes more heavily loaded than the next + // most heavily loaded server. + + SortedMap> heavyServers = + loadToServers.tailMap(thisServersLoad); + int nservers = 0; + HServerLoad heavierLoad = null; + for (Map.Entry> e: + heavyServers.entrySet()) { + + Set servers = e.getValue(); + nservers += servers.size(); + + if (e.getKey().compareTo(thisServersLoad) == 0) { + // This is the load factor of the server we are considering + + nservers -= 1; + continue; + } + + // If we get here, we are at the first load entry that is a + // heavier load than the server we are considering + + heavierLoad = e.getKey(); + break; + } - assignAttempts.put(curRegionName, Long.valueOf(now)); - counter++; - } + nregions = 0; + if (heavierLoad != null) { + // There is a more heavily loaded server + + for (HServerLoad load = + new HServerLoad(thisServersLoad.getNumberOfRequests(), + thisServersLoad.getNumberOfRegions()); + load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign; + load.setNumberOfRegions(load.getNumberOfRegions() + 1), + nregions++) { + } + } + + if (nregions < nRegionsToAssign) { + // There are some more heavily loaded servers + 
// but we can't assign all the regions to this server. + + if (nservers > 0) { + // There are other servers that can share the load. + // Split regions that need assignment across the servers. + + nregions = + (int) Math.ceil((1.0 * nRegionsToAssign) / (1.0 * nservers)); + + } else { + // No other servers with same load. + // Split regions over all available servers + + nregions = (int) Math.ceil((1.0 * nRegionsToAssign) + / (1.0 * serversToServerInfo.size())); + + } + + } else { + // Assign all regions to this server + + nregions = nRegionsToAssign; + } + + for (Map.Entry e: unassignedRegions.entrySet()) { + Text regionName = e.getKey(); + HRegionInfo regionInfo = e.getValue(); + LOG.info("assigning region " + regionName + " to server " + + serverName); + + assignAttempts.put(regionName, Long.valueOf(now)); + returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); - if(counter >= targetForServer) { - break; + if (--nregions <= 0) { + break; + } + } + } } } } - return returnMsgs.toArray(new HMsg[returnMsgs.size()]); } - - ////////////////////////////////////////////////////////////////////////////// - // Some internal classes to manage msg-passing and client operations - ////////////////////////////////////////////////////////////////////////////// - + + /* + * Some internal classes to manage msg-passing and client operations + */ + private abstract class PendingOperation { - protected final Text[] columns = { - COLUMN_FAMILY - }; - protected final Text startRow = new Text(); - PendingOperation() { super(); } - + abstract boolean process() throws IOException; } @@ -1302,11 +1589,10 @@ private class PendingServerShutdown extends PendingOperation { private HServerAddress deadServer; private String deadServerName; - private long oldStartCode; private transient boolean logSplit; private transient boolean rootChecked; private transient boolean rootRescanned; - + private class ToDoEntry { boolean deleteRegion; boolean regionOffline; @@ -1312,7 +1598,7 @@ boolean 
regionOffline; Text row; HRegionInfo info; - + ToDoEntry(Text row, HRegionInfo info) { this.deleteRegion = false; this.regionOffline = false; @@ -1320,7 +1606,7 @@ this.info = info; } } - + PendingServerShutdown(HServerInfo serverInfo) { super(); this.deadServer = serverInfo.getServerAddress(); @@ -1325,7 +1611,6 @@ super(); this.deadServer = serverInfo.getServerAddress(); this.deadServerName = this.deadServer.toString(); - this.oldStartCode = serverInfo.getStartCode(); this.logSplit = false; this.rootChecked = false; this.rootRescanned = false; @@ -1330,7 +1615,8 @@ this.rootChecked = false; this.rootRescanned = false; } - + + /** {@inheritDoc} */ @Override public String toString() { return "PendingServerShutdown of " + this.deadServer.toString(); @@ -1335,22 +1621,26 @@ public String toString() { return "PendingServerShutdown of " + this.deadServer.toString(); } - + /** Finds regions that the dead region server was serving */ private void scanMetaRegion(HRegionInterface server, long scannerId, - Text regionName) - throws IOException { + Text regionName) throws IOException { + ArrayList toDoList = new ArrayList(); TreeMap regions = new TreeMap(); - DataInputBuffer inbuf = new DataInputBuffer(); + try { - while(true) { + while (true) { KeyedData[] values = null; + try { values = server.next(scannerId); + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } LOG.error("Shutdown scanning of meta region", e); break; @@ -1355,8 +1645,8 @@ LOG.error("Shutdown scanning of meta region", e); break; } - - if(values == null || values.length == 0) { + + if (values == null || values.length == 0) { break; } @@ -1362,11 +1652,12 @@ TreeMap results = new TreeMap(); Text row = null; - for(int i = 0; i < values.length; i++) { + for (int i = 0; i < values.length; i++) { if(row == null) { row = values[i].getKey().getRow(); + 
} else { - if(!row.equals(values[i].getKey().getRow())) { + if (!row.equals(values[i].getKey().getRow())) { LOG.error("Multiple rows in same scanner result set. firstRow=" + row + ", currentRow=" + values[i].getKey().getRow()); } @@ -1373,7 +1664,8 @@ } results.put(values[i].getKey().getColumn(), values[i].getData()); } - if (LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled() && row != null) { LOG.debug("shutdown scanner looking at " + row.toString()); } @@ -1380,9 +1672,11 @@ // Check server name. If null, be conservative and treat as though // region had been on shutdown server (could be null because we // missed edits in hlog because hdfs does not do write-append). + String serverName = null; try { serverName = Keying.bytesToString(results.get(COL_SERVER)); + } catch(UnsupportedEncodingException e) { LOG.error("Server name", e); break; @@ -1389,10 +1683,12 @@ } if (serverName != null && serverName.length() > 0 && deadServerName.compareTo(serverName) != 0) { + // This isn't the server you're looking for - move along + if (LOG.isDebugEnabled()) { LOG.debug("Server name " + serverName + " is not same as " + - deadServerName + ": Passing"); + deadServerName + ": Passing"); } continue; } @@ -1398,10 +1694,12 @@ } // Bingo! Found it. + HRegionInfo info = null; try { - info = (HRegionInfo)Writables. - getWritable(results.get(COL_REGIONINFO), new HRegionInfo()); + info = (HRegionInfo) Writables.getWritable( + results.get(COL_REGIONINFO), new HRegionInfo()); + } catch (IOException e) { LOG.error("Read fields", e); break; @@ -1407,17 +1705,21 @@ break; } LOG.info(info.getRegionName() + " was on shutdown server <" + - serverName + "> (or server is null). Marking unassigned if " + - "meta and clearing pendingRegions"); + serverName + "> (or server is null). 
Marking unassigned if " + + "meta and clearing pendingRegions"); + if (info.tableDesc.getName().equals(META_TABLE_NAME)) { onlineMetaRegions.remove(info.getStartKey()); } - + ToDoEntry todo = new ToDoEntry(row, info); toDoList.add(todo); - if(killList.containsKey(deadServerName)) { - TreeMap regionsToKill = killList.get(deadServerName); - if(regionsToKill.containsKey(info.regionName)) { + + if (killList.containsKey(deadServerName)) { + HashMap regionsToKill = + killList.get(deadServerName); + + if (regionsToKill.containsKey(info.regionName)) { regionsToKill.remove(info.regionName); killList.put(deadServerName, regionsToKill); unassignedRegions.remove(info.regionName); @@ -1422,20 +1724,28 @@ killList.put(deadServerName, regionsToKill); unassignedRegions.remove(info.regionName); assignAttempts.remove(info.regionName); - if(regionsToDelete.contains(info.regionName)) { + + if (regionsToDelete.contains(info.regionName)) { // Delete this region + regionsToDelete.remove(info.regionName); todo.deleteRegion = true; + } else { // Mark region offline + todo.regionOffline = true; } } + } else { // Get region reassigned + regions.put(info.regionName, info); - // If was pending, remove otherwise will obstruct its getting - // reassigned. + + // If it was pending, remove. + // Otherwise will obstruct its getting reassigned. 
+ pendingRegions.remove(info.getRegionName()); } } @@ -1444,10 +1754,11 @@ if(scannerId != -1L) { try { server.close(scannerId); - + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("Closing scanner", e); } @@ -1455,12 +1766,15 @@ } // Remove server from root/meta entries + long clientId = rand.nextLong(); for (ToDoEntry e: toDoList) { long lockid = server.startUpdate(regionName, clientId, e.row); - if(e.deleteRegion) { + + if (e.deleteRegion) { server.delete(regionName, clientId, lockid, COL_REGIONINFO); - } else if(e.regionOffline) { + + } else if (e.regionOffline) { e.info.offLine = true; ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); DataOutputStream s = new DataOutputStream(byteValue); @@ -1466,7 +1780,7 @@ DataOutputStream s = new DataOutputStream(byteValue); e.info.write(s); server.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + byteValue.toByteArray()); } server.delete(regionName, clientId, lockid, COL_SERVER); server.delete(regionName, clientId, lockid, COL_STARTCODE); @@ -1472,10 +1786,10 @@ server.delete(regionName, clientId, lockid, COL_STARTCODE); server.commit(regionName, clientId, lockid, System.currentTimeMillis()); } - + // Get regions reassigned - for(Map.Entry e: regions.entrySet()) { + for (Map.Entry e: regions.entrySet()) { Text region = e.getKey(); HRegionInfo regionInfo = e.getValue(); @@ -1487,24 +1801,28 @@ @Override boolean process() throws IOException { LOG.info("process shutdown of server " + deadServer + ": logSplit: " + - this.logSplit + ", rootChecked: " + this.rootChecked + - ", rootRescanned: " + this.rootRescanned + ", numberOfMetaRegions: " + - numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " + - onlineMetaRegions.size()); + this.logSplit + ", rootChecked: " + this.rootChecked + + ", rootRescanned: " + 
this.rootRescanned + ", numberOfMetaRegions: " + + numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " + + onlineMetaRegions.size()); - if(!logSplit) { + if (!logSplit) { // Process the old log file + HLog.splitLog(dir, new Path(dir, "log" + "_" + - deadServer.getBindAddress() + "_" + deadServer.getPort()), fs, conf); + deadServer.getBindAddress() + "_" + deadServer.getPort()), fs, conf); + logSplit = true; } - if(!rootChecked) { - if(rootRegionLocation != null - && deadServer.equals(rootRegionLocation)) { - rootRegionLocation = null; + if (!rootChecked) { + if (rootRegionLocation.get() != null && + deadServer.equals(rootRegionLocation.get())) { + + rootRegionLocation.set(null); unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo); + assignAttempts.put(HGlobals.rootRegionInfo.regionName, Long.valueOf(0L)); } @@ -1511,24 +1829,25 @@ rootChecked = true; } - if(!rootRescanned) { + if (!rootRescanned) { // Scan the ROOT region + HRegionInterface server = null; long scannerId = -1L; - for(int tries = 0; tries < numRetries; tries ++) { + for (int tries = 0; tries < numRetries; tries ++) { if (closed) { return true; } - if (rootRegionLocation == null || !rootScanned) { - // We can't proceed until the root region is online and has been - // scanned + if (rootRegionLocation.get() == null || !rootScanned) { + // We can't proceed until the root region is online and has been scanned + if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown scanning root region " + - "cancelled because rootRegionLocation is null"); + "cancelled because rootRegionLocation is null"); } return false; } - server = connection.getHRegionConnection(rootRegionLocation); + server = connection.getHRegionConnection(rootRegionLocation.get()); scannerId = -1L; try { @@ -1534,17 +1853,20 @@ try { if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown scanning root region on " + - rootRegionLocation.getBindAddress()); + 
rootRegionLocation.get().getBindAddress()); } scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, - columns, startRow, System.currentTimeMillis(), null); - scanMetaRegion(server, scannerId, - HGlobals.rootRegionInfo.regionName); + COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, + System.currentTimeMillis(), null); + + scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName); break; + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1552,8 +1874,8 @@ } if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown scanning root region on " + - rootRegionLocation.getBindAddress() + " finished " + - Thread.currentThread().getName()); + rootRegionLocation.get().getBindAddress() + " finished " + + Thread.currentThread().getName()); } rootRescanned = true; } @@ -1569,11 +1891,12 @@ // We can't block either because that would prevent the meta region // online message from being processed. So return false to have this // operation requeued. 
+ if (LOG.isDebugEnabled()) { LOG.debug("Requeuing shutdown because rootScanned: " + - rootScanned + ", numberOfMetaRegions: " + - numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " + - onlineMetaRegions.size()); + rootScanned + ", numberOfMetaRegions: " + + numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " + + onlineMetaRegions.size()); } return false; } @@ -1579,27 +1902,34 @@ } for (MetaRegion r: onlineMetaRegions.values()) { + HRegionInterface server = null; long scannerId = -1L; + if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown scanning " + r.regionName + - " on " + r.server + " " + Thread.currentThread().getName()); + " on " + r.server + " " + Thread.currentThread().getName()); } server = connection.getHRegionConnection(r.server); - scannerId = server.openScanner(r.regionName, columns, startRow, - System.currentTimeMillis(), null); + + scannerId = server.openScanner(r.regionName, COLUMN_FAMILY_ARRAY, + EMPTY_START_ROW, System.currentTimeMillis(), null); + scanMetaRegion(server, scannerId, r.regionName); + if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown finished scanning " + - r.regionName + - " on " + r.server + " " + Thread.currentThread().getName()); + r.regionName + " on " + r.server + " " + + Thread.currentThread().getName()); } } break; + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1608,7 +1938,7 @@ return true; } } - + /** * PendingCloseReport is instantiated when a region server reports that it * has closed a region. 
@@ -1618,10 +1948,10 @@ private boolean reassignRegion; private boolean deleteRegion; private boolean rootRegion; - + PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion, boolean deleteRegion) { - + super(); this.regionInfo = regionInfo; @@ -1630,10 +1960,10 @@ // If the region closing down is a meta region then we need to update // the ROOT table - - if(this.regionInfo.tableDesc.getName().equals(META_TABLE_NAME)) { + + if (this.regionInfo.tableDesc.getName().equals(META_TABLE_NAME)) { this.rootRegion = true; - + } else { this.rootRegion = false; } @@ -1638,7 +1968,8 @@ this.rootRegion = false; } } - + + /** {@inheritDoc} */ @Override public String toString() { return "PendingCloseReport of " + this.regionInfo.getRegionName(); @@ -1643,10 +1974,13 @@ public String toString() { return "PendingCloseReport of " + this.regionInfo.getRegionName(); } - + @Override boolean process() throws IOException { for (int tries = 0; tries < numRetries; tries++) { + if (closed) { + return true; + } LOG.info("region closed: " + regionInfo.regionName); // Mark the Region as unavailable in the appropriate meta table @@ -1653,12 +1987,8 @@ Text metaRegionName; HRegionInterface server; - - if (closed) { - return true; - } if (rootRegion) { - if (rootRegionLocation == null || !rootScanned) { + if (rootRegionLocation.get() == null || !rootScanned) { // We can't proceed until the root region is online and has been // scanned return false; @@ -1664,7 +1994,7 @@ return false; } metaRegionName = HGlobals.rootRegionInfo.regionName; - server = connection.getHRegionConnection(rootRegionLocation); + server = connection.getHRegionConnection(rootRegionLocation.get()); onlineMetaRegions.remove(regionInfo.getStartKey()); } else { @@ -1670,6 +2000,7 @@ } else { if (!rootScanned || numberOfMetaRegions.get() != onlineMetaRegions.size()) { + // We can't proceed because not all of the meta regions are online. 
// We can't block either because that would prevent the meta region // online message from being processed. So return false to have this @@ -1674,11 +2005,12 @@ // We can't block either because that would prevent the meta region // online message from being processed. So return false to have this // operation requeued. + if (LOG.isDebugEnabled()) { LOG.debug("Requeuing close because rootScanned=" + - rootScanned + ", numberOfMetaRegions=" + - numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" + - onlineMetaRegions.size()); + rootScanned + ", numberOfMetaRegions=" + + numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" + + onlineMetaRegions.size()); } return false; } @@ -1688,8 +2020,8 @@ r = onlineMetaRegions.get(regionInfo.getRegionName()); } else { - r = onlineMetaRegions.get( - onlineMetaRegions.headMap(regionInfo.getRegionName()).lastKey()); + r = onlineMetaRegions.get(onlineMetaRegions.headMap( + regionInfo.getRegionName()).lastKey()); } metaRegionName = r.regionName; server = connection.getHRegionConnection(r.server); @@ -1699,11 +2031,11 @@ try { long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName); - - if(deleteRegion) { + + if (deleteRegion) { server.delete(metaRegionName, clientId, lockid, COL_REGIONINFO); - - } else if(!reassignRegion ) { + + } else if (!reassignRegion ) { regionInfo.offLine = true; ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); DataOutputStream s = new DataOutputStream(byteValue); @@ -1710,7 +2042,7 @@ regionInfo.write(s); server.put(metaRegionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + byteValue.toByteArray()); } server.delete(metaRegionName, clientId, lockid, COL_SERVER); server.delete(metaRegionName, clientId, lockid, COL_STARTCODE); @@ -1716,7 +2048,7 @@ server.delete(metaRegionName, clientId, lockid, COL_STARTCODE); server.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); - + break; } catch (IOException e) { @@ -1722,7 +2054,8 @@ 
} catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1730,13 +2063,13 @@ } } - if(reassignRegion) { + if (reassignRegion) { LOG.info("reassign region: " + regionInfo.regionName); - + unassignedRegions.put(regionInfo.regionName, regionInfo); assignAttempts.put(regionInfo.regionName, Long.valueOf(0L)); - - } else if(deleteRegion) { + + } else if (deleteRegion) { try { HRegion.deleteRegion(fs, dir, regionInfo.regionName); @@ -1742,7 +2075,8 @@ } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("failed delete region " + regionInfo.regionName, e); throw e; @@ -1762,7 +2096,7 @@ private HRegionInfo region; private HServerAddress serverAddress; private byte [] startCode; - + PendingOpenReport(HServerInfo info, HRegionInfo region) { if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // The region which just came on-line is a META region. @@ -1767,9 +2101,12 @@ if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // The region which just came on-line is a META region. // We need to look in the ROOT region for its information. + this.rootRegion = true; + } else { // Just an ordinary region. Look for it in the META table. 
+ this.rootRegion = false; } this.region = region; @@ -1781,7 +2118,8 @@ LOG.error("Start code", e); } } - + + /** {@inheritDoc} */ @Override public String toString() { return "PendingOpenOperation from " + serverAddress.toString(); @@ -1786,25 +2124,25 @@ public String toString() { return "PendingOpenOperation from " + serverAddress.toString(); } - + @Override boolean process() throws IOException { for (int tries = 0; tries < numRetries; tries++) { + if (closed) { + return true; + } LOG.info(region.getRegionName() + " open on " + this.serverAddress.toString()); + // Register the newly-available Region's location. + Text metaRegionName; HRegionInterface server; - if (closed) { - return true; - } - if (rootRegion) { - if (rootRegionLocation == null || !rootScanned) { - // We can't proceed until the root region is online and has been - // scanned + if (rootRegionLocation.get() == null || !rootScanned) { + // We can't proceed until the root region is online and has been scanned if (LOG.isDebugEnabled()) { - LOG.debug("root region=" + rootRegionLocation.toString() + + LOG.debug("root region=" + rootRegionLocation.get().toString() + ", rootScanned=" + rootScanned); } return false; @@ -1810,15 +2148,17 @@ return false; } metaRegionName = HGlobals.rootRegionInfo.regionName; - server = connection.getHRegionConnection(rootRegionLocation); + server = connection.getHRegionConnection(rootRegionLocation.get()); + } else { if (!rootScanned || - numberOfMetaRegions.get() != onlineMetaRegions.size()) { - + numberOfMetaRegions.get() != onlineMetaRegions.size()) { + // We can't proceed because not all of the meta regions are online. // We can't block either because that would prevent the meta region // online message from being processed. So return false to have this - // operation requeue + // operation requeued. 
+ if (LOG.isDebugEnabled()) { LOG.debug("Requeuing open because rootScanned: " + rootScanned + ", numberOfMetaRegions: " + @@ -1831,9 +2171,10 @@ MetaRegion r = null; if (onlineMetaRegions.containsKey(region.getRegionName())) { r = onlineMetaRegions.get(region.getRegionName()); + } else { - r = onlineMetaRegions.get( - onlineMetaRegions.headMap(region.getRegionName()).lastKey()); + r = onlineMetaRegions.get(onlineMetaRegions.headMap( + region.getRegionName()).lastKey()); } metaRegionName = r.regionName; server = connection.getHRegionConnection(r.server); @@ -1840,6 +2181,7 @@ } LOG.info("updating row " + region.getRegionName() + " in table " + metaRegionName); + long clientId = rand.nextLong(); try { long lockid = server.startUpdate(metaRegionName, clientId, @@ -1844,20 +2186,27 @@ try { long lockid = server.startUpdate(metaRegionName, clientId, region.getRegionName()); + server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress.toString().getBytes(UTF8_ENCODING)); + server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode); + server.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); - + if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // It's a meta region. + MetaRegion m = new MetaRegion(serverAddress, region.regionName, region.startKey); + if (!initialMetaScanComplete) { // Put it on the queue to be scanned for the first time. + try { metaRegionsToScan.put(m); + } catch (InterruptedException e) { throw new RuntimeException( "Putting into metaRegionsToScan was interrupted.", e); @@ -1864,6 +2213,7 @@ } } else { // Add it to the online meta regions + onlineMetaRegions.put(region.startKey, m); } } @@ -1868,12 +2218,15 @@ } } // If updated successfully, remove from pending list. 
+ pendingRegions.remove(region.getRegionName()); break; + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1883,13 +2236,11 @@ } } - ////////////////////////////////////////////////////////////////////////////// - // HMasterInterface - ////////////////////////////////////////////////////////////////////////////// - - /** - * {@inheritDoc} + /* + * HMasterInterface */ + + /** {@inheritDoc} */ public boolean isMasterRunning() { return !closed; } @@ -1894,9 +2245,7 @@ return !closed; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public void shutdown() { TimerTask tt = new TimerTask() { @Override @@ -1912,11 +2261,10 @@ t.schedule(tt, 10); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public void createTable(HTableDescriptor desc) throws IOException { + if (!isMasterRunning()) { throw new MasterNotRunningException(); } @@ -1926,6 +2274,7 @@ try { // We can not access meta regions if they have not already been // assigned and scanned. If we timeout waiting, just shutdown. + if (metaScanner.waitForMetaRegionsOrClose()) { break; } @@ -1932,10 +2281,12 @@ createTable(newRegion); LOG.info("created table " + desc.getName()); break; + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1942,12 +2293,10 @@ } } } - - /* - * Set of tables currently in creation. Access needs to be synchronized. - */ + + /* Set of tables currently in creation. Access needs to be synchronized. 
*/ private Set tableInCreation = new HashSet(); - + private void createTable(final HRegionInfo newRegion) throws IOException { Text tableName = newRegion.tableDesc.getName(); synchronized (tableInCreation) { @@ -1962,27 +2311,32 @@ // table would sit should it exist. Open scanner on it. If a region // for the table we want to create already exists, then table already // created. Throw already-exists exception. - MetaRegion m = (onlineMetaRegions.containsKey(newRegion.regionName)) ? + + MetaRegion m = (onlineMetaRegions.containsKey(newRegion.regionName) ? onlineMetaRegions.get(newRegion.regionName) : - onlineMetaRegions.get(onlineMetaRegions. - headMap(newRegion.getTableDesc().getName()).lastKey()); + onlineMetaRegions.get(onlineMetaRegions.headMap( + newRegion.getTableDesc().getName()).lastKey())); + Text metaRegionName = m.regionName; HRegionInterface r = connection.getHRegionConnection(m.server); - long scannerid = r.openScanner(metaRegionName, - new Text[] { COL_REGIONINFO }, tableName, System.currentTimeMillis(), - null); + long scannerid = r.openScanner(metaRegionName, COL_REGIONINFO_ARRAY, + tableName, System.currentTimeMillis(), null); try { KeyedData[] data = r.next(scannerid); - // Test data and that the row for the data is for our table. If - // table does not exist, scanner will return row after where our table - // would be inserted if it exists so look for exact match on table - // name. + + // Test data and that the row for the data is for our table. If table + // does not exist, scanner will return row after where our table would + // be inserted if it exists so look for exact match on table name. + if (data != null && data.length > 0 && - HRegionInfo.getTableNameFromRegionName(data[0].getKey().getRow()). - equals(tableName)) { + HRegionInfo.getTableNameFromRegionName( + data[0].getKey().getRow()).equals(tableName)) { + // Then a region for this table already exists. Ergo table exists. 
+ throw new TableExistsException(tableName.toString()); } + } finally { r.close(scannerid); } @@ -1988,10 +2342,12 @@ } // 2. Create the HRegion - HRegion region = HRegion.createHRegion(newRegion.regionId, newRegion. - getTableDesc(), this.dir, this.conf); + + HRegion region = HRegion.createHRegion(newRegion.regionId, + newRegion.getTableDesc(), this.dir, this.conf); // 3. Insert into meta + HRegionInfo info = region.getRegionInfo(); Text regionName = region.getRegionName(); ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); @@ -1999,12 +2355,14 @@ info.write(s); long clientId = rand.nextLong(); long lockid = r.startUpdate(metaRegionName, clientId, regionName); + r.put(metaRegionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); - r.commit(metaRegionName, clientId, lockid, - System.currentTimeMillis()); + byteValue.toByteArray()); + + r.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); // 4. Close the new region to flush it to disk. Close its log file too. + region.close(); region.getLog().closeAndDelete(); @@ -2009,8 +2367,10 @@ region.getLog().closeAndDelete(); // 5. 
Get it assigned to a server + unassignedRegions.put(regionName, info); assignAttempts.put(regionName, Long.valueOf(0L)); + } finally { synchronized (tableInCreation) { tableInCreation.remove(newRegion.getTableDesc().getName()); @@ -2018,9 +2378,7 @@ } } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public void deleteTable(Text tableName) throws IOException { new TableDelete(tableName).process(); LOG.info("deleted table: " + tableName); @@ -2025,24 +2383,20 @@ new TableDelete(tableName).process(); LOG.info("deleted table: " + tableName); } - - /** - * {@inheritDoc} - */ - public void addColumn(Text tableName, HColumnDescriptor column) throws IOException { + + /** {@inheritDoc} */ + public void addColumn(Text tableName, HColumnDescriptor column) + throws IOException { + new AddColumn(tableName, column).process(); } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public void deleteColumn(Text tableName, Text columnName) throws IOException { new DeleteColumn(tableName, HStoreKey.extractFamily(columnName)).process(); } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public void enableTable(Text tableName) throws IOException { new ChangeTableState(tableName, true).process(); } @@ -2047,28 +2401,25 @@ new ChangeTableState(tableName, true).process(); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public void disableTable(Text tableName) throws IOException { new ChangeTableState(tableName, false).process(); } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public HServerAddress findRootRegion() { - return rootRegionLocation; + return rootRegionLocation.get(); } - - // Helper classes for HMasterInterface + + /* + * Helper classes for HMasterInterface + */ private abstract class TableOperation { - private SortedSet metaRegions; + private Set metaRegions; protected Text tableName; - - protected TreeSet unservedRegions; - + protected Set unservedRegions; + protected TableOperation(Text tableName) throws IOException { if (!isMasterRunning()) 
{ throw new MasterNotRunningException(); @@ -2073,9 +2424,10 @@ if (!isMasterRunning()) { throw new MasterNotRunningException(); } - this.metaRegions = new TreeSet(); + + this.metaRegions = new HashSet(); this.tableName = tableName; - this.unservedRegions = new TreeSet(); + this.unservedRegions = new HashSet(); // We can not access any meta region if they have not already been // assigned and scanned. @@ -2081,7 +2433,7 @@ // assigned and scanned. if (metaScanner.waitForMetaRegionsOrClose()) { - throw new MasterNotRunningException(); // We're shutting down. Forget it. + throw new MasterNotRunningException(); // We're shutting down. Forget it. } Text firstMetaRegion = null; @@ -2097,7 +2449,7 @@ this.metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values()); } - + void process() throws IOException { for (int tries = 0; tries < numRetries; tries++) { boolean tableExists = false; @@ -2103,7 +2455,7 @@ boolean tableExists = false; try { synchronized(metaScannerLock) { // Prevent meta scanner from running - for(MetaRegion m: metaRegions) { + for (MetaRegion m: metaRegions) { // Get a connection to a meta server @@ -2110,18 +2462,18 @@ HRegionInterface server = connection.getHRegionConnection(m.server); // Open a scanner on the meta region - + long scannerId = - server.openScanner(m.regionName, METACOLUMNS, tableName, + server.openScanner(m.regionName, COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null); - + try { DataInputBuffer inbuf = new DataInputBuffer(); - while(true) { + while (true) { HRegionInfo info = new HRegionInfo(); String serverName = null; long startCode = -1L; - + KeyedData[] values = server.next(scannerId); if(values == null || values.length == 0) { break; @@ -2127,28 +2479,32 @@ break; } boolean haveRegionInfo = false; - for(int i = 0; i < values.length; i++) { - if(values[i].getData().length == 0) { + for (int i = 0; i < values.length; i++) { + if (values[i].getData().length == 0) { break; } Text column = 
values[i].getKey().getColumn(); - if(column.equals(COL_REGIONINFO)) { + if (column.equals(COL_REGIONINFO)) { haveRegionInfo = true; inbuf.reset(values[i].getData(), - values[i].getData().length); + values[i].getData().length); info.readFields(inbuf); - } else if(column.equals(COL_SERVER)) { + + } else if (column.equals(COL_SERVER)) { try { serverName = new String(values[i].getData(), UTF8_ENCODING); - } catch(UnsupportedEncodingException e) { + + } catch (UnsupportedEncodingException e) { assert(false); } - } else if(column.equals(COL_STARTCODE)) { + + } else if (column.equals(COL_STARTCODE)) { try { startCode = Long.valueOf(new String(values[i].getData(), - UTF8_ENCODING)).longValue(); - } catch(UnsupportedEncodingException e) { + UTF8_ENCODING)).longValue(); + + } catch (UnsupportedEncodingException e) { assert(false); } } @@ -2153,17 +2509,17 @@ } } } - - if(!haveRegionInfo) { + + if (!haveRegionInfo) { throw new IOException(COL_REGIONINFO + " not found"); } - - if(info.tableDesc.getName().compareTo(tableName) > 0) { + + if (info.tableDesc.getName().compareTo(tableName) > 0) { break; // Beyond any more entries for this table } - + tableExists = true; - if(!isBeingServed(serverName, startCode)) { + if (!isBeingServed(serverName, startCode)) { unservedRegions.add(info); } processScanItem(serverName, startCode, info); @@ -2169,9 +2525,9 @@ processScanItem(serverName, startCode, info); } // while(true) - + } finally { - if(scannerId != -1L) { + if (scannerId != -1L) { try { server.close(scannerId); @@ -2177,7 +2533,8 @@ } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("", e); } @@ -2184,21 +2541,22 @@ } scannerId = -1L; } - - if(!tableExists) { + + if (!tableExists) { throw new IOException(tableName + " does not exist"); } - + postProcessMeta(m, server); unservedRegions.clear(); - + } // 
for(MetaRegion m:) } // synchronized(metaScannerLock) - + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -2207,11 +2565,14 @@ break; } // for(tries...) } - + protected boolean isBeingServed(String serverName, long startCode) { boolean result = false; - if(serverName != null && startCode != -1L) { - HServerInfo s = serversToServerInfo.get(serverName); + if (serverName != null && startCode != -1L) { + HServerInfo s; + synchronized (serversToServerInfo) { + s = serversToServerInfo.get(serverName); + } result = s != null && s.getStartCode() == startCode; } return result; @@ -2216,17 +2577,16 @@ } return result; } - + protected boolean isEnabled(HRegionInfo info) { return !info.offLine; } - + protected abstract void processScanItem(String serverName, long startCode, HRegionInfo info) throws IOException; - + protected abstract void postProcessMeta(MetaRegion m, - HRegionInterface srvr) - throws IOException; + HRegionInterface server) throws IOException; } /** Instantiated to enable or disable a table */ @@ -2232,12 +2592,13 @@ /** Instantiated to enable or disable a table */ private class ChangeTableState extends TableOperation { private boolean online; + + protected Map> servedRegions = + new HashMap>(); - protected TreeMap> servedRegions = - new TreeMap>(); protected long lockid; protected long clientId; - + ChangeTableState(Text tableName, boolean onLine) throws IOException { super(tableName); this.online = onLine; @@ -2242,14 +2603,15 @@ super(tableName); this.online = onLine; } - + @Override protected void processScanItem(String serverName, long startCode, HRegionInfo info) { + if (isBeingServed(serverName, startCode)) { - TreeSet regions = servedRegions.get(serverName); + HashSet regions = servedRegions.get(serverName); if (regions == null) { - regions = new 
TreeSet(); + regions = new HashSet(); } regions.add(info); servedRegions.put(serverName, regions); @@ -2255,24 +2617,28 @@ servedRegions.put(serverName, regions); } } - + @Override protected void postProcessMeta(MetaRegion m, HRegionInterface server) - throws IOException { + throws IOException { + // Process regions not being served - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("processing unserved regions"); } - for(HRegionInfo i: unservedRegions) { + for (HRegionInfo i: unservedRegions) { if (i.offLine && i.isSplit()) { if (LOG.isDebugEnabled()) { LOG.debug("Skipping region " + i.toString() + " because it is " + - "offline because it has been split"); + "offline because it has been split"); } continue; } + // Update meta table - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("updating columns in row: " + i.regionName); } @@ -2285,10 +2651,10 @@ server.delete(m.regionName, clientId, lockid, COL_STARTCODE); server.commit(m.regionName, clientId, lockid, System.currentTimeMillis()); - + lockid = -1L; - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.regionName); } @@ -2294,7 +2660,8 @@ } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("column update failed in row: " + i.regionName, e); @@ -2300,7 +2667,7 @@ } finally { try { - if(lockid != -1L) { + if (lockid != -1L) { server.abort(m.regionName, clientId, lockid); } @@ -2306,7 +2673,8 @@ } catch (IOException iex) { if (iex instanceof RemoteException) { - iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex); + iex = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) iex); } LOG.error("", iex); } @@ -2312,11 +2680,12 @@ } } - if(online) { // Bring offline regions on-line - 
if(!unassignedRegions.containsKey(i.regionName)) { + if (online) { // Bring offline regions on-line + if (!unassignedRegions.containsKey(i.regionName)) { unassignedRegions.put(i.regionName, i); - assignAttempts.put(i.regionName, 0L); + assignAttempts.put(i.regionName, Long.valueOf(0L)); } + } else { // Prevent region from getting assigned. unassignedRegions.remove(i.regionName); assignAttempts.remove(i.regionName); @@ -2322,12 +2691,13 @@ assignAttempts.remove(i.regionName); } } - + // Process regions currently being served - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("processing regions currently being served"); } - for(Map.Entry> e: servedRegions.entrySet()) { + for (Map.Entry> e: servedRegions.entrySet()) { String serverName = e.getKey(); if (online) { LOG.debug("Already online"); @@ -2333,14 +2703,15 @@ LOG.debug("Already online"); continue; // Already being served } - + // Cause regions being served to be taken off-line and disabled - TreeMap localKillList = killList.get(serverName); - if(localKillList == null) { - localKillList = new TreeMap(); + + HashMap localKillList = killList.get(serverName); + if (localKillList == null) { + localKillList = new HashMap(); } - for(HRegionInfo i: e.getValue()) { - if(LOG.isDebugEnabled()) { + for (HRegionInfo i: e.getValue()) { + if (LOG.isDebugEnabled()) { LOG.debug("adding region " + i.regionName + " to local kill list"); } localKillList.put(i.regionName, i); @@ -2345,10 +2716,10 @@ } localKillList.put(i.regionName, i); } - if(localKillList.size() > 0) { - if(LOG.isDebugEnabled()) { + if (localKillList.size() > 0) { + if (LOG.isDebugEnabled()) { LOG.debug("inserted local kill list into kill list for server " + - serverName); + serverName); } killList.put(serverName, localKillList); } @@ -2355,16 +2726,18 @@ } servedRegions.clear(); } - - protected void updateRegionInfo(final HRegionInterface srvr, - final Text regionName, final HRegionInfo i) - throws IOException { + + protected void 
updateRegionInfo(final HRegionInterface server, + final Text regionName, final HRegionInfo i) throws IOException { + i.offLine = !online; + ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); DataOutputStream s = new DataOutputStream(byteValue); i.write(s); - srvr.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + + server.put(regionName, clientId, lockid, COL_REGIONINFO, + byteValue.toByteArray()); } } @@ -2374,16 +2747,18 @@ * the table. */ private class TableDelete extends ChangeTableState { - + TableDelete(Text tableName) throws IOException { super(tableName, false); } - + @Override - protected void postProcessMeta(MetaRegion m, HRegionInterface srvr) - throws IOException { + protected void postProcessMeta(MetaRegion m, HRegionInterface server) + throws IOException { + // For regions that are being served, mark them for deletion - for (TreeSet s: servedRegions.values()) { + + for (HashSet s: servedRegions.values()) { for (HRegionInfo i: s) { regionsToDelete.add(i.regionName); } @@ -2390,13 +2765,17 @@ } // Unserved regions we can delete now + for (HRegionInfo i: unservedRegions) { // Delete the region + try { HRegion.deleteRegion(fs, dir, i.regionName); + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("failed to delete region " + i.regionName, e); } @@ -2401,18 +2780,18 @@ LOG.error("failed to delete region " + i.regionName, e); } } - super.postProcessMeta(m, srvr); + super.postProcessMeta(m, server); } - + @Override protected void updateRegionInfo( - @SuppressWarnings("hiding") HRegionInterface server, Text regionName, - @SuppressWarnings("unused") HRegionInfo i) - throws IOException { + @SuppressWarnings("hiding") HRegionInterface server, Text regionName, + @SuppressWarnings("unused") HRegionInfo i) throws IOException { + server.delete(regionName, 
clientId, lockid, COL_REGIONINFO); } } - + private abstract class ColumnOperation extends TableOperation { protected ColumnOperation(Text tableName) throws IOException { super(tableName); @@ -2420,11 +2799,11 @@ @Override protected void processScanItem( - @SuppressWarnings("unused") String serverName, - @SuppressWarnings("unused") long startCode, - final HRegionInfo info) - throws IOException { - if(isEnabled(info)) { + @SuppressWarnings("unused") String serverName, + @SuppressWarnings("unused") long startCode, + final HRegionInfo info) throws IOException { + + if (isEnabled(info)) { throw new TableNotDisabledException(tableName.toString()); } } @@ -2430,8 +2809,8 @@ } protected void updateRegionInfo(HRegionInterface server, Text regionName, - HRegionInfo i) - throws IOException { + HRegionInfo i) throws IOException { + ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); DataOutputStream s = new DataOutputStream(byteValue); i.write(s); @@ -2440,12 +2819,15 @@ try { lockid = server.startUpdate(regionName, clientId, i.regionName); server.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + byteValue.toByteArray()); + server.commit(regionName, clientId, lockid, System.currentTimeMillis()); lockid = -1L; - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.regionName); } + } catch (Exception e) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); @@ -2453,13 +2835,14 @@ LOG.error("column update failed in row: " + i.regionName, e); } finally { - if(lockid != -1L) { + if (lockid != -1L) { try { server.abort(regionName, clientId, lockid); - + } catch (IOException iex) { if (iex instanceof RemoteException) { - iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex); + iex = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) iex); } LOG.error("", iex); } @@ -2471,7 +2854,7 @@ /** Instantiated to 
remove a column family from a table */ private class DeleteColumn extends ColumnOperation { private Text columnName; - + DeleteColumn(Text tableName, Text columnName) throws IOException { super(tableName); this.columnName = columnName; @@ -2476,37 +2859,38 @@ super(tableName); this.columnName = columnName; } - + @Override - protected void postProcessMeta(MetaRegion m, HRegionInterface srvr) + protected void postProcessMeta(MetaRegion m, HRegionInterface server) throws IOException { - for(HRegionInfo i: unservedRegions) { + for (HRegionInfo i: unservedRegions) { i.tableDesc.families().remove(columnName); - updateRegionInfo(srvr, m.regionName, i); - + updateRegionInfo(server, m.regionName, i); + // Delete the directories used by the column - + try { fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName)); - + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("", e); } - + try { fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName)); - + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("", e); } - } } } @@ -2514,33 +2898,31 @@ /** Instantiated to add a column family to a table */ private class AddColumn extends ColumnOperation { private HColumnDescriptor newColumn; - - AddColumn(Text tableName, HColumnDescriptor newColumn) - throws IOException { - + + AddColumn(Text tableName, HColumnDescriptor newColumn) throws IOException { super(tableName); this.newColumn = newColumn; } - + @Override - protected void postProcessMeta(MetaRegion m, HRegionInterface srvr) - throws IOException { + protected void postProcessMeta(MetaRegion m, HRegionInterface server) + throws IOException { - for(HRegionInfo i: unservedRegions) { - 
+ for (HRegionInfo i: unservedRegions) { + // All we need to do to add a column is add it to the table descriptor. // When the region is brought on-line, it will find the column missing // and create it. - + i.tableDesc.addFamily(newColumn); - updateRegionInfo(srvr, m.regionName, i); + updateRegionInfo(server, m.regionName, i); } } } - - ////////////////////////////////////////////////////////////////////////////// - // Managing leases - ////////////////////////////////////////////////////////////////////////////// + + /* + * Managing leases + */ /** Instantiated to monitor the health of a region server */ private class ServerExpirer implements LeaseListener { @@ -2546,24 +2928,41 @@ private class ServerExpirer implements LeaseListener { @SuppressWarnings("hiding") private String server; - + ServerExpirer(String server) { this.server = server; } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public void leaseExpired() { LOG.info(server + " lease expired"); - // Remove the server from the known servers list - HServerInfo storedInfo = serversToServerInfo.remove(server); - + + // Remove the server from the known servers list and update load info + + HServerInfo info; + synchronized (serversToServerInfo) { + info = serversToServerInfo.remove(server); + + if (info != null) { + String serverName = info.getServerAddress().toString(); + HServerLoad load = serversToLoad.remove(serverName); + if (load != null) { + Set servers = loadToServers.get(load); + if (servers != null) { + servers.remove(serverName); + loadToServers.put(load, servers); + } + } + } + serversToServerInfo.notifyAll(); + } + // NOTE: If the server was serving the root region, we cannot reassign it // here because the new server will start serving the root region before // the PendingServerShutdown operation has a chance to split the log file. 
+ try { - msgQueue.put(new PendingServerShutdown(storedInfo)); + msgQueue.put(new PendingServerShutdown(info)); } catch (InterruptedException e) { throw new RuntimeException("Putting into msgQueue was interrupted.", e); } @@ -2570,16 +2969,16 @@ } } - ////////////////////////////////////////////////////////////////////////////// - // Main program - ////////////////////////////////////////////////////////////////////////////// - + /* + * Main program + */ + private static void printUsageAndExit() { System.err.println("Usage: java org.apache.hbase.HMaster " + - "[--bind=hostname:port] start|stop"); + "[--bind=hostname:port] start|stop"); System.exit(0); } - + /** * Main program * @param args @@ -2588,11 +2987,12 @@ if (args.length < 1) { printUsageAndExit(); } - + Configuration conf = new HBaseConfiguration(); - + // Process command-line args. TODO: Better cmd-line processing // (but hopefully something not as painful as cli options). + final String addressArgKey = "--bind="; for (String cmd: args) { if (cmd.startsWith(addressArgKey)) { @@ -2599,7 +2999,7 @@ conf.set(MASTER_ADDRESS, cmd.substring(addressArgKey.length())); continue; } - + if (cmd.equals("start")) { try { (new Thread(new HMaster(conf))).start(); @@ -2609,7 +3009,7 @@ } break; } - + if (cmd.equals("stop")) { try { HBaseAdmin adm = new HBaseAdmin(conf); @@ -2620,7 +3020,7 @@ } break; } - + // Print out usage if we get to here. 
printUsageAndExit(); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -275,6 +275,7 @@ int compactionThreshold = 0; private final HLocking lock = new HLocking(); private long desiredMaxFileSize; + private final long maxSequenceId; ////////////////////////////////////////////////////////////////////////////// // Constructor @@ -324,12 +325,26 @@ } // Load in all the HStores. + + long maxSeqId = -1; for(Map.Entry e : this.regionInfo.tableDesc.families().entrySet()) { Text colFamily = HStoreKey.extractFamily(e.getKey()); - stores.put(colFamily, - new HStore(rootDir, this.regionInfo.regionName, e.getValue(), fs, - oldLogFile, conf)); + + HStore store = new HStore(rootDir, this.regionInfo.regionName, + e.getValue(), fs, oldLogFile, conf); + + stores.put(colFamily, store); + + long storeSeqId = store.getMaxSequenceId(); + if (storeSeqId > maxSeqId) { + maxSeqId = storeSeqId; + } + } + this.maxSequenceId = maxSeqId; + if (LOG.isDebugEnabled()) { + LOG.debug("maximum sequence id for region " + regionInfo.getRegionName() + + " is " + this.maxSequenceId); } // Get rid of any splits or merges that were lost in-progress @@ -361,6 +376,10 @@ this.writestate.writesOngoing = false; LOG.info("region " + this.regionInfo.regionName + " available"); } + + long getMaxSequenceId() { + return this.maxSequenceId; + } /** Returns a HRegionInfo object for this region */ HRegionInfo getRegionInfo() { @@ -464,8 +483,8 @@ * @throws IOException */ HRegion[] closeAndSplit(final Text midKey, - final RegionUnavailableListener listener) - throws IOException { + final RegionUnavailableListener listener) throws IOException { + checkMidKey(midKey); long startTime = System.currentTimeMillis(); Path splits = getSplitsDir(); @@ -496,6 +515,7 @@ 
Vector hstoreFilesToSplit = close(); if (hstoreFilesToSplit == null) { LOG.warn("Close came back null (Implement abort of close?)"); + throw new RuntimeException("close returned empty vector of HStoreFiles"); } // Tell listener that region is now closed and that they can therefore @@ -690,8 +710,11 @@ biggest = size; } } - biggest.setSplitable(splitable); + if (biggest != null) { + biggest.setSplitable(splitable); + } return biggest; + } finally { lock.releaseReadLock(); } @@ -1405,6 +1428,7 @@ } } + /** {@inheritDoc} */ @Override public String toString() { return getRegionName().toString(); @@ -1842,9 +1866,7 @@ if (bytes == null || bytes.length == 0) { return null; } - return (HRegionInfo)((bytes == null || bytes.length == 0)? - null: - Writables.getWritable(bytes, new HRegionInfo())); + return (HRegionInfo) Writables.getWritable(bytes, new HRegionInfo()); } /** @@ -1905,6 +1927,13 @@ return startCode; } + /** + * Computes the Path of the HRegion + * + * @param dir parent directory + * @param regionName name of the region + * @return Path of HRegion directory + */ public static Path getRegionDir(final Path dir, final Text regionName) { return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName)); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -391,6 +392,9 @@ // Leases private Leases leases; + + // Request counter + private AtomicInteger requestCount; /** * Starts a HRegionServer at the 
default location @@ -424,6 +428,7 @@ Collections.synchronizedSortedMap(new TreeMap()); this.outboundMsgs = new Vector(); + this.requestCount = new AtomicInteger(); // Config'ed params this.numRetries = conf.getInt("hbase.client.retries.number", 2); @@ -597,6 +602,8 @@ if (LOG.isDebugEnabled()) { LOG.debug("Telling master we are up"); } + requestCount.set(0); + serverInfo.setLoad(new HServerLoad(0, onlineRegions.size())); hbaseMaster.regionServerStartup(serverInfo); if (LOG.isDebugEnabled()) { LOG.debug("Done telling master we are up"); @@ -626,6 +633,10 @@ } try { + serverInfo.setLoad(new HServerLoad(requestCount.get(), + onlineRegions.size())); + requestCount.set(0); + HMsg msgs[] = hbaseMaster.regionServerReport(serverInfo, outboundArray); lastMsg = System.currentTimeMillis(); @@ -897,6 +908,7 @@ this.lock.writeLock().lock(); try { + this.log.setSequenceNumber(region.getMaxSequenceId()); this.onlineRegions.put(region.getRegionName(), region); } finally { this.lock.writeLock().unlock(); @@ -963,6 +975,7 @@ */ public HRegionInfo getRegionInfo(final Text regionName) throws NotServingRegionException { + requestCount.incrementAndGet(); return getRegion(regionName).getRegionInfo(); } @@ -971,6 +984,7 @@ */ public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException { + requestCount.incrementAndGet(); long clientid = rand.nextLong(); long lockid = startUpdate(regionName, clientid, b.getRow()); for(BatchOperation op: b) { @@ -993,6 +1007,7 @@ public byte [] get(final Text regionName, final Text row, final Text column) throws IOException { + requestCount.incrementAndGet(); return getRegion(regionName).get(row, column); } @@ -1002,6 +1017,7 @@ public byte [][] get(final Text regionName, final Text row, final Text column, final int numVersions) throws IOException { + requestCount.incrementAndGet(); return getRegion(regionName).get(row, column, numVersions); } @@ -1010,6 +1026,7 @@ */ public byte [][] get(final Text regionName, final Text row, 
final Text column, final long timestamp, final int numVersions) throws IOException { + requestCount.incrementAndGet(); return getRegion(regionName).get(row, column, timestamp, numVersions); } @@ -1018,6 +1035,7 @@ */ public KeyedData[] getRow(final Text regionName, final Text row) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName); TreeMap map = region.getFull(row); KeyedData result[] = new KeyedData[map.size()]; @@ -1034,6 +1052,7 @@ */ public KeyedData[] next(final long scannerId) throws IOException { + requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); HInternalScannerInterface s = scanners.get(scannerName); if (s == null) { @@ -1077,6 +1096,7 @@ */ public long startUpdate(Text regionName, long clientid, Text row) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName); long lockid = region.startUpdate(row); this.leases.createLease(clientid, lockid, @@ -1120,6 +1140,7 @@ public void put(final Text regionName, final long clientid, final long lockid, final Text column, final byte [] val) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName, true); leases.renewLease(clientid, lockid); region.put(lockid, column, val); @@ -1130,6 +1151,7 @@ */ public void delete(Text regionName, long clientid, long lockid, Text column) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName); leases.renewLease(clientid, lockid); region.delete(lockid, column); @@ -1140,6 +1162,7 @@ */ public void abort(Text regionName, long clientid, long lockid) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName, true); leases.cancelLease(clientid, lockid); region.abort(lockid); @@ -1150,6 +1173,7 @@ */ public void commit(Text regionName, final long clientid, final long lockid, final long timestamp) throws IOException { + requestCount.incrementAndGet(); HRegion region = 
getRegion(regionName, true); leases.cancelLease(clientid, lockid); region.commit(lockid, timestamp); @@ -1159,6 +1183,7 @@ * {@inheritDoc} */ public void renewLease(long lockid, long clientid) throws IOException { + requestCount.incrementAndGet(); leases.renewLease(clientid, lockid); } @@ -1247,6 +1272,7 @@ public long openScanner(Text regionName, Text[] cols, Text firstRow, final long timestamp, final RowFilterInterface filter) throws IOException { + requestCount.incrementAndGet(); HRegion r = getRegion(regionName); long scannerId = -1L; try { @@ -1277,6 +1303,7 @@ * {@inheritDoc} */ public void close(final long scannerId) throws IOException { + requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); HInternalScannerInterface s = null; synchronized(scanners) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerInfo.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerInfo.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerInfo.java (working copy) @@ -33,6 +33,7 @@ public class HServerInfo implements Writable { private HServerAddress serverAddress; private long startCode; + private HServerLoad load; /** default constructor - used by Writable */ public HServerInfo() { @@ -38,10 +39,11 @@ public HServerInfo() { this.serverAddress = new HServerAddress(); this.startCode = 0; + this.load = new HServerLoad(); } /** - * Constructs a fully initialized object + * Constructor * @param serverAddress * @param startCode */ @@ -48,6 +50,7 @@ public HServerInfo(HServerAddress serverAddress, long startCode) { this.serverAddress = new HServerAddress(serverAddress); this.startCode = startCode; + this.load = new HServerLoad(); } /** @@ -57,6 +60,21 @@ public HServerInfo(HServerInfo other) { this.serverAddress = new HServerAddress(other.getServerAddress()); this.startCode = other.getStartCode(); + this.load = other.getLoad(); + } + + 
/** + * @return the load + */ + public HServerLoad getLoad() { + return load; + } + + /** + * @param load the load to set + */ + public void setLoad(HServerLoad load) { + this.load = load; } /** @return the server address */ @@ -72,7 +90,8 @@ /** {@inheritDoc} */ @Override public String toString() { - return "address: " + this.serverAddress + ", startcode: " + this.startCode; + return "address: " + this.serverAddress + ", startcode: " + this.startCode + + ", load: (" + this.load.toString() + ")"; } // Writable @@ -81,6 +100,7 @@ public void readFields(DataInput in) throws IOException { this.serverAddress.readFields(in); this.startCode = in.readLong(); + this.load.readFields(in); } /** {@inheritDoc} */ @@ -87,5 +107,6 @@ public void write(DataOutput out) throws IOException { this.serverAddress.write(out); out.writeLong(this.startCode); + this.load.write(out); } } \ No newline at end of file Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerLoad.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerLoad.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerLoad.java (revision 0) @@ -0,0 +1,136 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; + +/** + * This class encapsulates metrics for determining the load on a HRegionServer + */ +public class HServerLoad implements WritableComparable { + private int numberOfRequests; // number of requests since last report + private int numberOfRegions; // number of regions being served + + /* + * TODO: Other metrics that might be considered when the master is actually + * doing load balancing instead of merely trying to decide where to assign + * a region: + *
+   * <ul>
+   *   <li># of CPUs, heap size (to determine the "class" of machine). For
+   *       now, we consider them to be homogeneous.</li>
+   *
+   *   <li>#requests per region (Map<{String|HRegionInfo}, Integer>)</li>
+   *
+   *   <li>#compactions and/or #splits (churn)</li>
+   *
+   *   <li>server death rate (maybe there is something wrong with this server)</li>
+   * </ul>
+ */ + + /** default constructior (used by Writable) */ + public HServerLoad() {} + + /** + * Constructor + * @param numberOfRequests + * @param numberOfRegions + */ + public HServerLoad(int numberOfRequests, int numberOfRegions) { + this.numberOfRequests = numberOfRequests; + this.numberOfRegions = numberOfRegions; + } + + /** + * @return load factor for this server + */ + public int getLoad() { + int load = numberOfRequests == 0 ? 1 : numberOfRequests; + load *= numberOfRegions == 0 ? 1 : numberOfRegions; + return load; + } + + /** {@inheritDoc} */ + @Override + public String toString() { + return "requests: " + numberOfRequests + " regions: " + numberOfRegions; + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + return compareTo(o) == 0; + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + int result = Integer.valueOf(numberOfRequests).hashCode(); + result ^= Integer.valueOf(numberOfRegions).hashCode(); + return result; + } + + // Getters + + /** + * @return the numberOfRegions + */ + public int getNumberOfRegions() { + return numberOfRegions; + } + + /** + * @return the numberOfRequests + */ + public int getNumberOfRequests() { + return numberOfRequests; + } + + // Setters + + /** + * @param numberOfRegions the numberOfRegions to set + */ + public void setNumberOfRegions(int numberOfRegions) { + this.numberOfRegions = numberOfRegions; + } + + // Writable + + /** {@inheritDoc} */ + public void readFields(DataInput in) throws IOException { + numberOfRequests = in.readInt(); + numberOfRegions = in.readInt(); + } + + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + out.writeInt(numberOfRequests); + out.writeInt(numberOfRegions); + } + + // Comparable + + /** {@inheritDoc} */ + public int compareTo(Object o) { + HServerLoad other = (HServerLoad) o; + return this.getLoad() - other.getLoad(); + } +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java 
=================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -90,6 +90,8 @@ TreeMap readers = new TreeMap(); Random rand = new Random(); + + private long maxSeqId; /** * An HStore is a set of zero or more MapFiles, which stretch backwards over @@ -196,6 +198,7 @@ // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That // means it was built prior to the previous run of HStore, and so it cannot // contain any updates also contained in the log. + long maxSeqID = -1; for (HStoreFile hsf: hstoreFiles) { long seqid = hsf.loadInfo(fs); @@ -205,8 +208,14 @@ } } } - - doReconstructionLog(reconstructionLog, maxSeqID); + this.maxSeqId = maxSeqID; + if (LOG.isDebugEnabled()) { + LOG.debug("maximum sequence id for hstore " + storeName + " is " + + this.maxSeqId); + } + + doReconstructionLog(reconstructionLog, maxSeqId); + this.maxSeqId += 1; // Compact all the MapFiles into a single file. The resulting MapFile // should be "timeless"; that is, it should not have an associated seq-ID, @@ -228,6 +237,10 @@ } } + long getMaxSequenceId() { + return this.maxSeqId; + } + /* * Read the reconstructionLog to see whether we need to build a brand-new * MapFile out of non-flushed log entries. @@ -258,6 +271,11 @@ while (login.next(key, val)) { maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum()); if (key.getLogSeqNum() <= maxSeqID) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping edit <" + key.toString() + "=" + + val.toString() + "> key sequence: " + key.getLogSeqNum() + + " max sequence: " + maxSeqID); + } continue; } // Check this edit is for me. 
Also, guard against writing @@ -277,7 +295,8 @@ } HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp()); if (LOG.isDebugEnabled()) { - LOG.debug("Applying edit " + k.toString()); + LOG.debug("Applying edit <" + k.toString() + "=" + val.toString() + + ">"); } reconstructedCache.put(k, val.getVal()); } @@ -428,16 +447,12 @@ String name = flushedFile.toString(); MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, this.bloomFilter); - int count = 0; - int total = 0; try { for (Map.Entry es: inputCache.entrySet()) { HStoreKey curkey = es.getKey(); - total++; if (this.familyName. equals(HStoreKey.extractFamily(curkey.getColumn()))) { out.append(curkey, new ImmutableBytesWritable(es.getValue())); - count++; } } } finally { @@ -1030,6 +1045,7 @@ ////////////////////////////////////////////////////////////////////////////// class HStoreScanner extends HAbstractScanner { + @SuppressWarnings("hiding") private MapFile.Reader[] readers; HStoreScanner(long timestamp, Text[] targetCols, Text firstRow) Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (working copy) @@ -94,7 +94,17 @@ static final String HSTORE_DATFILE_DIR = "mapfiles"; static final String HSTORE_INFO_DIR = "info"; static final String HSTORE_FILTER_DIR = "filter"; - public static enum Range {top, bottom} + + /** + * For split HStoreFiles, specifies if the file covers the lower half or + * the upper half of the key range + */ + public static enum Range { + /** HStoreFile contains upper half of key range */ + top, + /** HStoreFile contains lower half of key range */ + bottom + } /* * Regex that will work for straight filenames and for reference names. 
@@ -156,7 +166,7 @@ /* * Data structure to hold reference to a store file over in another region. */ - static class Reference { + static class Reference implements Writable { Text regionName; long fileid; Range region; @@ -190,6 +200,8 @@ return this.regionName; } + /** {@inheritDoc} */ + @Override public String toString() { return this.regionName + "/" + this.fileid + "/" + this.region; } @@ -195,6 +207,8 @@ } // Make it serializable. + + /** {@inheritDoc} */ public void write(DataOutput out) throws IOException { this.regionName.write(out); out.writeLong(this.fileid); @@ -203,6 +217,7 @@ this.midkey.write(out); } + /** {@inheritDoc} */ public void readFields(DataInput in) throws IOException { this.regionName = new Text(); this.regionName.readFields(in); @@ -417,6 +432,8 @@ private static boolean isReference(final Path p, final Matcher m) { if (m == null || !m.matches()) { LOG.warn("Failed match of store file name " + p.toString()); + throw new RuntimeException("Failed match of store file name " + + p.toString()); } return m.groupCount() > 1 && m.group(2) != null; } @@ -662,6 +679,7 @@ } } + /** {@inheritDoc} */ @SuppressWarnings({ "unused"}) @Override public synchronized void finalKey(WritableComparable key) @@ -669,6 +687,7 @@ throw new UnsupportedOperationException("Unsupported"); } + /** {@inheritDoc} */ @Override public synchronized Writable get(WritableComparable key, Writable val) throws IOException { @@ -676,6 +695,7 @@ return super.get(key, val); } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public synchronized WritableComparable getClosest(WritableComparable key, @@ -692,6 +712,7 @@ return super.getClosest(key, val); } + /** {@inheritDoc} */ @SuppressWarnings("unused") @Override public synchronized WritableComparable midKey() throws IOException { @@ -699,6 +720,7 @@ return null; } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public synchronized boolean next(WritableComparable key, Writable val) @@ -727,6 +749,7 @@ 
return false; } + /** {@inheritDoc} */ @Override public synchronized void reset() throws IOException { if (top) { @@ -737,6 +760,7 @@ super.reset(); } + /** {@inheritDoc} */ @Override public synchronized boolean seek(WritableComparable key) throws IOException { @@ -758,6 +782,15 @@ static class Reader extends MapFile.Reader { private final Filter bloomFilter; + /** + * Constructor + * + * @param fs + * @param dirName + * @param conf + * @param filter + * @throws IOException + */ public Reader(FileSystem fs, String dirName, Configuration conf, final Filter filter) throws IOException { @@ -810,6 +843,18 @@ private final Filter bloomFilter; + /** + * Constructor + * + * @param conf + * @param fs + * @param dirName + * @param keyClass + * @param valClass + * @param compression + * @param filter + * @throws IOException + */ @SuppressWarnings("unchecked") public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, @@ -905,6 +950,7 @@ return (isReference())? l / 2: l; } + /** {@inheritDoc} */ @Override public String toString() { return this.regionName.toString() + "/" + this.colFamily.toString() + @@ -912,6 +958,7 @@ (isReference()? 
"/" + this.reference.toString(): ""); } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { return this.compareTo(o) == 0; Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (working copy) @@ -133,6 +133,7 @@ } + /** @return the table name */ public Text getTableName() { return this.tableName; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (revision 564467) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (working copy) @@ -41,7 +41,7 @@ /* * Legal table names can only contain 'word characters': - * i.e. [a-zA-Z_0-9]. + * i.e. [a-zA-Z_0-9-.]. * Lets be restrictive until a reason to be otherwise. One reason to limit * characters in table name is to ensure table regions as entries in META * regions can be found (See HADOOP-1581 'HBASE: Un-openable tablename bug'). @@ -47,7 +47,7 @@ * regions can be found (See HADOOP-1581 'HBASE: Un-openable tablename bug'). 
*/ private static final Pattern LEGAL_TABLE_NAME = - Pattern.compile("[\\w-]+"); + Pattern.compile("^[\\w-.]+$"); /** Constructs an empty object */ public HTableDescriptor() { Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 564467) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -143,6 +143,7 @@ } } + /** runs the master server */ public static class MasterThread extends Thread { private final HMaster master; MasterThread(final HMaster m) { @@ -149,6 +150,8 @@ super(m, "Master:" + m.getMasterAddress().toString()); this.master = m; } + + /** {@inheritDoc} */ @Override public void run() { LOG.info("Starting " + getName()); @@ -154,6 +157,8 @@ LOG.info("Starting " + getName()); super.run(); } + + /** @return master server */ public HMaster getMaster() { return this.master; } @@ -159,6 +164,7 @@ } } + /** runs region servers */ public static class RegionServerThread extends Thread { private final HRegionServer regionServer; RegionServerThread(final HRegionServer r, final int index) { @@ -165,6 +171,8 @@ super(r, "RegionServer:" + index); this.regionServer = r; } + + /** {@inheritDoc} */ @Override public void run() { LOG.info("Starting " + getName()); @@ -170,6 +178,8 @@ LOG.info("Starting " + getName()); super.run(); } + + /** @return the region server */ public HRegionServer getRegionServer() { return this.regionServer; } @@ -227,6 +237,11 @@ return threads; } + /** + * Starts a region server thread running + * + * @throws IOException + */ public void startRegionServer() throws IOException { RegionServerThread t = startRegionServer(this.conf, this.regionThreads.size()); @@ -275,6 +290,7 @@ * Shut down the specified region server cleanly * * @param serverNumber + * @return the region server that was stopped */ public HRegionServer 
stopRegionServer(int serverNumber) { HRegionServer server = Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java (revision 564467) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java (working copy) @@ -34,10 +34,10 @@ /** constructor */ public TestCleanRegionServerExit() { - super(); + super(2); conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries - conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries + conf.setInt("hbase.client.retries.number", 3); // reduce HBase retries Logger.getRootLogger().setLevel(Level.WARN); Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG); } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (revision 564467) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (working copy) @@ -40,7 +40,7 @@ * HRegions or in the HBaseMaster, so only basic testing is possible. 
*/ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListener { - private static final Logger LOG = + static final Logger LOG = Logger.getLogger(TestHRegion.class.getName()); /** Constructor */ Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java (revision 564467) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java (working copy) @@ -34,6 +34,9 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; +/** + * Test HStoreFile + */ public class TestHStoreFile extends TestCase { static final Log LOG = LogFactory.getLog(TestHStoreFile.class); private static String DIR = System.getProperty("test.build.data", "."); @@ -226,7 +229,9 @@ } assertTrue(key.compareTo(midkey) < 0); } - LOG.info("Last in bottom: " + previous.toString()); + if (previous != null) { + LOG.info("Last in bottom: " + previous.toString()); + } // Now test reading from the top. top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf, HStoreFile.Range.top, midkey); @@ -249,17 +254,17 @@ // Next test using a midkey that does not exist in the file. // First, do a key that is < than first key. Ensure splits behave // properly. - midkey = new HStoreKey(new Text(" ")); + WritableComparable badkey = new HStoreKey(new Text(" ")); bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), - this.conf, HStoreFile.Range.bottom, midkey); - // When midkey is < than the bottom, should return no values. + this.conf, HStoreFile.Range.bottom, badkey); + // When badkey is < than the bottom, should return no values. assertFalse(bottom.next(key, value)); // Now read from the top. 
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf, - HStoreFile.Range.top, midkey); + HStoreFile.Range.top, badkey); first = true; while (top.next(key, value)) { - assertTrue(key.compareTo(midkey) >= 0); + assertTrue(key.compareTo(badkey) >= 0); if (first) { first = false; LOG.info("First top when key < bottom: " + key.toString()); @@ -275,10 +280,10 @@ assertTrue(tmp.charAt(i) == 'z'); } - // Test when midkey is > than last key in file ('||' > 'zz'). - midkey = new HStoreKey(new Text("|||")); + // Test when badkey is > than last key in file ('||' > 'zz'). + badkey = new HStoreKey(new Text("|||")); bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), - this.conf, HStoreFile.Range.bottom, midkey); + this.conf, HStoreFile.Range.bottom, badkey); first = true; while (bottom.next(key, value)) { if (first) { @@ -297,7 +302,7 @@ } // Now look at top. Should not return any values. top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf, - HStoreFile.Range.top, midkey); + HStoreFile.Range.top, badkey); assertFalse(top.next(key, value)); } finally { Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java (revision 564467) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java (working copy) @@ -34,10 +34,10 @@ /** constructor */ public TestRegionServerAbort() { - super(); + super(2); conf.setInt("ipc.client.timeout", 5000); // reduce client timeout conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries - conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries + conf.setInt("hbase.client.retries.number", 3); // reduce HBase retries Logger.getRootLogger().setLevel(Level.WARN); Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG); } Index: 
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (revision 564467) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (working copy) @@ -206,7 +206,8 @@ HRegionLocation rl = t.getRegionLocation(table); regionServer = t.getConnection().getHRegionConnection(rl.getServerAddress()); scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(), - HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null); + HConstants.COLUMN_FAMILY_ARRAY, new Text(), + System.currentTimeMillis(), null); while (true) { TreeMap results = new TreeMap(); KeyedData[] values = regionServer.next(scannerId); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (revision 564467) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (working copy) @@ -46,6 +46,7 @@ private static final char FIRST_CHAR = 'a'; private static final char LAST_CHAR = 'z'; + /** {@inheritDoc} */ @Override public void setUp() throws Exception { super.setUp(); @@ -59,6 +60,7 @@ conf.setLong("hbase.hregion.max.filesize", 1024 * 128); } + /** {@inheritDoc} */ @Override public void tearDown() throws Exception { try { Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestToString.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestToString.java (revision 564467) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestToString.java (working copy) @@ -35,8 +35,8 @@ HServerAddress address = new HServerAddress(hostport); assertEquals("HServerAddress toString", address.toString(), hostport); HServerInfo info = new HServerInfo(address, -1); - assertEquals("HServerInfo", 
info.toString(), - "address: " + hostport + ", startcode: " + -1); + assertEquals("HServerInfo", "address: " + hostport + ", startcode: -1" + + ", load: (requests: 0 regions: 0)", info.toString()); } /**