Index: src/contrib/hbase/conf/hbase-default.xml =================================================================== --- src/contrib/hbase/conf/hbase-default.xml (revision 545790) +++ src/contrib/hbase/conf/hbase-default.xml (working copy) @@ -15,7 +15,7 @@ - hbase.regiondir + hbase.rootdir ${hadoop.tmp.dir}/hbase The directory shared by region servers. Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (working copy) @@ -42,9 +42,12 @@ // The kind of match we are doing on a column: private static enum MATCH_TYPE { - FAMILY_ONLY, // Just check the column family name - REGEX, // Column family + matches regex - SIMPLE // Literal matching + /** Just check the column family name */ + FAMILY_ONLY, + /** Column family + matches regex */ + REGEX, + /** Literal matching */ + SIMPLE } // This class provides column matching functions that are more sophisticated @@ -63,12 +66,12 @@ ColumnMatcher(Text col) throws IOException { String column = col.toString(); try { - int colpos = column.indexOf(":") + 1; - if(colpos == 0) { + int colpos = column.indexOf(":"); + if(colpos == -1) { throw new InvalidColumnNameException("Column name has no family indicator."); } - String columnkey = column.substring(colpos); + String columnkey = column.substring(colpos + 1); if(columnkey == null || columnkey.length() == 0) { this.matchType = MATCH_TYPE.FAMILY_ONLY; @@ -97,7 +100,7 @@ return c.equals(this.col); } else if(this.matchType == MATCH_TYPE.FAMILY_ONLY) { - return c.toString().startsWith(this.family); + return HStoreKey.extractFamily(c).toString().equals(this.family); } else if(this.matchType == MATCH_TYPE.REGEX) { return this.columnMatcher.matcher(c.toString()).matches(); @@ -211,6 +214,7 @@ * @param key The key that matched * 
@param results All the results for key * @return true if a match was found + * @throws IOException * * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap) */ Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (working copy) @@ -53,7 +53,7 @@ COL_REGIONINFO }; - private static final Text EMPTY_START_ROW = new Text(); + static final Text EMPTY_START_ROW = new Text(); long pause; int numRetries; @@ -64,8 +64,8 @@ * Data structure that holds current location for a region and its info. */ static class RegionLocation { - public HRegionInfo regionInfo; - public HServerAddress serverAddress; + HRegionInfo regionInfo; + HServerAddress serverAddress; RegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) { this.regionInfo = regionInfo; @@ -83,7 +83,7 @@ private TreeMap> tablesToServers; // For the "current" table: Map startRow -> (HRegionInfo, HServerAddress) - private SortedMap tableServers; + SortedMap tableServers; // Known region HServerAddress.toString() -> HRegionInterface private TreeMap servers; @@ -95,7 +95,10 @@ Random rand; long clientid; - /** Creates a new HClient */ + /** + * Creates a new HClient + * @param conf - Configuration object + */ public HClient(Configuration conf) { this.conf = conf; @@ -239,6 +242,12 @@ } } + /** + * Deletes a table + * + * @param tableName - name of table to delete + * @throws IOException + */ public synchronized void deleteTable(Text tableName) throws IOException { checkReservedTableName(tableName); checkMaster(); @@ -254,7 +263,6 @@ HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress); DataInputBuffer inbuf = new DataInputBuffer(); - HStoreKey key = new HStoreKey(); HRegionInfo info = new 
HRegionInfo(); for (int tries = 0; tries < numRetries; tries++) { long scannerId = -1L; @@ -261,7 +269,7 @@ try { scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, REGIONINFO, tableName); - LabelledData[] values = server.next(scannerId, key); + KeyedData[] values = server.next(scannerId); if(values == null || values.length == 0) { break; } @@ -267,10 +275,9 @@ } boolean found = false; for(int j = 0; j < values.length; j++) { - if(values[j].getLabel().equals(COL_REGIONINFO)) { + if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) { byte[] bytes = new byte[values[j].getData().getSize()]; - System.arraycopy(values[j].getData().get(), 0, bytes, 0, - bytes.length); + System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length); inbuf.reset(bytes, bytes.length); info.readFields(inbuf); if(info.tableDesc.getName().equals(tableName)) { @@ -301,7 +308,15 @@ LOG.info("table " + tableName + " deleted"); } - public synchronized void addColumn(Text tableName, HColumnDescriptor column) throws IOException { + /** + * Add a column to an existing table + * + * @param tableName - name of the table to add column to + * @param column - column descriptor of column to be added + * @throws IOException + */ + public synchronized void addColumn(Text tableName, HColumnDescriptor column) + throws IOException { checkReservedTableName(tableName); checkMaster(); try { @@ -312,7 +327,15 @@ } } - public synchronized void deleteColumn(Text tableName, Text columnName) throws IOException { + /** + * Delete a column from a table + * + * @param tableName - name of table + * @param columnName - name of column to be deleted + * @throws IOException + */ + public synchronized void deleteColumn(Text tableName, Text columnName) + throws IOException { checkReservedTableName(tableName); checkMaster(); try { @@ -323,6 +346,12 @@ } } + /** + * Brings a table on-line (enables it) + * + * @param tableName - name of the table + * @throws IOException + */ public synchronized void 
enableTable(Text tableName) throws IOException { checkReservedTableName(tableName); checkMaster(); @@ -340,7 +369,6 @@ HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress); DataInputBuffer inbuf = new DataInputBuffer(); - HStoreKey key = new HStoreKey(); HRegionInfo info = new HRegionInfo(); for(int tries = 0; tries < numRetries; tries++) { int valuesfound = 0; @@ -348,21 +376,28 @@ try { scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, REGIONINFO, tableName); - LabelledData[] values = server.next(scannerId, key); - if(values == null || values.length == 0) { - if(valuesfound == 0) { - throw new NoSuchElementException("table " + tableName + " not found"); - } - } - valuesfound += 1; boolean isenabled = false; - for(int j = 0; j < values.length; j++) { - if(values[j].getLabel().equals(COL_REGIONINFO)) { - byte[] bytes = new byte[values[j].getData().getSize()]; - System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length); - inbuf.reset(bytes, bytes.length); - info.readFields(inbuf); - isenabled = !info.offLine; + while(true) { + KeyedData[] values = server.next(scannerId); + if(values == null || values.length == 0) { + if(valuesfound == 0) { + throw new NoSuchElementException("table " + tableName + " not found"); + } + break; + } + valuesfound += 1; + for(int j = 0; j < values.length; j++) { + if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) { + byte[] bytes = new byte[values[j].getData().getSize()]; + System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length); + inbuf.reset(bytes, bytes.length); + info.readFields(inbuf); + isenabled = !info.offLine; + break; + } + } + if(isenabled) { + break; } } if(isenabled) { @@ -395,6 +430,13 @@ LOG.info("Enabled table " + tableName); } + /** + * Disables a table (takes it off-line). If it is being served, the master + * will tell the servers to stop serving it. 
+ * + * @param tableName - name of table + * @throws IOException + */ public synchronized void disableTable(Text tableName) throws IOException { checkReservedTableName(tableName); checkMaster(); @@ -412,7 +454,6 @@ HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress); DataInputBuffer inbuf = new DataInputBuffer(); - HStoreKey key = new HStoreKey(); HRegionInfo info = new HRegionInfo(); for(int tries = 0; tries < numRetries; tries++) { int valuesfound = 0; @@ -420,21 +461,28 @@ try { scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, REGIONINFO, tableName); - LabelledData[] values = server.next(scannerId, key); - if(values == null || values.length == 0) { - if(valuesfound == 0) { - throw new NoSuchElementException("table " + tableName + " not found"); - } - } - valuesfound += 1; boolean disabled = false; - for(int j = 0; j < values.length; j++) { - if(values[j].getLabel().equals(COL_REGIONINFO)) { - byte[] bytes = new byte[values[j].getData().getSize()]; - System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length); - inbuf.reset(bytes, bytes.length); - info.readFields(inbuf); - disabled = info.offLine; + while(true) { + KeyedData[] values = server.next(scannerId); + if(values == null || values.length == 0) { + if(valuesfound == 0) { + throw new NoSuchElementException("table " + tableName + " not found"); + } + break; + } + valuesfound += 1; + for(int j = 0; j < values.length; j++) { + if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) { + byte[] bytes = new byte[values[j].getData().getSize()]; + System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length); + inbuf.reset(bytes, bytes.length); + info.readFields(inbuf); + disabled = info.offLine; + break; + } + } + if(disabled) { + break; } } if(disabled) { @@ -466,6 +514,10 @@ LOG.info("Disabled table " + tableName); } + /** + * Shuts down the HBase instance + * @throws IOException + */ public synchronized void shutdown() throws IOException { 
checkMaster(); this.master.shutdown(); @@ -675,8 +727,8 @@ * @throws IOException */ private TreeMap scanOneMetaRegion(final RegionLocation t, - final Text tableName) - throws IOException { + final Text tableName) throws IOException { + HRegionInterface server = getHRegionConnection(t.serverAddress); TreeMap servers = new TreeMap(); for(int tries = 0; servers.size() == 0 && tries < this.numRetries; @@ -691,8 +743,7 @@ while(true) { HRegionInfo regionInfo = null; String serverAddress = null; - HStoreKey key = new HStoreKey(); - LabelledData[] values = server.next(scannerId, key); + KeyedData[] values = server.next(scannerId); if(values.length == 0) { if(servers.size() == 0) { // If we didn't find any servers then the table does not exist @@ -713,7 +764,7 @@ for(int i = 0; i < values.length; i++) { bytes = new byte[values[i].getData().getSize()]; System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length); - results.put(values[i].getLabel(), bytes); + results.put(values[i].getKey().getColumn(), bytes); } regionInfo = new HRegionInfo(); bytes = results.get(COL_REGIONINFO); @@ -808,6 +859,9 @@ * If we wanted this to be really fast, we could implement a special * catalog table that just contains table names and their descriptors. * Right now, it only exists as part of the META table's region info. 
+ * + * @return - returns an array of HTableDescriptors + * @throws IOException */ public synchronized HTableDescriptor[] listTables() throws IOException { @@ -828,10 +882,9 @@ scannerId = server.openScanner(t.regionInfo.regionName, META_COLUMNS, EMPTY_START_ROW); - HStoreKey key = new HStoreKey(); DataInputBuffer inbuf = new DataInputBuffer(); while(true) { - LabelledData[] values = server.next(scannerId, key); + KeyedData[] values = server.next(scannerId); if(values.length == 0) { break; } @@ -836,7 +889,7 @@ break; } for(int i = 0; i < values.length; i++) { - if(values[i].getLabel().equals(COL_REGIONINFO)) { + if(values[i].getKey().getColumn().equals(COL_REGIONINFO)) { byte[] bytes = values[i].getData().get(); inbuf.reset(bytes, bytes.length); HRegionInfo info = new HRegionInfo(); @@ -901,7 +954,14 @@ } } - /** Get a single value for the specified row and column */ + /** + * Get a single value for the specified row and column + * + * @param row - row key + * @param column - column name + * @return - value for specified row/column + * @throws IOException + */ public byte[] get(Text row, Text column) throws IOException { RegionLocation info = null; BytesWritable value = null; @@ -931,7 +991,15 @@ return null; } - /** Get the specified number of versions of the specified row and column */ + /** + * Get the specified number of versions of the specified row and column + * + * @param row - row key + * @param column - column name + * @param numVersions - number of versions to retrieve + * @return - array byte values + * @throws IOException + */ public byte[][] get(Text row, Text column, int numVersions) throws IOException { RegionLocation info = null; BytesWritable[] values = null; @@ -968,8 +1036,16 @@ /** * Get the specified number of versions of the specified row and column with * the specified timestamp. 
+ * + * @param row - row key + * @param column - column name + * @param timestamp - timestamp + * @param numVersions - number of versions to retrieve + * @return - array of values that match the above criteria + * @throws IOException */ - public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException { + public byte[][] get(Text row, Text column, long timestamp, int numVersions) + throws IOException { RegionLocation info = null; BytesWritable[] values = null; @@ -1002,10 +1078,16 @@ return null; } - /** Get all the data for the specified row */ - public LabelledData[] getRow(Text row) throws IOException { + /** + * Get all the data for the specified row + * + * @param row - row key + * @return - map of columns to values + * @throws IOException + */ + public SortedMap getRow(Text row) throws IOException { RegionLocation info = null; - LabelledData[] value = null; + KeyedData[] value = null; for(int tries = 0; tries < numRetries && info == null; tries++) { info = getRegionLocation(row); @@ -1023,8 +1105,15 @@ info = null; } } - - return value; + TreeMap results = new TreeMap(); + if(value != null && value.length != 0) { + for(int i = 0; i < value.length; i++) { + byte[] bytes = new byte[value[i].getData().getSize()]; + System.arraycopy(value[i].getData().get(), 0, bytes, 0, bytes.length); + results.put(value[i].getKey().getColumn(), bytes); + } + } + return results; } /** @@ -1030,8 +1119,15 @@ /** * Get a scanner on the current table starting at the specified row. * Return the specified columns. 
+ * + * @param columns - array of columns to return + * @param startRow - starting row in table to scan + * @return - scanner + * @throws IOException */ - public synchronized HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException { + public synchronized HScannerInterface obtainScanner(Text[] columns, + Text startRow) throws IOException { + if(this.tableServers == null) { throw new IllegalStateException("Must open table first"); } @@ -1069,9 +1165,20 @@ long startUpdate() throws IOException; } - /* Start an atomic row insertion or update + /** + * Start an atomic row insertion/update. No changes are committed until the + * call to commit() returns. A call to abort() will abandon any updates in progress. + * + * Callers to this method are given a lease for each unique lockid; before the + * lease expires, either abort() or commit() must be called. If it is not + * called, the system will automatically call abort() on the client's behalf. + * + * The client can gain extra time with a call to renewLease(). + * Start an atomic row insertion or update + * * @param row Name of row to start update against. * @return Row lockid. + * @throws IOException */ public long startUpdate(final Text row) throws IOException { // Implemention of the StartUpdate interface. 
@@ -1114,7 +1221,14 @@ return retryProxy.startUpdate(); } - /** Change a value for the specified column */ + /** + * Change a value for the specified column + * + * @param lockid - lock id returned from startUpdate + * @param column - column whose value is being set + * @param val - new value for column + * @throws IOException + */ public void put(long lockid, Text column, byte val[]) throws IOException { try { this.currentServer.put(this.currentRegion, this.clientid, lockid, column, @@ -1131,7 +1245,13 @@ } } - /** Delete the value for a column */ + /** + * Delete the value for a column + * + * @param lockid - lock id returned from startUpdate + * @param column - name of column whose value is to be deleted + * @throws IOException + */ public void delete(long lockid, Text column) throws IOException { try { this.currentServer.delete(this.currentRegion, this.clientid, lockid, @@ -1148,7 +1268,12 @@ } } - /** Abort a row mutation */ + /** + * Abort a row mutation + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ public void abort(long lockid) throws IOException { try { this.currentServer.abort(this.currentRegion, this.clientid, lockid); @@ -1159,7 +1284,12 @@ } } - /** Finalize a row mutation */ + /** + * Finalize a row mutation + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ public void commit(long lockid) throws IOException { try { this.currentServer.commit(this.currentRegion, this.clientid, lockid); @@ -1170,6 +1300,27 @@ } /** + * Renew lease on update + * + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public void renewLease(long lockid) throws IOException { + try { + this.currentServer.renewLease(lockid, this.clientid); + } catch(IOException e) { + try { + this.currentServer.abort(this.currentRegion, this.clientid, lockid); + } catch(IOException e2) { + LOG.warn(e2); + } + this.currentServer = null; + this.currentRegion = null; + throw e; + } + } + + 
/** * Implements the scanner interface for the HBase client. * If there are multiple regions in a table, this scanner will iterate * through them all. @@ -1175,6 +1326,7 @@ * through them all. */ private class ClientScanner implements HScannerInterface { + private final Text EMPTY_COLUMN = new Text(); private Text[] columns; private Text startRow; private boolean closed; @@ -1198,7 +1350,7 @@ this.regions = info.toArray(new RegionLocation[info.size()]); } - public ClientScanner(Text[] columns, Text startRow) throws IOException { + ClientScanner(Text[] columns, Text startRow) throws IOException { this.columns = columns; this.startRow = startRow; this.closed = false; @@ -1260,17 +1412,22 @@ if(this.closed) { return false; } - LabelledData[] values = null; + KeyedData[] values = null; do { - values = this.server.next(this.scannerId, key); - } while(values.length == 0 && nextScanner()); + values = this.server.next(this.scannerId); + } while(values != null && values.length == 0 && nextScanner()); - for(int i = 0; i < values.length; i++) { - byte[] bytes = new byte[values[i].getData().getSize()]; - System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length); - results.put(values[i].getLabel(), bytes); + if(values != null && values.length != 0) { + for(int i = 0; i < values.length; i++) { + key.setRow(values[i].getKey().getRow()); + key.setVersion(values[i].getKey().getTimestamp()); + key.setColumn(EMPTY_COLUMN); + byte[] bytes = new byte[values[i].getData().getSize()]; + System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length); + results.put(values[i].getKey().getColumn(), bytes); + } } - return values.length != 0; + return values == null ? false : values.length != 0; } /* (non-Javadoc) @@ -1333,8 +1490,13 @@ " deleteTable testtable"); } + /** + * Process command-line args. + * @param args - command arguments + * @return 0 if successful -1 otherwise + */ public int doCommandLine(final String args[]) { - // Process command-line args. 
TODO: Better cmd-line processing + // TODO: Better cmd-line processing // (but hopefully something not as painful as cli options). int errCode = -1; if (args.length < 1) { @@ -1416,6 +1578,10 @@ return errCode; } + /** + * Main program + * @param args + */ public static void main(final String args[]) { Configuration c = new HBaseConfiguration(); int errCode = (new HClient(c)).doCommandLine(args); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java (working copy) @@ -81,6 +81,12 @@ this.versionNumber = COLUMN_DESCRIPTOR_VERSION; } + /** + * Construct a column descriptor specifying only the family name + * The other attributes are defaulted. + * + * @param columnName - column family name + */ public HColumnDescriptor(String columnName) { this(); this.name.set(columnName); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -34,16 +34,29 @@ // TODO: Support 'local': i.e. default of all running in single // process. Same for regionserver. TODO: Is having HBase homed // on port 60k OK? + + /** Parameter name for master address */ static final String MASTER_ADDRESS = "hbase.master"; + + /** Default master address */ static final String DEFAULT_MASTER_ADDRESS = "localhost:60000"; - // Key for hbase.regionserver address. + /** Parameter name for hbase.regionserver address. 
*/ static final String REGIONSERVER_ADDRESS = "hbase.regionserver"; + + /** Default region server address */ static final String DEFAULT_REGIONSERVER_ADDRESS = "localhost:60010"; + /** Parameter name for how often threads should wake up */ static final String THREAD_WAKE_FREQUENCY = "hbase.server.thread.wakefrequency"; - static final String HREGION_DIR = "hbase.regiondir"; - static final String DEFAULT_HREGION_DIR = "/hbase"; + + /** Parameter name for HBase instance root directory */ + static final String HBASE_DIR = "hbase.rootdir"; + + /** Default HBase instance root directory */ + static final String DEFAULT_HBASE_DIR = "/hbase"; + + /** Used to construct the name of the directory in which a HRegion resides */ static final String HREGIONDIR_PREFIX = "hregion_"; // TODO: Someone may try to name a column family 'log'. If they @@ -48,8 +61,14 @@ // TODO: Someone may try to name a column family 'log'. If they // do, it will clash with the HREGION log dir subdirectory. FIX. + + /** Used to construct the name of the log directory for a region server */ static final String HREGION_LOGDIR_NAME = "log"; + + /** Name of old log file for reconstruction */ + static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log"; + /** Default maximum file size */ static final long DEFAULT_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB // Always store the location of the root table's HRegion. @@ -59,26 +78,36 @@ // each row in the root and meta tables describes exactly 1 region // Do we ever need to know all the information that we are storing? - // The root tables' name. + /** The root table's name. */ static final Text ROOT_TABLE_NAME = new Text("--ROOT--"); - // The META tables' name. + /** The META table's name. */ static final Text META_TABLE_NAME = new Text("--META--"); - // Defines for the column names used in both ROOT and META HBase 'meta' - // tables. + // Defines for the column names used in both ROOT and META HBase 'meta' tables. 
+ + /** The ROOT and META column family */ static final Text COLUMN_FAMILY = new Text("info:"); + + /** ROOT/META column family member - contains HRegionInfo */ static final Text COL_REGIONINFO = new Text(COLUMN_FAMILY + "regioninfo"); + + /** ROOT/META column family member - contains HServerAddress.toString() */ static final Text COL_SERVER = new Text(COLUMN_FAMILY + "server"); + + /** ROOT/META column family member - contains server start code (a long) */ static final Text COL_STARTCODE = new Text(COLUMN_FAMILY + "serverstartcode"); // Other constants - + + /** When we encode strings, we always specify UTF8 encoding */ static final String UTF8_ENCODING = "UTF-8"; - + + /** Value stored for a deleted item */ static final BytesWritable DELETE_BYTES = new BytesWritable("HBASE::DELETEVAL".getBytes()); - + + /** Value written to HLog on a complete cache flush */ static final BytesWritable COMPLETE_CACHEFLUSH = new BytesWritable("HBASE::CACHEFLUSH".getBytes()); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -81,12 +81,11 @@ Integer rollLock = 0; /** - * Bundle up a bunch of log files (which are no longer being written to), - * into a new file. Delete the old log files when ready. - * @param srcDir Directory of log files to bundle: - * e.g. ${REGIONDIR}/log_HOST_PORT - * @param dstFile Destination file: - * e.g. ${REGIONDIR}/oldlogfile_HOST_PORT + * Split up a bunch of log files, that are no longer being written to, + * into new files, one per region. Delete the old log files when ready. + * @param rootDir Root directory of the HBase instance + * @param srcDir Directory of log files to split: + * e.g. 
${ROOTDIR}/log_HOST_PORT * @param fs FileSystem * @param conf HBaseConfiguration * @throws IOException @@ -91,16 +90,17 @@ * @param conf HBaseConfiguration * @throws IOException */ - public static void consolidateOldLog(Path srcDir, Path dstFile, - FileSystem fs, Configuration conf) - throws IOException { + static void splitLog(Path rootDir, Path srcDir, FileSystem fs, + Configuration conf) throws IOException { + if(LOG.isDebugEnabled()) { - LOG.debug("consolidating log files"); + LOG.debug("splitting log files"); } Path logfiles[] = fs.listPaths(srcDir); - SequenceFile.Writer newlog = SequenceFile.createWriter(fs, conf, dstFile, - HLogKey.class, HLogEdit.class); + TreeMap logWriters = + new TreeMap(); + try { for(int i = 0; i < logfiles.length; i++) { SequenceFile.Reader in = @@ -109,7 +109,17 @@ HLogKey key = new HLogKey(); HLogEdit val = new HLogEdit(); while(in.next(key, val)) { - newlog.append(key, val); + Text regionName = key.getRegionName(); + SequenceFile.Writer w = logWriters.get(regionName); + if(w == null) { + Path logfile = new Path(HStoreFile.getHRegionDir(rootDir, + regionName), HREGION_OLDLOGFILE_NAME); + + w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class, + HLogEdit.class); + logWriters.put(regionName, w); + } + w.append(key, val); } } finally { @@ -118,7 +128,9 @@ } } finally { - newlog.close(); + for(SequenceFile.Writer w: logWriters.values()) { + w.close(); + } } if(fs.exists(srcDir)) { @@ -132,7 +144,7 @@ } } if(LOG.isDebugEnabled()) { - LOG.debug("log file consolidation completed"); + LOG.debug("log file splitting completed"); } } @@ -142,8 +154,13 @@ * You should never have to load an existing log. If there is a log * at startup, it should have already been processed and deleted by * the time the HLog object is started up. 
+ * + * @param fs + * @param dir + * @param conf + * @throws IOException */ - public HLog(FileSystem fs, Path dir, Configuration conf) throws IOException { + HLog(FileSystem fs, Path dir, Configuration conf) throws IOException { this.fs = fs; this.dir = dir; this.conf = conf; @@ -163,8 +180,10 @@ * * The 'this' lock limits access to the current writer so * we don't append multiple items simultaneously. + * + * @throws IOException */ - public void rollWriter() throws IOException { + void rollWriter() throws IOException { synchronized(rollLock) { // Try to roll the writer to a new file. We may have to @@ -267,8 +286,21 @@ return new Path(dir, HLOG_DATFILE + String.format("%1$03d", filenum)); } - /** Shut down the log. */ - public synchronized void close() throws IOException { + /** + * Shut down the log and delete the log directory + * @throws IOException + */ + synchronized void closeAndDelete() throws IOException { + rollWriter(); + close(); + fs.delete(dir); + } + + /** + * Shut down the log. + * @throws IOException + */ + synchronized void close() throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("closing log writer"); } @@ -300,7 +332,7 @@ * @param timestamp * @throws IOException */ - public synchronized void append(Text regionName, Text tableName, Text row, + synchronized void append(Text regionName, Text tableName, Text row, TreeMap columns, long timestamp) throws IOException { if(closed) { @@ -327,8 +359,8 @@ } } - /** How many items have been added to the log? 
*/ - public int getNumEntries() { + /** @return How many items have been added to the log */ + int getNumEntries() { return numEntries; } @@ -340,6 +372,12 @@ return logSeqNum++; } + /** + * Obtain a specified number of sequence numbers + * + * @param num - number of sequence numbers to obtain + * @return - array of sequence numbers + */ synchronized long[] obtainSeqNum(int num) { long[] results = new long[num]; for (int i = 0; i < num; i++) { @@ -358,7 +396,7 @@ * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)} * @see #completeCacheFlush(Text, Text, long) */ - public synchronized long startCacheFlush() { + synchronized long startCacheFlush() { while (insideCacheFlush) { try { wait(); @@ -370,8 +408,13 @@ return obtainSeqNum(); } - /** Complete the cache flush */ - public synchronized void completeCacheFlush(final Text regionName, + /** Complete the cache flush + * @param regionName + * @param tableName + * @param logSeqId + * @throws IOException + */ + synchronized void completeCacheFlush(final Text regionName, final Text tableName, final long logSeqId) throws IOException { if(closed) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (working copy) @@ -32,6 +32,10 @@ Text row = new Text(); long logSeqNum = 0L; + /** Create an empty key useful when deserializing */ + public HLogKey() { + } + /** * Create the log key! * We maintain the tablename mainly for debugging purposes. @@ -36,10 +40,12 @@ * Create the log key! * We maintain the tablename mainly for debugging purposes. * A regionName is always a sub-table object. 
+ * + * @param regionName - name of region + * @param tablename - name of table + * @param row - row key + * @param logSeqNum - log sequence number */ - public HLogKey() { - } - public HLogKey(Text regionName, Text tablename, Text row, long logSeqNum) { this.regionName.set(regionName); this.tablename.set(tablename); @@ -51,19 +57,19 @@ // A bunch of accessors ////////////////////////////////////////////////////////////////////////////// - public Text getRegionName() { + Text getRegionName() { return regionName; } - public Text getTablename() { + Text getTablename() { return tablename; } - public Text getRow() { + Text getRow() { return row; } - public long getLogSeqNum() { + long getLogSeqNum() { return logSeqNum; } @@ -69,8 +75,7 @@ @Override public String toString() { - return getTablename().toString() + " " + getRegionName().toString() + " " + - getRow().toString() + " " + getLogSeqNum(); + return tablename + " " + regionName + " " + row + " " + logSeqNum; } @Override @@ -90,10 +95,8 @@ // Comparable ////////////////////////////////////////////////////////////////////////////// - /** - * When sorting through log entries, we want to group items - * first in the same table, then to the same row, then finally - * ordered by write-order. 
+ /* (non-Javadoc) + * @see java.lang.Comparable#compareTo(java.lang.Object) */ public int compareTo(Object o) { HLogKey other = (HLogKey) o; @@ -119,6 +122,9 @@ // Writable ////////////////////////////////////////////////////////////////////////////// + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ public void write(DataOutput out) throws IOException { this.regionName.write(out); this.tablename.write(out); @@ -126,6 +132,9 @@ out.writeLong(logSeqNum); } + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ public void readFields(DataInput in) throws IOException { this.regionName.readFields(in); this.tablename.readFields(in); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.Random; import java.util.SortedMap; @@ -50,6 +51,9 @@ public class HMaster implements HConstants, HMasterInterface, HMasterRegionInterface, Runnable { + /* (non-Javadoc) + * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long) + */ public long getProtocolVersion(String protocol, @SuppressWarnings("unused") long clientVersion) throws IOException { @@ -67,14 +71,14 @@ volatile boolean closed; Path dir; - private Configuration conf; + Configuration conf; FileSystem fs; Random rand; - private long threadWakeFrequency; - private int numRetries; - private long maxRegionOpenTime; + long threadWakeFrequency; + int numRetries; + long maxRegionOpenTime; - Vector msgQueue; + LinkedList msgQueue; private Leases serverLeases; private Server server; @@ 
-84,7 +88,7 @@ long metaRescanInterval; - private HServerAddress rootRegionLocation; + HServerAddress rootRegionLocation; /** * Columns in the 'meta' ROOT and META tables. @@ -93,7 +97,6 @@ COLUMN_FAMILY }; - static final String MASTER_NOT_RUNNING = "Master not running"; boolean rootScanned; int numMetaRegions; @@ -166,20 +169,20 @@ try { regionServer = client.getHRegionConnection(region.server); scannerId = regionServer.openScanner(region.regionName, METACOLUMNS, - FIRST_ROW); + FIRST_ROW); + while (true) { TreeMap results = new TreeMap(); - HStoreKey key = new HStoreKey(); - LabelledData[] values = regionServer.next(scannerId, key); + KeyedData[] values = regionServer.next(scannerId); if (values.length == 0) { break; } - + for (int i = 0; i < values.length; i++) { byte[] bytes = new byte[values[i].getData().getSize()]; System.arraycopy(values[i].getData().get(), 0, bytes, 0, - bytes.length); - results.put(values[i].getLabel(), bytes); + bytes.length); + results.put(values[i].getKey().getColumn(), bytes); } HRegionInfo info = HRegion.getRegionInfo(results); @@ -188,8 +191,8 @@ if(LOG.isDebugEnabled()) { LOG.debug(Thread.currentThread().getName() + " scanner: " + - Long.valueOf(scannerId) + " regioninfo: {" + info.toString() + - "}, server: " + serverName + ", startCode: " + startCode); + Long.valueOf(scannerId) + " regioninfo: {" + info.toString() + + "}, server: " + serverName + ", startCode: " + startCode); } // Note Region has been assigned. @@ -196,6 +199,7 @@ checkAssigned(info, serverName, startCode); scannedRegion = true; } + } catch (UnknownScannerException e) { // Reset scannerId so we do not try closing a scanner the other side // has lost account of: prevents duplicated stack trace out of the @@ -201,6 +205,7 @@ // has lost account of: prevents duplicated stack trace out of the // below close in the finally. 
scannerId = -1L; + } finally { try { if (scannerId != -1L) { @@ -249,13 +254,25 @@ } storedInfo = serversToServerInfo.get(serverName); } - if(storedInfo == null || storedInfo.getStartCode() != startCode) { + if( !( + unassignedRegions.containsKey(info.regionName) + || pendingRegions.contains(info.regionName) + ) + && (storedInfo == null + || storedInfo.getStartCode() != startCode)) { + + if(LOG.isDebugEnabled()) { + LOG.debug("region unassigned: " + info.regionName + + " serverName: " + serverName + + (storedInfo == null ? " storedInfo == null" + : (" startCode=" + startCode + ", storedStartCode=" + + storedInfo.getStartCode()))); + } + // The current assignment is no good; load the region. + unassignedRegions.put(info.regionName, info); assignAttempts.put(info.regionName, 0L); - if(LOG.isDebugEnabled()) { - LOG.debug("region unassigned: " + info.regionName); - } } } } @@ -263,7 +280,7 @@ /** * Scanner for the ROOT HRegion. */ - private class RootScanner extends BaseScanner { + class RootScanner extends BaseScanner { public void run() { if (LOG.isDebugEnabled()) { LOG.debug("Running ROOT scanner"); @@ -268,8 +285,9 @@ if (LOG.isDebugEnabled()) { LOG.debug("Running ROOT scanner"); } - try { - while(!closed) { + int tries = 0; + while(!closed && tries < numRetries) { + try { // rootRegionLocation will be filled in when we get an 'open region' // regionServerReport message from the HRegionServer that has been // allocated the ROOT region below. If we get back false, then @@ -289,16 +307,25 @@ } rootScanned = true; } - try { - Thread.sleep(metaRescanInterval); - } catch(InterruptedException e) { - // Catch and go around again. If interrupt, its spurious or we're - // being shutdown. Go back up to the while test. 
+ tries = 0; + + } catch(IOException e) { + tries++; + if(tries < numRetries) { + LOG.warn("ROOT scanner", e); + + } else { + LOG.error("ROOT scanner", e); + closed = true; + break; } } - } catch(IOException e) { - LOG.error("ROOT scanner", e); - closed = true; + try { + Thread.sleep(metaRescanInterval); + } catch(InterruptedException e) { + // Catch and go around again. If interrupt, its spurious or we're + // being shutdown. Go back up to the while test. + } } LOG.info("ROOT scanner exiting"); } @@ -306,12 +333,13 @@ private RootScanner rootScanner; private Thread rootScannerThread; - private Integer rootScannerLock = 0; - - private static class MetaRegion implements Comparable { - public HServerAddress server; - public Text regionName; - public Text startKey; + Integer rootScannerLock = 0; + + @SuppressWarnings("unchecked") + static class MetaRegion implements Comparable { + HServerAddress server; + Text regionName; + Text startKey; @Override public boolean equals(Object o) { @@ -354,7 +382,7 @@ * It's important to do this work in a separate thread, or else the blocking * action would prevent other work from getting done. */ - private class MetaScanner extends BaseScanner { + class MetaScanner extends BaseScanner { @SuppressWarnings("null") public void run() { while (!closed) { @@ -369,7 +397,7 @@ } if (region == null) { try { - metaRegionsToScan.wait(); + metaRegionsToScan.wait(threadWakeFrequency); } catch (InterruptedException e) { // Catch and go around again. 
We've been woken because there // are new meta regions available or because we are being @@ -394,6 +422,7 @@ } } + int tries = 0; do { try { Thread.sleep(metaRescanInterval); @@ -405,13 +434,26 @@ break; } - // Rescan the known meta regions every so often + try { + + // Rescan the known meta regions every so often - synchronized(metaScannerLock) { // Don't interrupt us while we're working - Vector v = new Vector(); - v.addAll(knownMetaRegions.values()); - for(Iterator i = v.iterator(); i.hasNext(); ) { - scanRegion(i.next()); + synchronized(metaScannerLock) { // Don't interrupt us while we're working + Vector v = new Vector(); + v.addAll(knownMetaRegions.values()); + for(Iterator i = v.iterator(); i.hasNext(); ) { + scanRegion(i.next()); + } + } + tries = 0; + + } catch (IOException e) { + tries++; + if(tries < numRetries) { + LOG.warn("META scanner", e); + + } else { + throw e; } } } while(true); @@ -424,6 +466,10 @@ LOG.info("META scanner exiting"); } + /** + * Called by the meta scanner when it has completed scanning all meta + * regions. This wakes up any threads that were waiting for this to happen. + */ private synchronized void metaRegionsScanned() { notifyAll(); } @@ -428,10 +474,14 @@ notifyAll(); } - public synchronized void waitForMetaScan() { + /** + * Other threads call this method to wait until all the meta regions have + * been scanned. + */ + synchronized boolean waitForMetaScanOrClose() { while(!closed && !allMetaRegionsScanned) { try { - wait(); + wait(threadWakeFrequency); } catch(InterruptedException e) { // continue } @@ -436,6 +486,7 @@ // continue } } + return closed; } } @@ -442,40 +493,55 @@ MetaScanner metaScanner; private Thread metaScannerThread; Integer metaScannerLock = 0; - - // The 'unassignedRegions' table maps from a region name to a HRegionInfo record, - // which includes the region's table, its id, and its start/end keys. 
- // - // We fill 'unassignedRecords' by scanning ROOT and META tables, learning the - // set of all known valid regions. + /** + * The 'unassignedRegions' table maps from a region name to a HRegionInfo + * record, which includes the region's table, its id, and its start/end keys. + * + * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the + * set of all known valid regions. + */ SortedMap unassignedRegions; - // The 'assignAttempts' table maps from regions to a timestamp that indicates - // the last time we *tried* to assign the region to a RegionServer. If the - // timestamp is out of date, then we can try to reassign it. - + /** + * The 'assignAttempts' table maps from regions to a timestamp that indicates + * the last time we *tried* to assign the region to a RegionServer. If the + * timestamp is out of date, then we can try to reassign it. + */ SortedMap assignAttempts; + /** + * Regions that have been assigned, and the server has reported that it has + * started serving it, but that we have not yet recorded in the meta table. + */ + SortedSet pendingRegions; + + /** + * The 'killList' is a list of regions that are going to be closed, but not + * reopened. + */ SortedMap> killList; - // 'killedRegions' contains regions that are in the process of being closed - + /** 'killedRegions' contains regions that are in the process of being closed */ SortedSet killedRegions; - - // 'regionsToDelete' contains regions that need to be deleted, but cannot be - // until the region server closes it - + + /** + * 'regionsToDelete' contains regions that need to be deleted, but cannot be + * until the region server closes it + */ SortedSet regionsToDelete; - // A map of known server names to server info - + /** The map of known server names to server info */ SortedMap serversToServerInfo = Collections.synchronizedSortedMap(new TreeMap()); - /** Build the HMaster out of a raw configuration item. */ + /** Build the HMaster out of a raw configuration item. 
+ * + * @param conf - Configuration object + * @throws IOException + */ public HMaster(Configuration conf) throws IOException { - this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)), + this(new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)), new HServerAddress(conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)), conf); } @@ -515,9 +581,9 @@ // Add first region from the META table to the ROOT region. HRegion.addRegionToMETA(root, meta); root.close(); - root.getLog().close(); + root.getLog().closeAndDelete(); meta.close(); - meta.getLog().close(); + meta.getLog().closeAndDelete(); } catch(IOException e) { LOG.error(e); } @@ -526,7 +592,7 @@ this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.numRetries = conf.getInt("hbase.client.retries.number", 2); this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); - this.msgQueue = new Vector(); + this.msgQueue = new LinkedList(); this.serverLeases = new Leases( conf.getLong("hbase.master.lease.period", 30 * 1000), conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); @@ -573,6 +639,9 @@ this.assignAttempts = Collections.synchronizedSortedMap(new TreeMap()); + this.pendingRegions = + Collections.synchronizedSortedSet(new TreeSet()); + this.assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); this.killList = @@ -589,14 +658,17 @@ this.closed = false; - LOG.info("HMaster initialized on " + address.toString()); + LOG.info("HMaster initialized on " + this.address.toString()); } - /** returns the HMaster server address */ - public HServerAddress getMasterAddress() { + /** + * @return HServerAddress of the master server + */ + HServerAddress getMasterAddress() { return address; } + /** Main processing loop */ public void run() { Thread.currentThread().setName("HMaster"); try { @@ -626,7 +698,7 @@ if(closed) { continue; } - op = msgQueue.remove(msgQueue.size()-1); + op = msgQueue.removeFirst(); } try { if (LOG.isDebugEnabled()) { @@ -634,7 +706,10 @@ } 
op.process(); } catch(Exception ex) { - msgQueue.insertElementAt(op, 0); + LOG.warn(ex); + synchronized(msgQueue) { + msgQueue.addLast(op); + } } } letRegionServersShutdown(); @@ -698,7 +773,8 @@ while (endTime > System.currentTimeMillis() && this.serversToServerInfo.size() > 0) { if (LOG.isDebugEnabled()) { - LOG.debug("Waiting on regionservers: " + this.serversToServerInfo); + LOG.debug("Waiting on regionservers: " + + this.serversToServerInfo.values()); } try { Thread.sleep(threadWakeFrequency); @@ -713,7 +789,7 @@ * closed flag has been set. * @return True if rootRegionLocation was populated. */ - private synchronized boolean waitForRootRegionOrClose() { + synchronized boolean waitForRootRegionOrClose() { while (!closed && rootRegionLocation == null) { try { if (LOG.isDebugEnabled()) { @@ -719,7 +795,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("Wait for root region (or close)"); } - wait(); + wait(threadWakeFrequency); if (LOG.isDebugEnabled()) { LOG.debug("Wake from wait for root region (or close)"); } @@ -736,7 +812,9 @@ // HMasterRegionInterface ////////////////////////////////////////////////////////////////////////////// - /** HRegionServers call this method upon startup. */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerStartup(org.apache.hadoop.hbase.HServerInfo) + */ public void regionServerStartup(HServerInfo serverInfo) throws IOException { String s = serverInfo.getServerAddress().toString().trim(); HServerInfo storedInfo = null; @@ -752,7 +830,7 @@ if(storedInfo != null && !closed) { synchronized(msgQueue) { - msgQueue.add(new PendingServerShutdown(storedInfo)); + msgQueue.addLast(new PendingServerShutdown(storedInfo)); msgQueue.notifyAll(); } } @@ -768,7 +846,9 @@ } } - /** HRegionServers call this method repeatedly. 
*/ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerReport(org.apache.hadoop.hbase.HServerInfo, org.apache.hadoop.hbase.HMsg[]) + */ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { String s = serverInfo.getServerAddress().toString().trim(); @@ -774,18 +854,39 @@ String s = serverInfo.getServerAddress().toString().trim(); Text serverLabel = new Text(s); - if (closed || - msgs.length == 1 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { - // HRegionServer is shutting down. - if (serversToServerInfo.remove(s) != null) { - // Only cancel lease once (This block can run a couple of times during - // shutdown). - LOG.debug("Cancelling lease for " + serverLabel); - serverLeases.cancelLease(serverLabel, serverLabel); - } + if (closed) { + // Cancel the server's lease + cancelLease(s, serverLabel); + + // Tell server to shut down HMsg returnMsgs[] = {new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; return returnMsgs; } + + if(msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { + // HRegionServer is shutting down. + + // Cancel the server's lease + cancelLease(s, serverLabel); + + // Get all the regions the server was serving reassigned + + for(int i = 1; i < msgs.length; i++) { + HRegionInfo info = msgs[i].getRegionInfo(); + if(info.tableDesc.getName().equals(ROOT_TABLE_NAME)) { + rootRegionLocation = null; + + } else if(info.tableDesc.getName().equals(META_TABLE_NAME)) { + allMetaRegionsScanned = false; + } + unassignedRegions.put(info.regionName, info); + assignAttempts.put(info.regionName, 0L); + } + + // We don't need to return anything to the server because it isn't + // going to do any more work. 
+ return new HMsg[0]; + } HServerInfo storedInfo = serversToServerInfo.get(s); @@ -838,8 +939,18 @@ } } + /** cancel a server's lease */ + private void cancelLease(String serverName, Text serverLabel) throws IOException { + if (serversToServerInfo.remove(serverName) != null) { + // Only cancel lease once. + // This method can be called a couple of times during shutdown. + LOG.debug("Cancelling lease for " + serverName); + serverLeases.cancelLease(serverLabel, serverLabel); + } + } + /** Process all the incoming messages from a server that's contacted us. */ - HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException { + private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException { Vector returnMsgs = new Vector(); TreeMap regionsToKill = @@ -859,7 +970,7 @@ if(LOG.isDebugEnabled()) { LOG.debug("region server " + info.getServerAddress().toString() - + "should not have opened region " + region.regionName); + + " should not have opened region " + region.regionName); } // This Region should not have been opened. @@ -876,6 +987,11 @@ + region.regionName); } + // Note that it has been assigned and is waiting for the meta table + // to be updated. + + pendingRegions.add(region.regionName); + // Remove from unassigned list so we don't assign it to someone else unassignedRegions.remove(region.regionName); @@ -892,7 +1008,7 @@ rootRegionIsAvailable(); break; - } else if(region.regionName.find(META_TABLE_NAME.toString()) == 0) { + } else if(region.tableDesc.getName().equals(META_TABLE_NAME)) { // It's a meta region. Put it on the queue to be scanned. @@ -910,7 +1026,7 @@ // Queue up an update to note the region location. 
synchronized(msgQueue) { - msgQueue.add(new PendingOpenReport(info, region.regionName)); + msgQueue.addLast(new PendingOpenReport(info, region)); msgQueue.notifyAll(); } } @@ -943,7 +1059,7 @@ assignAttempts.remove(region.regionName); synchronized(msgQueue) { - msgQueue.add(new PendingCloseReport(region, reassignRegion, deleteRegion)); + msgQueue.addLast(new PendingCloseReport(region, reassignRegion, deleteRegion)); msgQueue.notifyAll(); } @@ -961,7 +1077,7 @@ // A region has split and the old server is serving the two new regions. - if(region.regionName.find(META_TABLE_NAME.toString()) == 0) { + if(region.tableDesc.getName().equals(META_TABLE_NAME)) { // A meta region has split. allMetaRegionsScanned = false; @@ -996,10 +1112,7 @@ int counter = 0; long now = System.currentTimeMillis(); - for(Iterator it = unassignedRegions.keySet().iterator(); - it.hasNext(); ) { - - Text curRegionName = it.next(); + for(Text curRegionName: unassignedRegions.keySet()) { HRegionInfo regionInfo = unassignedRegions.get(curRegionName); long assignedTime = assignAttempts.get(curRegionName); @@ -1023,6 +1136,11 @@ return returnMsgs.toArray(new HMsg[returnMsgs.size()]); } + /** + * Called when the master has received a report from a region server that it + * is now serving the root region. Causes any threads waiting for the root + * region to be available to be woken up. + */ private synchronized void rootRegionIsAvailable() { notifyAll(); } @@ -1038,15 +1156,21 @@ protected final Text startRow = new Text(); protected long clientId; - public PendingOperation() { + PendingOperation() { this.clientId = rand.nextLong(); } - public abstract void process() throws IOException; + abstract void process() throws IOException; } - + + /** + * Instantiated when a server's lease has expired, meaning it has crashed. + * The region server's log file needs to be split up for each region it was + * serving, and the regions need to get reassigned. 
+ */ private class PendingServerShutdown extends PendingOperation { - private String deadServer; + private HServerAddress deadServer; + private String deadServerName; private long oldStartCode; private class ToDoEntry { @@ -1052,13 +1176,13 @@ private class ToDoEntry { boolean deleteRegion; boolean regionOffline; - HStoreKey key; + Text row; HRegionInfo info; - ToDoEntry(HStoreKey key, HRegionInfo info) { + ToDoEntry(Text row, HRegionInfo info) { this.deleteRegion = false; this.regionOffline = false; - this.key = key; + this.row = row; this.info = info; } } @@ -1063,12 +1187,14 @@ } } - public PendingServerShutdown(HServerInfo serverInfo) { + PendingServerShutdown(HServerInfo serverInfo) { super(); - this.deadServer = serverInfo.getServerAddress().toString(); + this.deadServer = serverInfo.getServerAddress(); + this.deadServerName = this.deadServer.toString(); this.oldStartCode = serverInfo.getStartCode(); } + /** Finds regions that the dead region server was serving */ private void scanMetaRegion(HRegionInterface server, long scannerId, Text regionName) throws IOException { @@ -1078,11 +1204,10 @@ DataInputBuffer inbuf = new DataInputBuffer(); try { while(true) { - LabelledData[] values = null; + KeyedData[] values = null; - HStoreKey key = new HStoreKey(); try { - values = server.next(scannerId, key); + values = server.next(scannerId); } catch(NotServingRegionException e) { throw e; @@ -1097,13 +1222,24 @@ } TreeMap results = new TreeMap(); + Text row = null; + byte[] bytes = null; for(int i = 0; i < values.length; i++) { - byte[] bytes = new byte[values[i].getData().getSize()]; + if(row == null) { + row = values[i].getKey().getRow(); + + } else { + if(!row.equals(values[i].getKey().getRow())) { + LOG.error("Multiple rows in same scanner result set. 
firstRow=" + + row + ", currentRow=" + values[i].getKey().getRow()); + } + } + bytes = new byte[values[i].getData().getSize()]; System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length); - results.put(values[i].getLabel(), bytes); + results.put(values[i].getKey().getColumn(), bytes); } - byte[] bytes = results.get(COL_SERVER); + bytes = results.get(COL_SERVER); String serverName = null; if(bytes == null || bytes.length == 0) { // No server @@ -1117,7 +1253,7 @@ break; } - if(deadServer.compareTo(serverName) != 0) { + if(deadServerName.compareTo(serverName) != 0) { // This isn't the server you're looking for - move along continue; } @@ -1128,10 +1264,9 @@ continue; } long startCode = -1L; - try { - startCode = Long.valueOf(new String(bytes, UTF8_ENCODING)). - longValue(); + startCode = + Long.valueOf(new String(bytes, UTF8_ENCODING)).longValue(); } catch(UnsupportedEncodingException e) { LOG.error(e); break; @@ -1150,7 +1285,6 @@ } inbuf.reset(bytes, bytes.length); HRegionInfo info = new HRegionInfo(); - try { info.readFields(inbuf); @@ -1160,17 +1294,21 @@ } if(LOG.isDebugEnabled()) { - LOG.debug(serverName + " was serving " + info.regionName); + LOG.debug(serverName + " was serving " + info.toString()); } - ToDoEntry todo = new ToDoEntry(key, info); + if(info.tableDesc.getName().equals(META_TABLE_NAME)) { + allMetaRegionsScanned = false; + } + + ToDoEntry todo = new ToDoEntry(row, info); toDoList.add(todo); - if(killList.containsKey(deadServer)) { - TreeMap regionsToKill = killList.get(deadServer); + if(killList.containsKey(deadServerName)) { + TreeMap regionsToKill = killList.get(deadServerName); if(regionsToKill.containsKey(info.regionName)) { regionsToKill.remove(info.regionName); - killList.put(deadServer, regionsToKill); + killList.put(deadServerName, regionsToKill); unassignedRegions.remove(info.regionName); assignAttempts.remove(info.regionName); @@ -1209,7 +1347,7 @@ for(int i = 0; i < toDoList.size(); i++) { ToDoEntry e = toDoList.get(i); - 
long lockid = server.startUpdate(regionName, clientId, e.key.getRow()); + long lockid = server.startUpdate(regionName, clientId, e.row); if(e.deleteRegion) { server.delete(regionName, clientId, lockid, COL_REGIONINFO); @@ -1237,12 +1375,27 @@ assignAttempts.put(region, 0L); } } - - public void process() throws IOException { + + @Override + void process() throws IOException { if(LOG.isDebugEnabled()) { - LOG.debug("server shutdown: " + deadServer); + LOG.debug("server shutdown: " + deadServerName); } + + // Process the old log file + + HLog.splitLog(dir, new Path(dir, "log" + "_" + deadServer.getBindAddress() + + "_" + deadServer.getPort()), fs, conf); + if(rootRegionLocation != null + && deadServerName.equals(rootRegionLocation.toString())) { + + rootRegionLocation = null; + unassignedRegions.put(HGlobals.rootRegionInfo.regionName, + HGlobals.rootRegionInfo); + assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); + } + // Scan the ROOT region HRegionInterface server = null; @@ -1248,7 +1401,9 @@ HRegionInterface server = null; long scannerId = -1L; for(int tries = 0; tries < numRetries; tries ++) { - waitForRootRegion(); // Wait until the root region is available + if(waitForRootRegionOrClose()) {// Wait until the root region is available + return; // We're shutting down. Forget it. + } server = client.getHRegionConnection(rootRegionLocation); scannerId = -1L; @@ -1253,6 +1408,7 @@ scannerId = -1L; try { + LOG.debug("scanning root region"); scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow); scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName); break; @@ -1269,7 +1425,9 @@ for(int tries = 0; tries < numRetries; tries ++) { try { - metaScanner.waitForMetaScan(); + if(metaScanner.waitForMetaScanOrClose()) { + return; // We're shutting down. Forget it. 
+ } for(Iterator i = knownMetaRegions.values().iterator(); i.hasNext(); ) { @@ -1295,7 +1453,10 @@ } } - /** PendingCloseReport is a close message that is saved in a different thread. */ + /** + * PendingCloseReport is instantiated when a region server reports that it + * has closed a region. + */ private class PendingCloseReport extends PendingOperation { private HRegionInfo regionInfo; private boolean reassignRegion; @@ -1302,7 +1463,7 @@ private boolean deleteRegion; private boolean rootRegion; - public PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion, + PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion, boolean deleteRegion) { super(); @@ -1314,7 +1475,7 @@ // If the region closing down is a meta region then we need to update // the ROOT table - if(this.regionInfo.regionName.find(HGlobals.metaTableDesc.getName().toString()) == 0) { + if(this.regionInfo.tableDesc.getName().equals(META_TABLE_NAME)) { this.rootRegion = true; } else { @@ -1322,7 +1483,8 @@ } } - public void process() throws IOException { + @Override + void process() throws IOException { for(int tries = 0; tries < numRetries; tries ++) { // We can not access any meta region if they have not already been assigned @@ -1328,7 +1490,9 @@ // We can not access any meta region if they have not already been assigned // and scanned. - metaScanner.waitForMetaScan(); + if(metaScanner.waitForMetaScanOrClose()) { + return; // We're shutting down. Forget it. + } if(LOG.isDebugEnabled()) { LOG.debug("region closed: " + regionInfo.regionName); @@ -1340,7 +1504,9 @@ HRegionInterface server; if (rootRegion) { metaRegionName = HGlobals.rootRegionInfo.regionName; - waitForRootRegion(); // Make sure root region available + if(waitForRootRegionOrClose()) {// Make sure root region available + return; // We're shutting down. Forget it. 
+ } server = client.getHRegionConnection(rootRegionLocation); } else { @@ -1404,7 +1570,11 @@ } } - /** PendingOpenReport is an open message that is saved in a different thread. */ + /** + * PendingOpenReport is instantiated when a region server reports that it is + * serving a region. This applies to all meta and user regions except the + * root region which is handled specially. + */ private class PendingOpenReport extends PendingOperation { private boolean rootRegion; private Text regionName; @@ -1411,8 +1581,8 @@ private BytesWritable serverAddress; private BytesWritable startCode; - public PendingOpenReport(HServerInfo info, Text regionName) { - if(regionName.find(HGlobals.metaTableDesc.getName().toString()) == 0) { + PendingOpenReport(HServerInfo info, HRegionInfo region) { + if(region.tableDesc.getName().equals(META_TABLE_NAME)) { // The region which just came on-line is a META region. // We need to look in the ROOT region for its information. @@ -1425,7 +1595,7 @@ this.rootRegion = false; } - this.regionName = regionName; + this.regionName = region.regionName; try { this.serverAddress = new BytesWritable( @@ -1440,7 +1610,8 @@ } - public void process() throws IOException { + @Override + void process() throws IOException { for(int tries = 0; tries < numRetries; tries ++) { // We can not access any meta region if they have not already been assigned @@ -1446,7 +1617,9 @@ // We can not access any meta region if they have not already been assigned // and scanned. - metaScanner.waitForMetaScan(); + if(metaScanner.waitForMetaScanOrClose()) { + return; // We're shutting down. Forget it. + } if(LOG.isDebugEnabled()) { LOG.debug(regionName + " open on " @@ -1459,7 +1632,9 @@ HRegionInterface server; if(rootRegion) { metaRegionName = HGlobals.rootRegionInfo.regionName; - waitForRootRegion(); // Make sure root region available + if(waitForRootRegionOrClose()) {// Make sure root region available + return; // We're shutting down. Forget it. 
+ } server = client.getHRegionConnection(rootRegionLocation); } else { @@ -1489,24 +1664,7 @@ throw e; } } - } - } - } - - synchronized void waitForRootRegion() { - while (rootRegionLocation == null) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Wait for root region"); - } - wait(); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake from wait for root region"); - } - } catch(InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Wake from wait for root region (IE)"); - } + pendingRegions.remove(regionName); } } } @@ -1515,6 +1673,9 @@ // HMasterInterface ////////////////////////////////////////////////////////////////////////////// + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HMasterInterface#isMasterRunning() + */ public boolean isMasterRunning() { return !closed; } @@ -1519,6 +1680,27 @@ return !closed; } + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HMasterInterface#shutdown() + */ + public void shutdown() { + TimerTask tt = new TimerTask() { + @Override + public void run() { + closed = true; + synchronized(msgQueue) { + msgQueue.clear(); // Empty the queue + msgQueue.notifyAll(); // Wake main thread + } + } + }; + Timer t = new Timer("Shutdown"); + t.schedule(tt, 10); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HMasterInterface#createTable(org.apache.hadoop.hbase.HTableDescriptor) + */ public void createTable(HTableDescriptor desc) throws IOException { if (!isMasterRunning()) { throw new MasterNotRunningException(); @@ -1530,7 +1712,9 @@ // We can not access any meta region if they have not already been assigned // and scanned. - metaScanner.waitForMetaScan(); + if(metaScanner.waitForMetaScanOrClose()) { + return; // We're shutting down. Forget it. + } // 1. Check to see if table already exists @@ -1630,25 +1814,6 @@ new ChangeTableState(tableName, true).process(); } - /** - * Turn off the HMaster. Sets a flag so that the main thread know to shut - * things down in an orderly fashion. 
- */ - public void shutdown() { - TimerTask tt = new TimerTask() { - @Override - public void run() { - closed = true; - synchronized(msgQueue) { - msgQueue.clear(); // Empty the queue - msgQueue.notifyAll(); // Wake main thread - } - } - }; - Timer t = new Timer("Shutdown"); - t.schedule(tt, 10); - } - /* (non-Javadoc) * @see org.apache.hadoop.hbase.HMasterInterface#findRootRegion() */ @@ -1682,7 +1847,9 @@ // We can not access any meta region if they have not already been // assigned and scanned. - metaScanner.waitForMetaScan(); + if(metaScanner.waitForMetaScanOrClose()) { + return; // We're shutting down. Forget it. + } Text firstMetaRegion = null; if(knownMetaRegions.size() == 1) { @@ -1698,7 +1865,7 @@ this.metaRegions.addAll(knownMetaRegions.tailMap(firstMetaRegion).values()); } - public void process() throws IOException { + void process() throws IOException { for(int tries = 0; tries < numRetries; tries++) { boolean tableExists = false; try { @@ -1722,9 +1889,8 @@ String serverName = null; long startCode = -1L; - LabelledData[] values = null; - HStoreKey key = new HStoreKey(); - values = server.next(scannerId, key); + KeyedData[] values = null; + values = server.next(scannerId); if(values == null || values.length == 0) { break; } @@ -1736,7 +1902,8 @@ } System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length); - if(values[i].getLabel().equals(COL_REGIONINFO)) { + Text column = values[i].getKey().getColumn(); + if(column.equals(COL_REGIONINFO)) { haveRegionInfo = true; inbuf.reset(bytes, bytes.length); info.readFields(inbuf); @@ -1741,7 +1908,7 @@ inbuf.reset(bytes, bytes.length); info.readFields(inbuf); - } else if(values[i].getLabel().equals(COL_SERVER)) { + } else if(column.equals(COL_SERVER)) { try { serverName = new String(bytes, UTF8_ENCODING); @@ -1749,7 +1916,7 @@ assert(false); } - } else if(values[i].getLabel().equals(COL_STARTCODE)) { + } else if(column.equals(COL_STARTCODE)) { try { startCode = Long.valueOf(new String(bytes, 
UTF8_ENCODING)); @@ -1828,6 +1995,7 @@ throws IOException; } + /** Instantiated to enable or disable a table */ private class ChangeTableState extends TableOperation { private boolean online; @@ -1836,7 +2004,7 @@ protected long lockid; protected long clientId; - public ChangeTableState(Text tableName, boolean onLine) throws IOException { + ChangeTableState(Text tableName, boolean onLine) throws IOException { super(tableName); this.online = onLine; } @@ -1841,6 +2009,7 @@ this.online = onLine; } + @Override protected void processScanItem(String serverName, long startCode, HRegionInfo info) throws IOException { @@ -1854,6 +2023,7 @@ } } + @Override protected void postProcessMeta(MetaRegion m, HRegionInterface server) throws IOException { // Process regions not being served @@ -1958,13 +2128,19 @@ } } - + + /** + * Instantiated to delete a table + * Note that it extends ChangeTableState, which takes care of disabling + * the table. + */ private class TableDelete extends ChangeTableState { - public TableDelete(Text tableName) throws IOException { + TableDelete(Text tableName) throws IOException { super(tableName, false); } + @Override protected void postProcessMeta(MetaRegion m, HRegionInterface server) throws IOException { // For regions that are being served, mark them for deletion @@ -2001,9 +2177,11 @@ super(tableName); } + @Override protected void processScanItem( @SuppressWarnings("unused") String serverName, - @SuppressWarnings("unused") long startCode, final HRegionInfo info) + @SuppressWarnings("unused") long startCode, + final HRegionInfo info) throws IOException { if(isEnabled(info)) { throw new TableNotDisabledException(tableName.toString()); @@ -2050,11 +2228,12 @@ } } } - + + /** Instantiated to remove a column family from a table */ private class DeleteColumn extends ColumnOperation { private Text columnName; - public DeleteColumn(Text tableName, Text columnName) throws IOException { + DeleteColumn(Text tableName, Text columnName) throws IOException { 
super(tableName); this.columnName = columnName; } @@ -2059,6 +2238,7 @@ this.columnName = columnName; } + @Override protected void postProcessMeta(MetaRegion m, HRegionInterface server) throws IOException { @@ -2085,11 +2265,12 @@ } } } - + + /** Instantiated to add a column family to a table */ private class AddColumn extends ColumnOperation { private HColumnDescriptor newColumn; - public AddColumn(Text tableName, HColumnDescriptor newColumn) + AddColumn(Text tableName, HColumnDescriptor newColumn) throws IOException { super(tableName); @@ -2096,6 +2277,7 @@ this.newColumn = newColumn; } + @Override protected void postProcessMeta(MetaRegion m, HRegionInterface server) throws IOException { @@ -2101,9 +2283,9 @@ for(HRegionInfo i: unservedRegions) { - //TODO: I *think* all we need to do to add a column is add it to - // the table descriptor. When the region is brought on-line, it - // should find the column missing and create it. + // All we need to do to add a column is add it to the table descriptor. + // When the region is brought on-line, it will find the column missing + // and create it. 
i.tableDesc.addFamily(newColumn); updateRegionInfo(server, m.regionName, i); @@ -2114,19 +2296,32 @@ ////////////////////////////////////////////////////////////////////////////// // Managing leases ////////////////////////////////////////////////////////////////////////////// - - private class ServerExpirer extends LeaseListener { + + /** Instantiated to monitor the health of a region server */ + private class ServerExpirer implements LeaseListener { private String server; - public ServerExpirer(String server) { + ServerExpirer(String server) { this.server = server; } + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.LeaseListener#leaseExpired() + */ public void leaseExpired() { LOG.info(server + " lease expired"); HServerInfo storedInfo = serversToServerInfo.remove(server); + if(rootRegionLocation != null + && rootRegionLocation.toString().equals( + storedInfo.getServerAddress().toString())) { + + rootRegionLocation = null; + unassignedRegions.put(HGlobals.rootRegionInfo.regionName, + HGlobals.rootRegionInfo); + assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); + } synchronized(msgQueue) { - msgQueue.add(new PendingServerShutdown(storedInfo)); + msgQueue.addLast(new PendingServerShutdown(storedInfo)); msgQueue.notifyAll(); } } @@ -2142,6 +2337,10 @@ System.exit(0); } + /** + * Main program + * @param args + */ public static void main(String [] args) { if (args.length < 1) { printUsageAndExit(); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (working copy) @@ -31,9 +31,13 @@ import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; -public class HMerge implements HConstants { - private static final Log LOG = LogFactory.getLog(HMerge.class); - private static final Text[] META_COLS = 
{COL_REGIONINFO}; +/** + * A non-instantiable class that has a static method capable of compacting + * a table by merging adjacent regions that have grown too small. + */ +class HMerge implements HConstants { + static final Log LOG = LogFactory.getLog(HMerge.class); + static final Text[] META_COLS = {COL_REGIONINFO}; private HMerge() {} // Not instantiable @@ -93,7 +97,7 @@ this.more = true; this.key = new HStoreKey(); this.info = new HRegionInfo(); - this.dir = new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)); + this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); this.basedir = new Path(dir, "merge_" + System.currentTimeMillis()); fs.mkdirs(basedir); this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf); @@ -99,7 +103,7 @@ this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf); } - public void process() throws IOException { + void process() throws IOException { try { while(more) { TreeSet regionsToMerge = next(); @@ -110,7 +114,7 @@ } } finally { try { - hlog.close(); + hlog.closeAndDelete(); } catch(IOException e) { LOG.error(e); @@ -137,7 +141,7 @@ for(int i = 0; i < regions.length - 1; i++) { if(currentRegion == null) { currentRegion = - new HRegion(dir, hlog, fs, conf, regions[i], null, null); + new HRegion(dir, hlog, fs, conf, regions[i], null); currentSize = currentRegion.largestHStore(); } @@ -142,7 +146,7 @@ currentSize = currentRegion.largestHStore(); } nextRegion = - new HRegion(dir, hlog, fs, conf, regions[i + 1], null, null); + new HRegion(dir, hlog, fs, conf, regions[i + 1], null); nextSize = nextRegion.largestHStore(); @@ -164,10 +168,9 @@ i++; continue; - } else { - LOG.info("not merging regions " + currentRegion.getRegionName() - + " and " + nextRegion.getRegionName()); } + LOG.info("not merging regions " + currentRegion.getRegionName() + + " and " + nextRegion.getRegionName()); currentRegion.close(); currentRegion = nextRegion; @@ -184,7 +187,8 @@ HRegion newRegion) throws IOException; } - + + /** 
Instantiated to compact a normal user table */ private static class OnlineMerger extends Merger { private HClient client; private HScannerInterface metaScanner; @@ -190,7 +194,7 @@ private HScannerInterface metaScanner; private HRegionInfo latestRegion; - public OnlineMerger(Configuration conf, FileSystem fs, HClient client, + OnlineMerger(Configuration conf, FileSystem fs, HClient client, Text tableName) throws IOException { super(conf, fs, tableName); @@ -231,6 +235,7 @@ } } + @Override protected TreeSet next() throws IOException { TreeSet regions = new TreeSet(); if(latestRegion == null) { @@ -245,7 +250,8 @@ } return regions; } - + + @Override protected void updateMeta(Text oldRegion1, Text oldRegion2, HRegion newRegion) throws IOException { Text[] regionsToDelete = { @@ -307,6 +313,7 @@ } } + /** Instantiated to compact the meta region */ private static class OfflineMerger extends Merger { private Path dir; private TreeSet metaRegions; @@ -312,11 +319,11 @@ private TreeSet metaRegions; private TreeMap results; - public OfflineMerger(Configuration conf, FileSystem fs, Text tableName) + OfflineMerger(Configuration conf, FileSystem fs, Text tableName) throws IOException { super(conf, fs, tableName); - this.dir = new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)); + this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); this.metaRegions = new TreeSet(); this.results = new TreeMap(); @@ -322,8 +329,8 @@ // Scan root region to find all the meta regions - HRegion root = new HRegion(dir, hlog,fs, conf, HGlobals.rootRegionInfo, - null, null); + HRegion root = + new HRegion(dir, hlog,fs, conf, HGlobals.rootRegionInfo, null); HInternalScannerInterface rootScanner = root.getScanner(META_COLS, new Text()); @@ -350,6 +357,7 @@ } } + @Override protected TreeSet next() throws IOException { more = false; return metaRegions; @@ -354,7 +362,8 @@ more = false; return metaRegions; } - + + @Override protected void updateMeta(Text oldRegion1, Text oldRegion2, HRegion 
newRegion) throws IOException { @@ -359,7 +368,7 @@ HRegion newRegion) throws IOException { HRegion root = - new HRegion(dir, hlog, fs, conf, HGlobals.rootRegionInfo, null, null); + new HRegion(dir, hlog, fs, conf, HGlobals.rootRegionInfo, null); Text[] regionsToDelete = { oldRegion1, Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (working copy) @@ -24,17 +24,36 @@ * HRegionServers. ******************************************************************************/ public class HMsg implements Writable { + + // Messages sent from master to region server + + /** Start serving the specified region */ public static final byte MSG_REGION_OPEN = 1; + + /** Stop serving the specified region */ public static final byte MSG_REGION_CLOSE = 2; - public static final byte MSG_REGION_MERGE = 3; + + /** Region server is unknown to master. 
Restart */ public static final byte MSG_CALL_SERVER_STARTUP = 4; + + /** Master tells region server to stop */ public static final byte MSG_REGIONSERVER_STOP = 5; + + /** Stop serving the specified region and don't report back that it's closed */ public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6; + // Messages sent from the region server to the master + + /** region server is now serving the specified region */ public static final byte MSG_REPORT_OPEN = 100; + + /** region server is no longer serving the specified region */ public static final byte MSG_REPORT_CLOSE = 101; - public static final byte MSG_REGION_SPLIT = 102; + + /** region server is now serving a region produced by a region split */ public static final byte MSG_NEW_REGION = 103; + + /** region server is shutting down */ public static final byte MSG_REPORT_EXITING = 104; byte msg; @@ -40,6 +59,7 @@ byte msg; HRegionInfo info; + /** Default constructor. Used during deserialization */ public HMsg() { this.info = new HRegionInfo(); } @@ -44,6 +64,11 @@ this.info = new HRegionInfo(); } + /** + * Construct a message with an empty HRegionInfo + * + * @param msg - message code + */ public HMsg(byte msg) { this.msg = msg; this.info = new HRegionInfo(); @@ -49,6 +74,12 @@ this.info = new HRegionInfo(); } + /** + * Construct a message with the specified message code and HRegionInfo + * + * @param msg - message code + * @param info - HRegionInfo + */ public HMsg(byte msg, HRegionInfo info) { this.msg = msg; this.info = info; @@ -54,6 +85,10 @@ this.info = info; } + /** + * Accessor + * @return message code + */ public byte getMsg() { return msg; } @@ -58,6 +93,10 @@ return msg; } + /** + * Accessor + * @return HRegionInfo + */ public HRegionInfo getRegionInfo() { return info; } @@ -62,7 +101,56 @@ return info; } - + @Override + public String toString() { + StringBuilder message = new StringBuilder(); + switch(msg) { + case MSG_REGION_OPEN: + message.append("MSG_REGION_OPEN : "); + break; + + case 
MSG_REGION_CLOSE: + message.append("MSG_REGION_CLOSE : "); + break; + + case MSG_CALL_SERVER_STARTUP: + message.append("MSG_CALL_SERVER_STARTUP : "); + break; + + case MSG_REGIONSERVER_STOP: + message.append("MSG_REGIONSERVER_STOP : "); + break; + + case MSG_REGION_CLOSE_WITHOUT_REPORT: + message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : "); + break; + + case MSG_REPORT_OPEN: + message.append("MSG_REPORT_OPEN : "); + break; + + case MSG_REPORT_CLOSE: + message.append("MSG_REPORT_CLOSE : "); + break; + + case MSG_NEW_REGION: + message.append("MSG_NEW_REGION : "); + break; + + case MSG_REPORT_EXITING: + message.append("MSG_REPORT_EXITING : "); + break; + + default: + message.append("unknown message code ("); + message.append(msg); + message.append(") : "); + break; + } + message.append(info == null ? "null" : info.toString()); + return message.toString(); + } + ////////////////////////////////////////////////////////////////////////////// // Writable ////////////////////////////////////////////////////////////////////////////// @@ -67,7 +155,10 @@ // Writable ////////////////////////////////////////////////////////////////////////////// - public void write(DataOutput out) throws IOException { + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + public void write(DataOutput out) throws IOException { out.writeByte(msg); info.write(out); } @@ -72,7 +163,10 @@ info.write(out); } - public void readFields(DataInput in) throws IOException { + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + public void readFields(DataInput in) throws IOException { this.msg = in.readByte(); this.info.readFields(in); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java 
(working copy) @@ -55,7 +55,7 @@ * regionName is a unique identifier for this HRegion. (startKey, endKey] * defines the keyspace for this HRegion. */ -public class HRegion implements HConstants { +class HRegion implements HConstants { static String SPLITDIR = "splits"; static String MERGEDIR = "merges"; static String TMPREGION_PREFIX = "tmpregion_"; @@ -72,7 +72,7 @@ * @param regionName - name of the region to delete * @throws IOException */ - public static void deleteRegion(FileSystem fs, Path baseDirectory, + static void deleteRegion(FileSystem fs, Path baseDirectory, Text regionName) throws IOException { LOG.debug("Deleting region " + regionName); fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName)); @@ -83,7 +83,7 @@ * HRegionServer. Returns a brand-new active HRegion, also * running on the current HRegionServer. */ - public static HRegion closeAndMerge(HRegion srcA, HRegion srcB) throws IOException { + static HRegion closeAndMerge(HRegion srcA, HRegion srcB) throws IOException { // Make sure that srcA comes first; important for key-ordering during // write of the merged file. @@ -110,7 +110,7 @@ Configuration conf = srcA.getConf(); HTableDescriptor tabledesc = srcA.getTableDesc(); HLog log = srcA.getLog(); - Path dir = srcA.getDir(); + Path rootDir = srcA.getRootDir(); Text startKey = srcA.getStartKey(); Text endKey = srcB.getEndKey(); @@ -222,8 +222,8 @@ // Done - HRegion dstRegion = new HRegion(dir, log, fs, conf, newRegionInfo, - newRegionDir, null); + HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo, + newRegionDir); // Get rid of merges directory @@ -234,59 +234,6 @@ return dstRegion; } - /** - * Internal method to create a new HRegion. 
Used by createTable and by the - * bootstrap code in the HMaster constructor - * - * @param fs - file system to create region in - * @param dir - base directory - * @param conf - configuration object - * @param desc - table descriptor - * @param regionId - region id - * @param startKey - first key in region - * @param endKey - last key in region - * @return - new HRegion - * @throws IOException - */ - public static HRegion createNewHRegion(FileSystem fs, Path dir, - Configuration conf, HTableDescriptor desc, long regionId, Text startKey, - Text endKey) throws IOException { - - HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey); - Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName); - fs.mkdirs(regionDir); - - return new HRegion(dir, - new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf), - fs, conf, info, null, null); - } - - /** - * Inserts a new table's meta information into the meta table. Used by - * the HMaster bootstrap code. - * - * @param meta - HRegion to be updated - * @param table - HRegion of new table - * - * @throws IOException - */ - public static void addRegionToMeta(HRegion meta, HRegion table) - throws IOException { - - // The row key is the region name - - long writeid = meta.startUpdate(table.getRegionName()); - - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - DataOutputStream s = new DataOutputStream(bytes); - - table.getRegionInfo().write(s); - - meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray())); - - meta.commit(writeid); - } - ////////////////////////////////////////////////////////////////////////////// // Members ////////////////////////////////////////////////////////////////////////////// @@ -299,7 +246,7 @@ HMemcache memcache; - Path dir; + Path rootDir; HLog log; FileSystem fs; Configuration conf; @@ -307,10 +254,10 @@ Path regiondir; static class WriteState { - public volatile boolean writesOngoing; - public volatile boolean writesEnabled; - public volatile 
boolean closed; - public WriteState() { + volatile boolean writesOngoing; + volatile boolean writesEnabled; + volatile boolean closed; + WriteState() { this.writesOngoing = true; this.writesEnabled = true; this.closed = false; @@ -340,18 +287,22 @@ * appropriate log info for this HRegion. If there is a previous log file * (implying that the HRegion has been written-to before), then read it from * the supplied path. + * + * @param rootDir root directory for HBase instance + * @param log HLog where changes should be committed * @param fs is the filesystem. - * @param dir dir is where the HRegion is stored. * @param conf is global configuration settings. + * @param regionInfo - HRegionInfo that describes the region * @param initialFiles If there are initial files (implying that the HRegion * is new), then read them from the supplied path. + * * @throws IOException */ - public HRegion(Path dir, HLog log, FileSystem fs, Configuration conf, - HRegionInfo regionInfo, Path initialFiles, Path oldLogFile) + HRegion(Path rootDir, HLog log, FileSystem fs, Configuration conf, + HRegionInfo regionInfo, Path initialFiles) throws IOException { - this.dir = dir; + this.rootDir = rootDir; this.log = log; this.fs = fs; this.conf = conf; @@ -366,7 +317,8 @@ // Declare the regionName. This is a unique string for the region, used to // build a unique filename. 
- this.regiondir = HStoreFile.getHRegionDir(dir, this.regionInfo.regionName); + this.regiondir = HStoreFile.getHRegionDir(rootDir, this.regionInfo.regionName); + Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME); // Move prefab HStore files into place (if any) @@ -378,7 +330,7 @@ for(Map.Entry e : this.regionInfo.tableDesc.families().entrySet()) { Text colFamily = HStoreKey.extractFamily(e.getKey()); - stores.put(colFamily, new HStore(dir, this.regionInfo.regionName, + stores.put(colFamily, new HStore(rootDir, this.regionInfo.regionName, e.getValue(), fs, oldLogFile, conf)); } @@ -411,7 +363,7 @@ } /** Returns a HRegionInfo object for this region */ - public HRegionInfo getRegionInfo() { + HRegionInfo getRegionInfo() { return this.regionInfo; } @@ -416,7 +368,7 @@ } /** returns true if region is closed */ - public boolean isClosed() { + boolean isClosed() { boolean closed = false; synchronized(writestate) { closed = writestate.closed; @@ -434,7 +386,7 @@ * This method could take some time to execute, so don't call it from a * time-sensitive thread. 
*/ - public Vector close() throws IOException { + Vector close() throws IOException { lock.obtainWriteLock(); try { boolean shouldClose = false; @@ -483,7 +435,7 @@ * * Returns two brand-new (and open) HRegions */ - public HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener) + HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener) throws IOException { if(((regionInfo.startKey.getLength() != 0) @@ -572,9 +524,9 @@ // Done - HRegion regionA = new HRegion(dir, log, fs, conf, regionAInfo, dirA, null); + HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA); - HRegion regionB = new HRegion(dir, log, fs, conf, regionBInfo, dirB, null); + HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB); // Cleanup @@ -596,43 +548,43 @@ // HRegion accessors ////////////////////////////////////////////////////////////////////////////// - public Text getStartKey() { + Text getStartKey() { return regionInfo.startKey; } - public Text getEndKey() { + Text getEndKey() { return regionInfo.endKey; } - public long getRegionId() { + long getRegionId() { return regionInfo.regionId; } - public Text getRegionName() { + Text getRegionName() { return regionInfo.regionName; } - public Path getDir() { - return dir; + Path getRootDir() { + return rootDir; } - public HTableDescriptor getTableDesc() { + HTableDescriptor getTableDesc() { return regionInfo.tableDesc; } - public HLog getLog() { + HLog getLog() { return log; } - public Configuration getConf() { + Configuration getConf() { return conf; } - public Path getRegionDir() { + Path getRegionDir() { return regiondir; } - public FileSystem getFilesystem() { + FileSystem getFilesystem() { return fs; } @@ -652,7 +604,7 @@ * @param midKey - (return value) midKey of the largest MapFile * @return - true if the region should be split */ - public boolean needsSplit(Text midKey) { + boolean needsSplit(Text midKey) { lock.obtainReadLock(); try { Text key = new Text(); @@ -675,7 +627,7 
@@ /** * @return - returns the size of the largest HStore */ - public long largestHStore() { + long largestHStore() { long maxsize = 0; lock.obtainReadLock(); try { @@ -697,7 +649,7 @@ /** * @return true if the region should be compacted. */ - public boolean needsCompaction() { + boolean needsCompaction() { boolean needsCompaction = false; this.lock.obtainReadLock(); try { @@ -726,7 +678,7 @@ * HRegion is busy doing something else storage-intensive (like flushing the * cache). The caller should check back later. */ - public boolean compactStores() throws IOException { + boolean compactStores() throws IOException { boolean shouldCompact = false; lock.obtainReadLock(); try { @@ -766,7 +718,7 @@ * Each HRegion is given a periodic chance to flush the cache, which it should * only take if there have been a lot of uncommitted writes. */ - public void optionallyFlush() throws IOException { + void optionallyFlush() throws IOException { if(commitsSinceFlush > maxUnflushedEntries) { if (LOG.isDebugEnabled()) { LOG.debug("Flushing cache. Number of commits is: " + commitsSinceFlush); @@ -792,8 +744,7 @@ * This method may block for some time, so it should not be called from a * time-sensitive thread. */ - public Vector flushcache(boolean disableFutureWrites) - throws IOException { + Vector flushcache(boolean disableFutureWrites) throws IOException { boolean shouldFlush = false; synchronized(writestate) { if((! writestate.writesOngoing) @@ -934,7 +885,7 @@ ////////////////////////////////////////////////////////////////////////////// /** Fetch a single data item. */ - public BytesWritable get(Text row, Text column) throws IOException { + BytesWritable get(Text row, Text column) throws IOException { BytesWritable[] results = get(row, column, Long.MAX_VALUE, 1); return (results == null)? 
null: results[0]; } @@ -940,7 +891,7 @@ } /** Fetch multiple versions of a single data item */ - public BytesWritable[] get(Text row, Text column, int numVersions) throws IOException { + BytesWritable[] get(Text row, Text column, int numVersions) throws IOException { return get(row, column, Long.MAX_VALUE, numVersions); } @@ -945,7 +896,7 @@ } /** Fetch multiple versions of a single data item, with timestamp. */ - public BytesWritable[] get(Text row, Text column, long timestamp, int numVersions) + BytesWritable[] get(Text row, Text column, long timestamp, int numVersions) throws IOException { if(writestate.closed) { @@ -969,8 +920,7 @@ } } - // Private implementation: get the value for the indicated HStoreKey - + /** Private implementation: get the value for the indicated HStoreKey */ private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException { lock.obtainReadLock(); @@ -1007,7 +957,7 @@ * determine which column groups are useful for that row. That would let us * avoid a bunch of disk activity. */ - public TreeMap getFull(Text row) throws IOException { + TreeMap getFull(Text row) throws IOException { HStoreKey key = new HStoreKey(row, System.currentTimeMillis()); lock.obtainReadLock(); @@ -1029,7 +979,7 @@ * Return an iterator that scans over the HRegion, returning the indicated * columns. This Iterator must be closed by the caller. */ - public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) + HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException { lock.obtainReadLock(); try { @@ -1067,7 +1017,7 @@ * @return lockid * @see #put(long, Text, BytesWritable) */ - public long startUpdate(Text row) throws IOException { + long startUpdate(Text row) throws IOException { // We obtain a per-row lock, so other clients will block while one client // performs an update. The read lock is released by the client calling // #commit or #abort or if the HRegionServer lease on the lock expires. 
@@ -1085,8 +1035,7 @@ * This method really just tests the input, then calls an internal localput() * method. */ - public void put(long lockid, Text targetCol, BytesWritable val) - throws IOException { + void put(long lockid, Text targetCol, BytesWritable val) throws IOException { if(val.getSize() == DELETE_BYTES.getSize() && val.compareTo(DELETE_BYTES) == 0) { throw new IOException("Cannot insert value: " + val); @@ -1097,11 +1046,11 @@ /** * Delete a value or write a value. This is a just a convenience method for put(). */ - public void delete(long lockid, Text targetCol) throws IOException { + void delete(long lockid, Text targetCol) throws IOException { localput(lockid, targetCol, DELETE_BYTES); } - /* + /** * Private implementation. * * localput() is used for both puts and deletes. We just place the values @@ -1148,7 +1097,7 @@ * writes associated with the given row-lock. These values have not yet * been placed in memcache or written to the log. */ - public void abort(long lockid) throws IOException { + void abort(long lockid) throws IOException { Text row = getRowFromLock(lockid); if(row == null) { throw new LockException("No write lock for lockid " + lockid); @@ -1182,7 +1131,7 @@ * @param lockid Lock for row we're to commit. * @throws IOException */ - public void commit(final long lockid) throws IOException { + void commit(final long lockid) throws IOException { // Remove the row from the pendingWrites list so // that repeated executions won't screw this up. Text row = getRowFromLock(lockid); @@ -1286,7 +1235,8 @@ } } - /** Release the row lock! + /** + * Release the row lock! * @param lock Name of row whose lock we are to release */ void releaseRowLock(Text row) { @@ -1309,7 +1259,7 @@ } } - /* + /** * HScanner is an iterator through a bunch of rows in an HRegion. */ private static class HScanner implements HInternalScannerInterface { @@ -1321,7 +1271,7 @@ /** Create an HScanner with a handle on many HStores. 
*/ @SuppressWarnings("unchecked") - public HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores) + HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores) throws IOException { long scanTime = System.currentTimeMillis(); this.scanners = new HInternalScannerInterface[stores.length + 1]; @@ -1391,9 +1341,12 @@ return multipleMatchers; } - /** + /* (non-Javadoc) + * * Grab the next row's worth of values. The HScanner will return the most * recent data value for each row that is not newer than the target time. + * + * @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap) */ public boolean next(HStoreKey key, TreeMap results) throws IOException { @@ -1477,7 +1430,9 @@ } } - /** All done with the scanner. */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HInternalScannerInterface#close() + */ public void close() { for(int i = 0; i < scanners.length; i++) { if(scanners[i] != null) { @@ -1493,7 +1448,7 @@ * Convenience method creating new HRegions. * @param regionId ID to use * @param tableDesc Descriptor - * @param dir Home directory for the new region. + * @param rootDir Root directory of HBase instance * @param conf * @return New META region (ROOT or META). * @throws IOException @@ -1498,11 +1453,12 @@ * @return New META region (ROOT or META). * @throws IOException */ - public static HRegion createHRegion(final long regionId, - final HTableDescriptor tableDesc, final Path dir, final Configuration conf) + static HRegion createHRegion(final long regionId, + final HTableDescriptor tableDesc, final Path rootDir, + final Configuration conf) throws IOException { return createHRegion(new HRegionInfo(regionId, tableDesc, null, null), - dir, conf, null, null); + rootDir, conf, null); } /** @@ -1510,11 +1466,9 @@ * bootstrap code in the HMaster constructor * * @param info Info for region to create. 
- * @param dir Home dir for new region + * @param rootDir Root directory for HBase instance * @param conf * @param initialFiles InitialFiles to pass new HRegion. Pass null if none. - * @param oldLogFile Old log file to use in region initialization. Pass null - * if none. * @return new HRegion * * @throws IOException @@ -1519,16 +1473,15 @@ * * @throws IOException */ - public static HRegion createHRegion(final HRegionInfo info, - final Path dir, final Configuration conf, final Path initialFiles, - final Path oldLogFile) + static HRegion createHRegion(final HRegionInfo info, + final Path rootDir, final Configuration conf, final Path initialFiles) throws IOException { - Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName); + Path regionDir = HStoreFile.getHRegionDir(rootDir, info.regionName); FileSystem fs = FileSystem.get(conf); fs.mkdirs(regionDir); - return new HRegion(dir, + return new HRegion(rootDir, new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf), - fs, conf, info, initialFiles, oldLogFile); + fs, conf, info, initialFiles); } /** @@ -1541,7 +1494,7 @@ * * @throws IOException */ - public static void addRegionToMETA(HRegion meta, HRegion r) + static void addRegionToMETA(HRegion meta, HRegion r) throws IOException { // The row key is the region name long writeid = meta.startUpdate(r.getRegionName()); @@ -1552,7 +1505,7 @@ meta.commit(writeid); } - public static void addRegionToMETA(final HClient client, + static void addRegionToMETA(final HClient client, final Text table, final HRegion region, final HServerAddress serverAddress, final long startCode) @@ -1578,7 +1531,7 @@ * @param regionName Region to remove. 
* @throws IOException */ - public static void removeRegionFromMETA(final HClient client, + static void removeRegionFromMETA(final HClient client, final Text table, final Text regionName) throws IOException { client.openTable(table); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegiondirReader.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegiondirReader.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegiondirReader.java (working copy) @@ -36,12 +36,12 @@ * file system only. * TODO: Add dumping of HStoreFile content and HLog. */ -public class HRegiondirReader { +class HRegiondirReader { private final Configuration conf; private final Path parentdir; - private static final Pattern REGION_NAME_PARSER = - Pattern.compile(HGlobals.HREGIONDIR_PREFIX + + static final Pattern REGION_NAME_PARSER = + Pattern.compile(HConstants.HREGIONDIR_PREFIX + "([^_]+)_([^_]*)_([^_]*)"); private static final String USAGE = "Usage: " + @@ -50,7 +50,7 @@ private final List infos; - public HRegiondirReader(final HBaseConfiguration conf, + HRegiondirReader(final HBaseConfiguration conf, final String parentdirName) throws IOException { this.conf = conf; @@ -65,6 +65,9 @@ // Look for regions in parentdir. 
Path [] regiondirs = fs.listPaths(parentdir, new PathFilter() { + /* (non-Javadoc) + * @see org.apache.hadoop.fs.PathFilter#accept(org.apache.hadoop.fs.Path) + */ public boolean accept(Path path) { Matcher m = REGION_NAME_PARSER.matcher(path.getName()); return m != null && m.matches(); @@ -136,12 +139,11 @@ return families.toArray(new Text [] {}); } - public List getRegions() { + List getRegions() { return this.infos; } - public HRegionInfo getRegionInfo(final String tableName) - throws IOException { + HRegionInfo getRegionInfo(final String tableName) { HRegionInfo result = null; for(HRegionInfo i: getRegions()) { if(i.tableDesc.getName().equals(tableName)) { @@ -162,7 +164,7 @@ private void dump(final HRegionInfo info) throws IOException { HRegion r = new HRegion(this.parentdir, null, - FileSystem.get(this.conf), conf, info, null, null); + FileSystem.get(this.conf), conf, info, null); Text [] families = info.tableDesc.families().keySet().toArray(new Text [] {}); HInternalScannerInterface scanner = r.getScanner(families, new Text()); HStoreKey key = new HStoreKey(); @@ -201,6 +203,10 @@ } } + /** + * @param args + * @throws IOException + */ public static void main(String[] args) throws IOException { if (args.length < 1) { System.err.println(USAGE); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (working copy) @@ -20,11 +20,9 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; -import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparable; /** * HRegion information. @@ -32,13 +30,14 @@ * HRegions' table descriptor, etc. 
*/ public class HRegionInfo implements WritableComparable { - public Text regionName; - public long regionId; - public Text startKey; - public Text endKey; - public boolean offLine; - public HTableDescriptor tableDesc; + Text regionName; + long regionId; + Text startKey; + Text endKey; + boolean offLine; + HTableDescriptor tableDesc; + /** Default constructor - creates empty object */ public HRegionInfo() { this.regionId = 0; this.tableDesc = new HTableDescriptor(); @@ -48,6 +47,12 @@ this.offLine = false; } + /** + * Construct a HRegionInfo object from byte array + * + * @param serializedBytes + * @throws IOException + */ public HRegionInfo(final byte [] serializedBytes) throws IOException { this(); readFields(new DataInputStream(new ByteArrayInputStream(serializedBytes))); @@ -53,9 +58,17 @@ readFields(new DataInputStream(new ByteArrayInputStream(serializedBytes))); } - public HRegionInfo(long regionId, HTableDescriptor tableDesc, - Text startKey, Text endKey) - throws IllegalArgumentException { + /** + * Construct HRegionInfo with explicit parameters + * + * @param regionId - the regionid + * @param tableDesc - the table descriptor + * @param startKey - first key in region + * @param endKey - end of key range + * @throws IllegalArgumentException + */ + public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey, + Text endKey) throws IllegalArgumentException { this.regionId = regionId; Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (working copy) @@ -26,18 +26,67 @@ * a handle to the HRegionInterface. 
******************************************************************************/ public interface HRegionInterface extends VersionedProtocol { - public static final long versionID = 1L; // initial version + /** initial version */ + public static final long versionID = 1L; - // Get metainfo about an HRegion - - public HRegionInfo getRegionInfo(Text regionName) throws NotServingRegionException; + /** + * Get metainfo about an HRegion + * + * @param regionName - name of the region + * @return - HRegionInfo object for region + * @throws NotServingRegionException + */ + public HRegionInfo getRegionInfo(final Text regionName) throws NotServingRegionException; - // GET methods for an HRegion. + /** + * Retrieve a single value from the specified region for the specified row + * and column keys + * + * @param regionName - name of region + * @param row - row key + * @param column - column key + * @return - value for that region/row/column + * @throws IOException + */ + public BytesWritable get(final Text regionName, final Text row, final Text column) throws IOException; - public BytesWritable get(Text regionName, Text row, Text column) throws IOException; - public BytesWritable[] get(Text regionName, Text row, Text column, int numVersions) throws IOException; - public BytesWritable[] get(Text regionName, Text row, Text column, long timestamp, int numVersions) throws IOException; - public LabelledData[] getRow(Text regionName, Text row) throws IOException; + /** + * Get the specified number of versions of the specified row and column + * + * @param regionName - region name + * @param row - row key + * @param column - column key + * @param numVersions - number of versions to return + * @return - array of values + * @throws IOException + */ + public BytesWritable[] get(final Text regionName, final Text row, + final Text column, final int numVersions) throws IOException; + + /** + * Get the specified number of versions of the specified row and column with + * the specified 
timestamp. + * + * @param regionName - region name + * @param row - row key + * @param column - column key + * @param timestamp - timestamp + * @param numVersions - number of versions to return + * @return - array of values + * @throws IOException + */ + public BytesWritable[] get(final Text regionName, final Text row, final Text column, + final long timestamp, final int numVersions) throws IOException; + + /** + * Get all the data for the specified row + * + * @param regionName - region name + * @param row - row key + * @return - array of values + * @throws IOException + */ + public KeyedData[] getRow(final Text regionName, final Text row) throws IOException; ////////////////////////////////////////////////////////////////////////////// // Start an atomic row insertion/update. No changes are committed until the @@ -50,11 +99,80 @@ // The client can gain extra time with a call to renewLease(). ////////////////////////////////////////////////////////////////////////////// - public long startUpdate(Text regionName, long clientid, Text row) throws IOException; - public void put(Text regionName, long clientid, long lockid, Text column, BytesWritable val) throws IOException; - public void delete(Text regionName, long clientid, long lockid, Text column) throws IOException; - public void abort(Text regionName, long clientid, long lockid) throws IOException; - public void commit(Text regionName, long clientid, long lockid) throws IOException; + /** + * Start an atomic row insertion/update. No changes are committed until the + * call to commit() returns. A call to abort() will abandon any updates in progress. + * + * Callers to this method are given a lease for each unique lockid; before the + * lease expires, either abort() or commit() must be called. If it is not + * called, the system will automatically call abort() on the client's behalf. + * + * The client can gain extra time with a call to renewLease(). 
+ * Start an atomic row insertion or update + * + * @param regionName - region name + * @param clientid - a unique value to identify the client + * @param row - Name of row to start update against. + * @return Row lockid. + * @throws IOException + */ + public long startUpdate(final Text regionName, final long clientid, + final Text row) throws IOException; + + /** + * Change a value for the specified column + * + * @param regionName - region name + * @param clientid - a unique value to identify the client + * @param lockid - lock id returned from startUpdate + * @param column - column whose value is being set + * @param val - new value for column + * @throws IOException + */ + public void put(final Text regionName, final long clientid, final long lockid, + final Text column, final BytesWritable val) throws IOException; + + /** + * Delete the value for a column + * + * @param regionName - region name + * @param clientid - a unique value to identify the client + * @param lockid - lock id returned from startUpdate + * @param column - name of column whose value is to be deleted + * @throws IOException + */ + public void delete(final Text regionName, final long clientid, final long lockid, + final Text column) throws IOException; + + /** + * Abort a row mutation + * + * @param regionName - region name + * @param clientid - a unique value to identify the client + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public void abort(final Text regionName, final long clientid, + final long lockid) throws IOException; + + /** + * Finalize a row mutation + * + * @param regionName - region name + * @param clientid - a unique value to identify the client + * @param lockid - lock id returned from startUpdate + * @throws IOException + */ + public void commit(final Text regionName, final long clientid, + final long lockid) throws IOException; + + /** + * Renew lease on update + * + * @param lockid - lock id returned from startUpdate + * @param 
clientid - a unique value to identify the client + * @throws IOException + */ public void renewLease(long lockid, long clientid) throws IOException; ////////////////////////////////////////////////////////////////////////////// @@ -77,11 +195,10 @@ * Get the next set of values * * @param scannerId - clientId passed to openScanner - * @param key - the next HStoreKey - * @return - true if a value was retrieved + * @return - array of values * @throws IOException */ - public LabelledData[] next(long scannerId, HStoreKey key) throws IOException; + public KeyedData[] next(long scannerId) throws IOException; /** * Close a scanner Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -19,9 +19,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.Random; +import java.util.SortedMap; import java.util.TreeMap; import java.util.Vector; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -42,9 +43,11 @@ * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. 
******************************************************************************/ -public class HRegionServer - implements HConstants, HRegionInterface, Runnable { +public class HRegionServer implements HConstants, HRegionInterface, Runnable { + /* (non-Javadoc) + * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long) + */ public long getProtocolVersion(final String protocol, @SuppressWarnings("unused") final long clientVersion) throws IOException { @@ -57,7 +60,8 @@ static final Log LOG = LogFactory.getLog(HRegionServer.class); volatile boolean stopRequested; - private Path regionDir; + volatile boolean abortRequested; + private Path rootDir; HServerInfo info; Configuration conf; private Random rand; @@ -63,7 +67,7 @@ private Random rand; // region name -> HRegion - TreeMap onlineRegions = new TreeMap(); + SortedMap onlineRegions; Map retiringRegions = new HashMap(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -69,6 +73,7 @@ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private Vector outboundMsgs; + int numRetries; long threadWakeFrequency; private long msgInterval; @@ -78,20 +83,24 @@ private Thread splitOrCompactCheckerThread; Integer splitOrCompactLock = Integer.valueOf(0); - /* + /** * Interface used by the {@link org.apache.hadoop.io.retry} mechanism. */ - interface UpdateMetaInterface { - /* + public interface UpdateMetaInterface { + /** * @return True if succeeded. 
* @throws IOException */ - boolean update() throws IOException; + public boolean update() throws IOException; } - + + /** Runs periodically to determine if regions need to be compacted or split */ class SplitOrCompactChecker implements Runnable, RegionUnavailableListener { HClient client = new HClient(conf); + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.RegionUnavailableListener#closing(org.apache.hadoop.io.Text) + */ public void closing(final Text regionName) { lock.writeLock().lock(); try { @@ -106,6 +115,9 @@ } } + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.RegionUnavailableListener#closed(org.apache.hadoop.io.Text) + */ public void closed(final Text regionName) { lock.writeLock().lock(); try { @@ -118,6 +130,9 @@ } } + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ public void run() { while(! stopRequested) { long startTime = System.currentTimeMillis(); @@ -180,7 +195,7 @@ // splitting a 'normal' region, and the ROOT table needs to be // updated if we are splitting a META region. final Text tableToUpdate = - (oldRegion.find(META_TABLE_NAME.toString()) == 0) ? + region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ? ROOT_TABLE_NAME : META_TABLE_NAME; if(LOG.isDebugEnabled()) { LOG.debug("Updating " + tableToUpdate + " with region split info"); @@ -188,6 +203,9 @@ // Wrap the update of META region with an org.apache.hadoop.io.retry. UpdateMetaInterface implementation = new UpdateMetaInterface() { + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionServer.UpdateMetaInterface#update() + */ public boolean update() throws IOException { HRegion.removeRegionFromMETA(client, tableToUpdate, region.getRegionName()); @@ -232,7 +250,11 @@ private Flusher cacheFlusher; private Thread cacheFlusherThread; Integer cacheFlusherLock = Integer.valueOf(0); + /** Runs periodically to flush the memcache */ class Flusher implements Runnable { + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ public void run() { while(! 
stopRequested) { long startTime = System.currentTimeMillis(); @@ -283,9 +305,9 @@ // File paths private FileSystem fs; - private Path oldlogfile; // Logging + HLog log; private LogRoller logRoller; private Thread logRollerThread; @@ -291,9 +313,7 @@ private Thread logRollerThread; Integer logRollerLock = Integer.valueOf(0); - /** - * Log rolling Runnable. - */ + /** Runs periodically to determine if the log should be rolled */ class LogRoller implements Runnable { private int maxLogEntries = conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000); @@ -298,6 +318,9 @@ private int maxLogEntries = conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000); + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ public void run() { while(! stopRequested) { synchronized(logRollerLock) { @@ -339,9 +362,13 @@ // Leases private Leases leases; - /** Start a HRegionServer at the default location */ + /** + * Starts a HRegionServer at the default location + * @param conf + * @throws IOException + */ public HRegionServer(Configuration conf) throws IOException { - this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)), + this(new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)), new HServerAddress(conf.get(REGIONSERVER_ADDRESS, "localhost:0")), conf); } @@ -346,8 +373,14 @@ conf); } - /** Start a HRegionServer at an indicated location */ - public HRegionServer(Path regionDir, HServerAddress address, + /** + * Starts a HRegionServer at the specified location + * @param rootDir + * @param address + * @param conf + * @throws IOException + */ + public HRegionServer(Path rootDir, HServerAddress address, Configuration conf) throws IOException { // Basic setup @@ -352,9 +385,13 @@ // Basic setup this.stopRequested = false; - this.regionDir = regionDir; + this.abortRequested = false; + this.rootDir = rootDir; this.conf = conf; this.rand = new Random(); + this.onlineRegions = + Collections.synchronizedSortedMap(new TreeMap()); + this.outboundMsgs = new Vector(); this.scanners = 
Collections.synchronizedMap(new TreeMap()); @@ -360,6 +397,7 @@ Collections.synchronizedMap(new TreeMap()); // Config'ed params + this.numRetries = conf.getInt("hbase.client.retries.number", 2); this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.msgInterval = conf.getLong("hbase.regionserver.msginterval", 15 * 1000); @@ -366,21 +404,19 @@ this.splitOrCompactCheckFrequency = conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", 60 * 1000); - + // Cache flushing this.cacheFlusher = new Flusher(); - this.cacheFlusherThread = - new Thread(cacheFlusher, "HRegionServer.cacheFlusher"); + this.cacheFlusherThread = new Thread(cacheFlusher); // Check regions to see if they need to be split this.splitOrCompactChecker = new SplitOrCompactChecker(); - this.splitOrCompactCheckerThread = - new Thread(splitOrCompactChecker, "HRegionServer.splitOrCompactChecker"); + this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker); // Process requests from Master - this.toDo = new Vector(); + this.toDo = new LinkedList(); this.worker = new Worker(); - this.workerThread = new Thread(worker, "HRegionServer.worker"); + this.workerThread = new Thread(worker); try { // Server to handle client requests @@ -398,8 +434,7 @@ this.info.getServerAddress().getBindAddress() + "_" + this.info.getServerAddress().getPort(); - Path newlogdir = new Path(regionDir, "log" + "_" + serverName); - this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName); + Path logdir = new Path(rootDir, "log" + "_" + serverName); // Logging @@ -404,14 +439,14 @@ // Logging this.fs = FileSystem.get(conf); - HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf); - // TODO: Now we have a consolidated log for all regions, sort and - // then split result by region passing the splits as reconstruction - // logs to HRegions on start. Or, rather than consolidate, split logs - // into per region files. 
- this.log = new HLog(fs, newlogdir, conf); + if(fs.exists(logdir)) { + throw new RegionServerRunningException("region server already running at " + + this.info.getServerAddress().toString()); + } + + this.log = new HLog(fs, logdir, conf); this.logRoller = new LogRoller(); - this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller"); + this.logRollerThread = new Thread(logRoller); // Remote HMaster @@ -420,19 +455,6 @@ new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(), conf); - // Threads - - this.workerThread.start(); - this.cacheFlusherThread.start(); - this.splitOrCompactCheckerThread.start(); - this.logRollerThread.start(); - this.leases = new Leases(conf.getLong("hbase.regionserver.lease.period", - 3 * 60 * 1000), threadWakeFrequency); - - // Server - - this.server.start(); - } catch(IOException e) { this.stopRequested = true; throw e; @@ -437,23 +459,33 @@ this.stopRequested = true; throw e; } - - LOG.info("HRegionServer started at: " + address.toString()); } /** - * Set a flag that will cause all the HRegionServer threads to shut down + * Sets a flag that will cause all the HRegionServer threads to shut down * in an orderly fashion. */ - public synchronized void stop() { + synchronized void stop() { stopRequested = true; notifyAll(); // Wakes run() if it is sleeping } + + /** + * Cause the server to exit without closing the regions it is serving, the + * log it is using and without notifying the master. + * + * FOR DEBUGGING ONLY + */ + synchronized void abort() { + abortRequested = true; + stop(); + } - /** Wait on all threads to finish. + /** + * Wait on all threads to finish. * Presumption is that all closes and stops have already been called. */ - public void join() { + void join() { try { this.workerThread.join(); } catch(InterruptedException iex) { @@ -489,6 +521,33 @@ * load/unload instructions. 
*/ public void run() { + + // Threads + + String threadName = Thread.currentThread().getName(); + + workerThread.setName(threadName + ".worker"); + workerThread.start(); + cacheFlusherThread.setName(threadName + ".cacheFlusher"); + cacheFlusherThread.start(); + splitOrCompactCheckerThread.setName(threadName + ".splitOrCompactChecker"); + splitOrCompactCheckerThread.start(); + logRollerThread.setName(threadName + ".logRoller"); + logRollerThread.start(); + leases = new Leases(conf.getLong("hbase.regionserver.lease.period", + 3 * 60 * 1000), threadWakeFrequency); + + // Server + + try { + this.server.start(); + LOG.info("HRegionServer started at: " + info.getServerAddress().toString()); + + } catch(IOException e) { + LOG.error(e); + stopRequested = true; + } + while(! stopRequested) { long lastMsg = 0; long waitTime; @@ -545,7 +604,6 @@ if (LOG.isDebugEnabled()) { LOG.debug("Got call server startup message"); } - toDo.clear(); closeAllRegions(); restart = true; break; @@ -554,8 +612,6 @@ if (LOG.isDebugEnabled()) { LOG.debug("Got regionserver stop message"); } - toDo.clear(); - closeAllRegions(); stopRequested = true; break; @@ -563,10 +619,15 @@ if (LOG.isDebugEnabled()) { LOG.debug("Got default message"); } - toDo.add(msgs[i]); + toDo.addLast(new ToDoEntry(msgs[i])); } } + if(restart || stopRequested) { + toDo.clear(); + break; + } + if(toDo.size() > 0) { if (LOG.isDebugEnabled()) { LOG.debug("notify on todo"); @@ -573,9 +634,6 @@ } toDo.notifyAll(); } - if(restart || stopRequested) { - break; - } } } catch(IOException e) { @@ -596,41 +654,65 @@ } } } - try { - HMsg[] exitMsg = { new HMsg(HMsg.MSG_REPORT_EXITING) }; - hbaseMaster.regionServerReport(info, exitMsg); - - } catch(IOException e) { - LOG.warn(e); + this.worker.stop(); + this.server.stop(); + leases.close(); + + // Send interrupts to wake up threads if sleeping so they notice shutdown. 
+ + synchronized(logRollerLock) { + this.logRollerThread.interrupt(); } - try { - LOG.info("stopping server at: " + info.getServerAddress().toString()); + + synchronized(cacheFlusherLock) { + this.cacheFlusherThread.interrupt(); + } - // Send interrupts to wake up threads if sleeping so they notice shutdown. + synchronized(splitOrCompactLock) { + this.splitOrCompactCheckerThread.interrupt(); + } - synchronized(logRollerLock) { - this.logRollerThread.interrupt(); + if(abortRequested) { + try { + log.rollWriter(); + + } catch(IOException e) { + LOG.warn(e); } + LOG.info("aborting server at: " + info.getServerAddress().toString()); - synchronized(cacheFlusherLock) { - this.cacheFlusherThread.interrupt(); + } else { + Vector closedRegions = closeAllRegions(); + try { + log.closeAndDelete(); + + } catch(IOException e) { + LOG.error(e); } - - synchronized(splitOrCompactLock) { - this.splitOrCompactCheckerThread.interrupt(); + try { + HMsg[] exitMsg = new HMsg[closedRegions.size() + 1]; + exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING); + + // Tell the master what regions we are/were serving + + int i = 1; + for(HRegion region: closedRegions) { + exitMsg[i++] = new HMsg(HMsg.MSG_REPORT_CLOSE, region.getRegionInfo()); + } + + LOG.info("telling master that region server is shutting down at: " + +info.getServerAddress().toString()); + + hbaseMaster.regionServerReport(info, exitMsg); + + } catch(IOException e) { + LOG.warn(e); } - - this.worker.stop(); - this.server.stop(); + LOG.info("stopping server at: " + info.getServerAddress().toString()); + } - closeAllRegions(); - log.close(); - leases.close(); - join(); + join(); - } catch(IOException e) { - LOG.error(e); - } if(LOG.isDebugEnabled()) { LOG.debug("main thread exiting"); } @@ -650,7 +732,7 @@ } } - /** + /** * Add to the outbound message buffer * * When a region splits, we need to tell the master that there are two new @@ -671,11 +753,20 @@ // HMaster-given operations 
////////////////////////////////////////////////////////////////////////////// - Vector toDo; + private static class ToDoEntry { + int tries; + HMsg msg; + ToDoEntry(HMsg msg) { + this.tries = 0; + this.msg = msg; + } + } + LinkedList toDo; private Worker worker; private Thread workerThread; + /** Thread that performs long running requests from the master */ class Worker implements Runnable { - public void stop() { + void stop() { synchronized(toDo) { toDo.notifyAll(); } @@ -681,8 +772,11 @@ } } + /* (non-Javadoc) + * @see java.lang.Runnable#run() + */ public void run() { - for(HMsg msg = null; !stopRequested; ) { + for(ToDoEntry e = null; !stopRequested; ) { synchronized(toDo) { while(toDo.size() == 0 && !stopRequested) { try { @@ -689,11 +783,11 @@ if (LOG.isDebugEnabled()) { LOG.debug("Wait on todo"); } - toDo.wait(); + toDo.wait(threadWakeFrequency); if (LOG.isDebugEnabled()) { LOG.debug("Wake on todo"); } - } catch(InterruptedException e) { + } catch(InterruptedException ex) { // continue } } @@ -700,60 +794,43 @@ if(stopRequested) { continue; } - msg = toDo.remove(0); + e = toDo.removeFirst(); } try { - switch(msg.getMsg()) { + if (LOG.isDebugEnabled()) { + LOG.debug(e.msg.toString()); + } + + switch(e.msg.getMsg()) { case HMsg.MSG_REGION_OPEN: // Open a region - if (LOG.isDebugEnabled()) { - LOG.debug("MSG_REGION_OPEN"); - } - openRegion(msg.getRegionInfo()); + openRegion(e.msg.getRegionInfo()); break; case HMsg.MSG_REGION_CLOSE: // Close a region - if (LOG.isDebugEnabled()) { - LOG.debug("MSG_REGION_CLOSE"); - } - closeRegion(msg.getRegionInfo(), true); + closeRegion(e.msg.getRegionInfo(), true); break; - case HMsg.MSG_REGION_MERGE: // Merge two regions - if (LOG.isDebugEnabled()) { - LOG.debug("MSG_REGION_MERGE"); - } - //TODO ??? 
- throw new IOException("TODO: need to figure out merge"); - //break; - - case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart - if (LOG.isDebugEnabled()) { - LOG.debug("MSG_CALL_SERVER_STARTUP"); - } - closeAllRegions(); - continue; - - case HMsg.MSG_REGIONSERVER_STOP: // Go away - if (LOG.isDebugEnabled()) { - LOG.debug("MSG_REGIONSERVER_STOP"); - } - stopRequested = true; - continue; - case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply - if (LOG.isDebugEnabled()) { - LOG.debug("MSG_REGION_CLOSE_WITHOUT_REPORT"); - } - closeRegion(msg.getRegionInfo(), false); + closeRegion(e.msg.getRegionInfo(), false); break; default: - throw new IOException("Impossible state during msg processing. Instruction: " + msg); + throw new AssertionError( + "Impossible state during msg processing. Instruction: " + + e.msg.toString()); + } + } catch(IOException ie) { + if(e.tries < numRetries) { + LOG.warn(ie); + e.tries++; + synchronized(toDo) { + toDo.addLast(e); + } + } else { + LOG.error("unable to process message: " + e.msg.toString(), ie); } - } catch(IOException e) { - LOG.error(e); } } LOG.info("worker thread exiting"); @@ -761,15 +838,18 @@ } void openRegion(HRegionInfo regionInfo) throws IOException { - this.lock.writeLock().lock(); - try { - HRegion region = - new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile); - this.onlineRegions.put(region.getRegionName(), region); - reportOpen(region); - } finally { - this.lock.writeLock().unlock(); + HRegion region = onlineRegions.get(regionInfo.regionName); + if(region == null) { + region = new HRegion(rootDir, log, fs, conf, regionInfo, null); + + this.lock.writeLock().lock(); + try { + this.onlineRegions.put(region.getRegionName(), region); + } finally { + this.lock.writeLock().unlock(); + } } + reportOpen(region); } void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) @@ -791,7 +871,7 @@ } /** Called either when the master tells us to restart or from stop() */ - 
void closeAllRegions() { + Vector closeAllRegions() { Vector regionsToClose = new Vector(); this.lock.writeLock().lock(); try { @@ -800,8 +880,7 @@ } finally { this.lock.writeLock().unlock(); } - for(Iterator it = regionsToClose.iterator(); it.hasNext(); ) { - HRegion region = it.next(); + for(HRegion region: regionsToClose) { if (LOG.isDebugEnabled()) { LOG.debug("closing region " + region.getRegionName()); } @@ -812,6 +891,7 @@ LOG.error("error closing region " + region.getRegionName(), e); } } + return regionsToClose; } ////////////////////////////////////////////////////////////////////////////// @@ -818,8 +898,10 @@ // HRegionInterface ////////////////////////////////////////////////////////////////////////////// - /** Obtain a table descriptor for the given region */ - public HRegionInfo getRegionInfo(Text regionName) + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#getRegionInfo(org.apache.hadoop.io.Text) + */ + public HRegionInfo getRegionInfo(final Text regionName) throws NotServingRegionException { return getRegion(regionName).getRegionInfo(); } @@ -824,34 +906,44 @@ return getRegion(regionName).getRegionInfo(); } - /** Get the indicated row/column */ - public BytesWritable get(Text regionName, Text row, Text column) - throws IOException { + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text) + */ + public BytesWritable get(final Text regionName, final Text row, + final Text column) throws IOException { + return getRegion(regionName).get(row, column); } - /** Get multiple versions of the indicated row/col */ - public BytesWritable[] get(Text regionName, Text row, Text column, - int numVersions) - throws IOException { + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, int) + */ + public BytesWritable[] get(final Text regionName, final 
Text row, + final Text column, final int numVersions) throws IOException { + return getRegion(regionName).get(row, column, numVersions); } - /** Get multiple timestamped versions of the indicated row/col */ - public BytesWritable[] get(Text regionName, Text row, Text column, - long timestamp, int numVersions) - throws IOException { + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, long, int) + */ + public BytesWritable[] get(final Text regionName, final Text row, final Text column, + final long timestamp, final int numVersions) throws IOException { + return getRegion(regionName).get(row, column, timestamp, numVersions); } - /** Get all the columns (along with their names) for a given row. */ - public LabelledData[] getRow(Text regionName, Text row) throws IOException { + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#getRow(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text) + */ + public KeyedData[] getRow(final Text regionName, final Text row) throws IOException { HRegion region = getRegion(regionName); TreeMap map = region.getFull(row); - LabelledData result[] = new LabelledData[map.size()]; + KeyedData result[] = new KeyedData[map.size()]; int counter = 0; for (Map.Entry es: map.entrySet()) { - result[counter++] = new LabelledData(es.getKey(), es.getValue()); + result[counter++] = + new KeyedData(new HStoreKey(row, es.getKey()), es.getValue()); } return result; } @@ -856,44 +948,31 @@ return result; } - /** - * Start an update to the HBase. This also creates a lease associated with - * the caller. 
+ /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#next(long) */ - private static class RegionListener extends LeaseListener { - private HRegion localRegion; - private long localLockId; - - public RegionListener(HRegion region, long lockId) { - this.localRegion = region; - this.localLockId = lockId; - } + public KeyedData[] next(final long scannerId) + throws IOException { - public void leaseExpired() { - try { - localRegion.abort(localLockId); - - } catch(IOException iex) { - LOG.error(iex); - } - } - } - - public LabelledData[] next(final long scannerId, final HStoreKey key) - throws IOException { Text scannerName = new Text(String.valueOf(scannerId)); HInternalScannerInterface s = scanners.get(scannerName); if (s == null) { - throw new UnknownScannerException("Name: " + scannerName + ", key " + - key); + throw new UnknownScannerException("Name: " + scannerName); } leases.renewLease(scannerName, scannerName); + + // Collect values to be returned here + + ArrayList values = new ArrayList(); + TreeMap results = new TreeMap(); - ArrayList values = new ArrayList(); - // Keep getting rows till we find one that has at least one non-deleted - // column value. + + // Keep getting rows until we find one that has at least one non-deleted column value + + HStoreKey key = new HStoreKey(); while (s.next(key, results)) { for(Map.Entry e: results.entrySet()) { + HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp()); BytesWritable val = e.getValue(); if(val.getSize() == DELETE_BYTES.getSize() && val.compareTo(DELETE_BYTES) == 0) { @@ -898,20 +977,31 @@ if(val.getSize() == DELETE_BYTES.getSize() && val.compareTo(DELETE_BYTES) == 0) { // Column value is deleted. Don't return it. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("skipping deleted value for key: " + k.toString()); + } continue; } - values.add(new LabelledData(e.getKey(), val)); + if (LOG.isDebugEnabled()) { + LOG.debug("adding value for key: " + k.toString()); + } + values.add(new KeyedData(k, val)); } - if (values.size() > 0) { - // Row has something in it. Let it out. Else go get another row. + if(values.size() > 0) { + // Row has something in it. Return the value. break; } - // Need to clear results before we go back up and call 'next' again. + + // No data for this row, go get another. + results.clear(); } - return values.toArray(new LabelledData[values.size()]); + return values.toArray(new KeyedData[values.size()]); } + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#startUpdate(org.apache.hadoop.io.Text, long, org.apache.hadoop.io.Text) + */ public long startUpdate(Text regionName, long clientid, Text row) throws IOException { HRegion region = getRegion(regionName); @@ -923,7 +1013,29 @@ return lockid; } - /** Add something to the HBase. */ + /** Create a lease for an update. If it times out, the update is aborted */ + private static class RegionListener implements LeaseListener { + private HRegion localRegion; + private long localLockId; + + RegionListener(HRegion region, long lockId) { + this.localRegion = region; + this.localLockId = lockId; + } + + public void leaseExpired() { + try { + localRegion.abort(localLockId); + + } catch(IOException iex) { + LOG.error(iex); + } + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#put(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text, org.apache.hadoop.io.BytesWritable) + */ public void put(Text regionName, long clientid, long lockid, Text column, BytesWritable val) throws IOException { HRegion region = getRegion(regionName, true); @@ -932,7 +1044,9 @@ region.put(lockid, column, val); } - /** Remove a cell from the HBase. 
*/ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#delete(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text) + */ public void delete(Text regionName, long clientid, long lockid, Text column) throws IOException { HRegion region = getRegion(regionName); @@ -941,7 +1055,9 @@ region.delete(lockid, column); } - /** Abandon the transaction */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#abort(org.apache.hadoop.io.Text, long, long) + */ public void abort(Text regionName, long clientid, long lockid) throws IOException { HRegion region = getRegion(regionName, true); @@ -950,7 +1066,9 @@ region.abort(lockid); } - /** Confirm the transaction */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#commit(org.apache.hadoop.io.Text, long, long) + */ public void commit(Text regionName, long clientid, long lockid) throws IOException { HRegion region = getRegion(regionName, true); @@ -959,7 +1077,9 @@ region.commit(lockid); } - /** Don't let the client's lease expire just yet... */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#renewLease(long, long) + */ public void renewLease(long lockid, long clientid) throws IOException { leases.renewLease(new Text(String.valueOf(clientid)), new Text(String.valueOf(lockid))); @@ -965,7 +1085,8 @@ new Text(String.valueOf(lockid))); } - /** Private utility method for safely obtaining an HRegion handle. + /** + * Private utility method for safely obtaining an HRegion handle. * @param regionName Name of online {@link HRegion} to return * @return {@link HRegion} for regionName * @throws NotServingRegionException @@ -975,7 +1096,8 @@ return getRegion(regionName, false); } - /** Private utility method for safely obtaining an HRegion handle. + /** + * Private utility method for safely obtaining an HRegion handle. 
* @param regionName Name of online {@link HRegion} to return * @param checkRetiringRegions Set true if we're to check retiring regions * as well as online regions. @@ -1013,14 +1135,21 @@ ////////////////////////////////////////////////////////////////////////////// Map scanners; - - private class ScannerListener extends LeaseListener { + + /** + * Instantiated as a scanner lease. + * If the lease times out, the scanner is closed + */ + private class ScannerListener implements LeaseListener { private Text scannerName; - public ScannerListener(Text scannerName) { + ScannerListener(Text scannerName) { this.scannerName = scannerName; } + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.LeaseListener#leaseExpired() + */ public void leaseExpired() { LOG.info("Scanner " + scannerName + " lease expired"); HInternalScannerInterface s = null; @@ -1033,7 +1162,9 @@ } } - /** Start a scanner for a given HRegion. */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#openScanner(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text[], org.apache.hadoop.io.Text) + */ public long openScanner(Text regionName, Text[] cols, Text firstRow) throws IOException { HRegion r = getRegion(regionName); @@ -1054,6 +1185,9 @@ return scannerId; } + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.HRegionInterface#close(long) + */ public void close(long scannerId) throws IOException { Text scannerName = new Text(String.valueOf(scannerId)); HInternalScannerInterface s = null; @@ -1080,6 +1214,9 @@ System.exit(0); } + /** + * @param args + */ public static void main(String [] args) { if (args.length < 1) { printUsageAndExit(); @@ -1100,7 +1237,7 @@ try { (new Thread(new HRegionServer(conf))).start(); } catch (Throwable t) { - LOG.error( "Can not start master because "+ + LOG.error( "Can not start region server because "+ StringUtils.stringifyException(t) ); System.exit(-1); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java 
=================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -42,7 +42,7 @@ * Locking and transactions are handled at a higher level. This API should not * be called directly by any writer, but rather by an HRegion manager. ******************************************************************************/ -public class HStore { +class HStore implements HConstants { private static final Log LOG = LogFactory.getLog(HStore.class); static final String COMPACTION_DIR = "compaction.tmp"; @@ -64,7 +64,7 @@ Integer compactLock = 0; Integer flushLock = 0; - private final HLocking lock = new HLocking(); + final HLocking lock = new HLocking(); TreeMap maps = new TreeMap(); TreeMap mapFiles = new TreeMap(); @@ -98,8 +98,16 @@ * *

It's assumed that after this constructor returns, the reconstructionLog * file will be deleted (by whoever has instantiated the HStore). + * + * @param dir - log file directory + * @param regionName - name of region + * @param family - name of column family + * @param fs - file system object + * @param reconstructionLog - existing log file to apply if any + * @param conf - configuration object + * @throws IOException */ - public HStore(Path dir, Text regionName, HColumnDescriptor family, + HStore(Path dir, Text regionName, HColumnDescriptor family, FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException { this.dir = dir; @@ -200,18 +208,25 @@ // Check this edit is for me. Also, guard against writing // METACOLUMN info such as HBASE::CACHEFLUSH entries Text column = val.getColumn(); - if (!key.getRegionName().equals(this.regionName) || - column.equals(HLog.METACOLUMN) || - HStoreKey.extractFamily(column).equals(this.familyName)) { + if (column.equals(HLog.METACOLUMN) + || !key.getRegionName().equals(this.regionName) + || !HStoreKey.extractFamily(column).equals(this.familyName)) { if (LOG.isDebugEnabled()) { LOG.debug("Passing on edit " + key.getRegionName() + ", " - + key.getRegionName() + ", " + column.toString() + ": " - + new String(val.getVal().get())); + + column.toString() + ": " + new String(val.getVal().get()) + + ", my region: " + this.regionName + ", my column: " + + this.familyName); } continue; } - reconstructedCache.put(new HStoreKey(key.getRow(), val.getColumn(), - val.getTimestamp()), val.getVal()); + byte[] bytes = new byte[val.getVal().getSize()]; + System.arraycopy(val.getVal().get(), 0, bytes, 0, bytes.length); + HStoreKey k = new HStoreKey(key.getRow(), column,val.getTimestamp()); + if (LOG.isDebugEnabled()) { + LOG.debug("Applying edit " + k.toString() + "=" + + new String(bytes, UTF8_ENCODING)); + } + reconstructedCache.put(k, new BytesWritable(bytes)); } } finally { login.close(); @@ -248,8 +263,11 @@ LOG.info("HStore 
online for " + this.regionName + "/" + this.familyName); } - /** Turn off all the MapFile readers */ - public void close() throws IOException { + /** + * Turn off all the MapFile readers + * @throws IOException + */ + void close() throws IOException { LOG.info("closing HStore for " + this.regionName + "/" + this.familyName); this.lock.obtainWriteLock(); try { @@ -279,8 +297,13 @@ * Also, we are not expecting any reads of this MapFile just yet. * * Return the entire list of HStoreFiles currently used by the HStore. + * + * @param inputCache - memcache to flush + * @param logCacheFlushId - flush sequence number + * @return - Vector of all the HStoreFiles in use + * @throws IOException */ - public Vector flushCache(TreeMap inputCache, + Vector flushCache(TreeMap inputCache, long logCacheFlushId) throws IOException { return flushCacheHelper(inputCache, logCacheFlushId, true); @@ -351,7 +374,10 @@ } } - public Vector getAllMapFiles() { + /** + * @return - vector of all the HStore files in use + */ + Vector getAllMapFiles() { this.lock.obtainReadLock(); try { return new Vector(mapFiles.values()); @@ -380,8 +406,10 @@ * * We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. + * + * @throws IOException */ - public void compact() throws IOException { + void compact() throws IOException { compactHelper(false); } @@ -766,7 +794,7 @@ * * The returned object should map column names to byte arrays (byte[]). */ - public void getFull(HStoreKey key, TreeMap results) throws IOException { + void getFull(HStoreKey key, TreeMap results) throws IOException { this.lock.obtainReadLock(); try { MapFile.Reader[] maparray @@ -806,7 +834,7 @@ * * If 'numVersions' is negative, the method returns all available versions. 
*/ - public BytesWritable[] get(HStoreKey key, int numVersions) throws IOException { + BytesWritable[] get(HStoreKey key, int numVersions) throws IOException { if(numVersions <= 0) { throw new IllegalArgumentException("Number of versions must be > 0"); } @@ -833,10 +861,9 @@ if(numVersions > 0 && (results.size() >= numVersions)) { break; - } else { - results.add(readval); - readval = new BytesWritable(); } + results.add(readval); + readval = new BytesWritable(); } } } @@ -845,12 +872,8 @@ } } - if(results.size() == 0) { - return null; - - } else { - return results.toArray(new BytesWritable[results.size()]); - } + return results.size() == 0 ? + null :results.toArray(new BytesWritable[results.size()]); } finally { this.lock.releaseReadLock(); @@ -863,7 +886,7 @@ * @param midKey - the middle key for the largest MapFile * @return - size of the largest MapFile */ - public long getLargestFileSize(Text midKey) { + long getLargestFileSize(Text midKey) { long maxSize = 0L; if (this.mapFiles.size() <= 0) { return maxSize; @@ -904,7 +927,7 @@ /** * @return Returns the number of map files currently in use */ - public int getNMaps() { + int getNMaps() { this.lock.obtainReadLock(); try { return maps.size(); @@ -933,7 +956,7 @@ * Return a set of MapFile.Readers, one for each HStore file. * These should be closed after the user is done with them. 
*/ - public HInternalScannerInterface getScanner(long timestamp, Text targetCols[], + HInternalScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow) throws IOException { return new HStoreScanner(timestamp, targetCols, firstRow); @@ -947,7 +970,7 @@ class HStoreScanner extends HAbstractScanner { private MapFile.Reader[] readers; - public HStoreScanner(long timestamp, Text[] targetCols, Text firstRow) + HStoreScanner(long timestamp, Text[] targetCols, Text firstRow) throws IOException { super(timestamp, targetCols); @@ -1000,6 +1023,7 @@ * @param firstRow - seek to this row * @return - true if this is the first row or if the row was not found */ + @Override boolean findFirstRow(int i, Text firstRow) throws IOException { HStoreKey firstKey = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), vals[i]); @@ -1023,6 +1047,7 @@ * @param i - which reader to fetch next value from * @return - true if there is more data available */ + @Override boolean getNext(int i) throws IOException { vals[i] = new BytesWritable(); if(! readers[i].next(keys[i], vals[i])) { @@ -1033,6 +1058,7 @@ } /** Close down the indicated reader. */ + @Override void closeSubScanner(int i) { try { if(readers[i] != null) { @@ -1052,6 +1078,7 @@ } /** Shut it down! */ + @Override public void close() { if(! 
scannerClosed) { try { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (working copy) @@ -15,8 +15,6 @@ */ package org.apache.hadoop.hbase; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.*; import java.io.*; @@ -25,9 +23,15 @@ * A Key for a stored row ******************************************************************************/ public class HStoreKey implements WritableComparable { - private final Log LOG = LogFactory.getLog(this.getClass().getName()); - public static Text extractFamily(Text col) throws IOException { + /** + * Extracts the column family name from a column + * For example, returns 'info' if the specified column was 'info:server' + * + * @param col - name of column + * @return - column family name + */ + public static Text extractFamily(Text col) { String column = col.toString(); int colpos = column.indexOf(":"); if(colpos < 0) { @@ -40,6 +44,7 @@ Text column; long timestamp; + /** Default constructor used in conjunction with Writable interface */ public HStoreKey() { this.row = new Text(); this.column = new Text(); @@ -46,6 +51,13 @@ this.timestamp = Long.MAX_VALUE; } + /** + * Create an HStoreKey specifying only the row + * The column defaults to the empty string and the time stamp defaults to + * Long.MAX_VALUE + * + * @param row - row key + */ public HStoreKey(Text row) { this.row = new Text(row); this.column = new Text(); @@ -52,6 +64,13 @@ this.timestamp = Long.MAX_VALUE; } + /** + * Create an HStoreKey specifying the row and timestamp + * The column name defaults to the empty string + * + * @param row - row key + * @param timestamp - timestamp value + */ public HStoreKey(Text row, long timestamp) { this.row = new Text(row); 
this.column = new Text(); @@ -58,6 +77,13 @@ this.timestamp = timestamp; } + /** + * Create an HStoreKey specifying the row and column names + * The timestamp defaults to Long.MAX_VALUE + * + * @param row - row key + * @param column - column key + */ public HStoreKey(Text row, Text column) { this.row = new Text(row); this.column = new Text(column); @@ -64,6 +90,13 @@ this.timestamp = Long.MAX_VALUE; } + /** + * Create an HStoreKey specifying all the fields + * + * @param row - row key + * @param column - column key + * @param timestamp - timestamp value + */ public HStoreKey(Text row, Text column, long timestamp) { this.row = new Text(row); this.column = new Text(column); @@ -70,6 +103,23 @@ this.timestamp = timestamp; } + /** + * Construct a new HStoreKey from another + * + * @param other - the source key + */ + public HStoreKey(HStoreKey other) { + this(); + this.row.set(other.row); + this.column.set(other.column); + this.timestamp = other.timestamp; + } + + /** + * Change the value of the row key + * + * @param newrow - new row key value + */ public void setRow(Text newrow) { this.row.set(newrow); } @@ -74,6 +124,11 @@ this.row.set(newrow); } + /** + * Change the value of the column key + * + * @param newcol - new column key value + */ public void setColumn(Text newcol) { this.column.set(newcol); } @@ -78,6 +133,11 @@ this.column.set(newcol); } + /** + * Change the value of the timestamp field + * + * @param timestamp - new timestamp value + */ public void setVersion(long timestamp) { this.timestamp = timestamp; } @@ -82,6 +142,18 @@ this.timestamp = timestamp; } + /** + * Set the value of this HStoreKey from the supplied key + * + * @param k - key value to copy + */ + public void set(HStoreKey k) { + this.row = k.getRow(); + this.column = k.getColumn(); + this.timestamp = k.getTimestamp(); + } + + /** @return value of row key */ public Text getRow() { return row; } @@ -86,6 +158,7 @@ return row; } + /** @return value of column key */ public Text getColumn() { 
return column; } @@ -90,6 +163,7 @@ return column; } + /** @return value of timestamp */ public long getTimestamp() { return timestamp; } @@ -125,18 +199,12 @@ * @see #matchesWithoutColumn(HStoreKey) */ public boolean matchesRowFamily(HStoreKey other) { - boolean status = false; - try { - status = this.row.compareTo(other.row) == 0 + return this.row.compareTo(other.row) == 0 && extractFamily(this.column).compareTo( extractFamily(other.getColumn())) == 0; - - } catch(IOException e) { - LOG.error(e); - } - return status; } + @Override public String toString() { return row.toString() + "/" + column.toString() + "/" + timestamp; } @@ -158,6 +226,9 @@ // Comparable ////////////////////////////////////////////////////////////////////////////// + /* (non-Javadoc) + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ public int compareTo(Object o) { HStoreKey other = (HStoreKey) o; int result = this.row.compareTo(other.row); @@ -180,6 +251,9 @@ // Writable ////////////////////////////////////////////////////////////////////////////// + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ public void write(DataOutput out) throws IOException { row.write(out); column.write(out); @@ -186,6 +260,9 @@ out.writeLong(timestamp); } + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ public void readFields(DataInput in) throws IOException { row.readFields(in); column.readFields(in); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (working copy) @@ -43,7 +43,8 @@ */ private static final Pattern LEGAL_TABLE_NAME = Pattern.compile("[\\w-]+"); - + + /** Constructs an empty object */ public HTableDescriptor() { this.name = new Text(); 
this.families = new TreeMap(); @@ -66,6 +67,7 @@ this.families = new TreeMap(); } + /** @return name of table */ public Text getName() { return name; } @@ -78,7 +80,12 @@ families.put(family.getName(), family); } - /** Do we contain a given column? */ + /** + * Checks to see if this table contains the given column family + * + * @param family - family name + * @return true if the table contains the specified family name + */ public boolean hasFamily(Text family) { return families.containsKey(family); } @@ -87,6 +94,8 @@ * * TODO: What is this used for? Seems Dangerous to let people play with our * private members. + * + * @return map of family members */ public TreeMap families() { return families; @@ -95,7 +104,7 @@ @Override public String toString() { return "name: " + this.name.toString() + ", families: " + this.families; - } + } @Override public boolean equals(Object obj) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/KeyedData.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/KeyedData.java (revision 544513) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/KeyedData.java (working copy) @@ -0,0 +1,74 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; +import org.apache.hadoop.io.*; + +import java.io.*; + +/******************************************************************************* + * KeyedData is just a data pair. + * It includes an HStoreKey and some associated data. + ******************************************************************************/ +public class KeyedData implements Writable { + HStoreKey key; + BytesWritable data; + + /** Default constructor. Used by Writable interface */ + public KeyedData() { + this.key = new HStoreKey(); + this.data = new BytesWritable(); + } + + /** + * Create a KeyedData object specifying the parts + * @param key - HStoreKey + * @param data - BytesWritable + */ + public KeyedData(HStoreKey key, BytesWritable data) { + this.key = key; + this.data = data; + } + + /** @return - returns the key */ + public HStoreKey getKey() { + return key; + } + + /** @return - returns the value */ + public BytesWritable getData() { + return data; + } + + ////////////////////////////////////////////////////////////////////////////// + // Writable + ////////////////////////////////////////////////////////////////////////////// + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + */ + public void write(DataOutput out) throws IOException { + key.write(out); + data.write(out); + } + + /* (non-Javadoc) + * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + */ + public void readFields(DataInput in) throws IOException { + key.readFields(in); + data.readFields(in); + } +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (working copy) @@ -1,60 +0,0 @@ -/** - * Copyright 2006 The Apache Software Foundation - * - * Licensed under the Apache License, 
Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; -import org.apache.hadoop.io.*; - -import java.io.*; - -/******************************************************************************* - * LabelledData is just a data pair. - * It includes a Text label and some associated data. - ******************************************************************************/ -public class LabelledData implements Writable { - Text label; - BytesWritable data; - - public LabelledData() { - this.label = new Text(); - this.data = new BytesWritable(); - } - - public LabelledData(Text label, BytesWritable data) { - this.label = new Text(label); - this.data = data; - } - - public Text getLabel() { - return label; - } - - public BytesWritable getData() { - return data; - } - - ////////////////////////////////////////////////////////////////////////////// - // Writable - ////////////////////////////////////////////////////////////////////////////// - - public void write(DataOutput out) throws IOException { - label.write(out); - data.write(out); - } - - public void readFields(DataInput in) throws IOException { - label.readFields(in); - data.readFields(in); - } -} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LeaseListener.java (working copy) @@ 
-17,7 +17,7 @@ /******************************************************************************* - * LeaseListener is a small class meant to be overridden by users of the Leases + * LeaseListener is an interface meant to be implemented by users of the Leases * class. * * It receives events from the Leases class about the status of its accompanying @@ -24,18 +24,7 @@ * lease. Users of the Leases class can use a LeaseListener subclass to, for * example, clean up resources after a lease has expired. ******************************************************************************/ -public abstract class LeaseListener { - public LeaseListener() { - } - - public void leaseRenewed() { - } - - /** When the user cancels a lease, this method is called. */ - public void leaseCancelled() { - } - +public interface LeaseListener { /** When a lease expires, this method is called. */ - public void leaseExpired() { - } + public void leaseExpired(); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (revision 545790) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (working copy) @@ -48,7 +48,12 @@ TreeSet sortedLeases = new TreeSet(); boolean running = true; - /** Indicate the length of the lease, in milliseconds */ + /** + * Creates a lease + * + * @param leasePeriod - length of time (milliseconds) that the lease is valid + * @param leaseCheckFrequency - how often the lease should be checked (milliseconds) + */ public Leases(long leasePeriod, long leaseCheckFrequency) { this.leasePeriod = leasePeriod; this.leaseCheckFrequency = leaseCheckFrequency; @@ -59,7 +64,7 @@ } /** - * Shut down this Leases outfit. All pending leases will be destroyed, + * Shut down this Leases instance. All pending leases will be destroyed, * without any cancellation calls. 
*/ public void close() { @@ -89,9 +94,15 @@ } /** A client obtains a lease... */ + /** + * Obtain a lease + * + * @param holderId - name of lease holder + * @param resourceId - resource being leased + * @param listener - listener that will process lease expirations + */ public void createLease(Text holderId, Text resourceId, - final LeaseListener listener) - throws IOException { + final LeaseListener listener) { synchronized(leases) { synchronized(sortedLeases) { Lease lease = new Lease(holderId, resourceId, listener); @@ -97,7 +108,7 @@ Lease lease = new Lease(holderId, resourceId, listener); Text leaseId = lease.getLeaseId(); if(leases.get(leaseId) != null) { - throw new IOException("Impossible state for createLease(): Lease " + + throw new AssertionError("Impossible state for createLease(): Lease " + getLeaseName(holderId, resourceId) + " is still held."); } leases.put(leaseId, lease); @@ -110,6 +121,13 @@ } /** A client renews a lease... */ + /** + * Renew a lease + * + * @param holderId - name of lease holder + * @param resourceId - resource being leased + * @throws IOException + */ public void renewLease(Text holderId, Text resourceId) throws IOException { synchronized(leases) { synchronized(sortedLeases) { @@ -132,8 +150,12 @@ } } - /** A client explicitly cancels a lease. - * The lease-cleanup method is not called. + /** + * Client explicitly cancels a lease. + * + * @param holderId - name of lease holder + * @param resourceId - resource being leased + * @throws IOException */ public void cancelLease(Text holderId, Text resourceId) throws IOException { synchronized(leases) { @@ -152,7 +174,6 @@ sortedLeases.remove(lease); leases.remove(leaseId); - lease.cancelled(); } } if (LOG.isDebugEnabled()) { @@ -197,7 +218,8 @@ } /** This class tracks a single Lease. 
*/ - class Lease implements Comparable { + @SuppressWarnings("unchecked") + private class Lease implements Comparable { Text holderId; Text resourceId; LeaseListener listener; @@ -203,7 +225,7 @@ LeaseListener listener; long lastUpdate; - public Lease(Text holderId, Text resourceId, LeaseListener listener) { + Lease(Text holderId, Text resourceId, LeaseListener listener) { this.holderId = holderId; this.resourceId = resourceId; this.listener = listener; @@ -210,24 +232,19 @@ renew(); } - public Text getLeaseId() { + Text getLeaseId() { return createLeaseId(holderId, resourceId); } - public boolean shouldExpire() { + boolean shouldExpire() { return (System.currentTimeMillis() - lastUpdate > leasePeriod); } - public void renew() { + void renew() { this.lastUpdate = System.currentTimeMillis(); - listener.leaseRenewed(); - } - - public void cancelled() { - listener.leaseCancelled(); } - public void expired() { + void expired() { if (LOG.isDebugEnabled()) { LOG.debug("Lease expired " + getLeaseName(this.holderId, this.resourceId)); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionServerRunningException.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionServerRunningException.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionServerRunningException.java (revision 0) @@ -0,0 +1,40 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +/** + * Thrown if the region server log directory exists (which indicates another + * region server is running at the same address) + */ +public class RegionServerRunningException extends IOException { + private static final long serialVersionUID = 1L << 31 - 1L; + + /** Default Constructor */ + public RegionServerRunningException() { + super(); + } + + /** + * Constructs the exception and supplies a string as the message + * @param s - message + */ + public RegionServerRunningException(String s) { + super(s); + } + +} Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (working copy) @@ -19,6 +19,7 @@ import java.io.UnsupportedEncodingException; import java.util.Random; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,6 +37,7 @@ protected FileSystem fs; protected Path dir; + @Override public void setUp() throws Exception { super.setUp(); rand = new Random(); @@ -87,23 +89,19 @@ // Now create the root and meta regions and insert the data regions // created above into the meta - HRegion root = HRegion.createNewHRegion(fs, dir, conf, - HGlobals.rootTableDesc, 0L, null, null); - HRegion meta = HRegion.createNewHRegion(fs, dir, conf, - HGlobals.metaTableDesc, 1L, null, null); + HRegion root = createNewHRegion(fs, dir, conf, HGlobals.rootTableDesc, 0L, null, null); + HRegion meta = createNewHRegion(fs, dir, conf, HGlobals.metaTableDesc, 1L, null, null); - HRegion.addRegionToMeta(root, meta); + 
HRegion.addRegionToMETA(root, meta); for(int i = 0; i < regions.length; i++) { - HRegion.addRegionToMeta(meta, regions[i]); + HRegion.addRegionToMETA(meta, regions[i]); } root.close(); - root.getLog().close(); - fs.delete(new Path(root.getRegionDir(), HConstants.HREGION_LOGDIR_NAME)); + root.getLog().closeAndDelete(); meta.close(); - meta.getLog().close(); - fs.delete(new Path(meta.getRegionDir(), HConstants.HREGION_LOGDIR_NAME)); + meta.getLog().closeAndDelete(); } catch(Throwable t) { t.printStackTrace(); @@ -111,6 +109,7 @@ } } + @Override public void tearDown() throws Exception { super.tearDown(); dfsCluster.shutdown(); @@ -118,8 +117,7 @@ private HRegion createAregion(Text startKey, Text endKey, int firstRow, int nrows) throws IOException { - HRegion region = HRegion.createNewHRegion(fs, dir, conf, desc, - rand.nextLong(), startKey, endKey); + HRegion region = createNewHRegion(fs, dir, conf, desc, rand.nextLong(), startKey, endKey); System.out.println("created region " + region.getRegionName()); @@ -138,9 +136,22 @@ region.log.rollWriter(); region.compactStores(); region.close(); - region.getLog().close(); - fs.delete(new Path(region.getRegionDir(), HConstants.HREGION_LOGDIR_NAME)); + region.getLog().closeAndDelete(); region.getRegionInfo().offLine = true; return region; } + + private HRegion createNewHRegion(FileSystem fs, Path dir, + Configuration conf, HTableDescriptor desc, long regionId, Text startKey, + Text endKey) throws IOException { + + HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey); + Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName); + fs.mkdirs(regionDir); + + return new HRegion(dir, + new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf), + fs, conf, info, null); + } + } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java =================================================================== --- 
src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (working copy) @@ -22,6 +22,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { protected MiniHBaseCluster cluster; final boolean miniHdfs; + int regionServers; protected HBaseClusterTestCase() { this(true); @@ -26,6 +27,11 @@ protected HBaseClusterTestCase() { this(true); } + + protected HBaseClusterTestCase(int regionServers) { + this(true); + this.regionServers = regionServers; + } protected HBaseClusterTestCase(String name) { this(name, true); @@ -34,6 +40,7 @@ protected HBaseClusterTestCase(final boolean miniHdfs) { super(); this.miniHdfs = miniHdfs; + this.regionServers = 1; } protected HBaseClusterTestCase(String name, final boolean miniHdfs) { @@ -39,13 +46,17 @@ protected HBaseClusterTestCase(String name, final boolean miniHdfs) { super(name); this.miniHdfs = miniHdfs; + this.regionServers = 1; } + @Override public void setUp() throws Exception { super.setUp(); - this.cluster = new MiniHBaseCluster(this.conf, 1, this.miniHdfs); + this.cluster = + new MiniHBaseCluster(this.conf, this.regionServers, this.miniHdfs); } + @Override public void tearDown() throws Exception { super.tearDown(); if (this.cluster != null) { Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -38,7 +38,7 @@ private HMaster master; private Thread masterThread; private HRegionServer[] regionServers; - private Thread[] regionThreads; + Thread[] regionThreads; /** * Starts a MiniHBaseCluster on top of a new MiniDFSCluster @@ -94,7 +94,7 @@ try { try { this.fs = FileSystem.get(conf); - this.parentdir = new 
Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)); + this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); fs.mkdirs(parentdir); } catch(Throwable e) { @@ -146,8 +146,8 @@ } /** - * Returns the rpc address actually used by the master server, because the - * supplied port is not necessarily the actual port used. + * @return Returns the rpc address actually used by the master server, because + * the supplied port is not necessarily the actual port used. */ public HServerAddress getHMasterAddress() { return master.getMasterAddress(); @@ -153,6 +153,32 @@ return master.getMasterAddress(); } + /** + * Shut down the specified region server cleanly + * + * @param serverNumber + */ + public void stopRegionServer(int serverNumber) { + if(serverNumber >= regionServers.length) { + throw new ArrayIndexOutOfBoundsException( + "serverNumber > number of region servers"); + } + this.regionServers[serverNumber].stop(); + } + + /** + * Cause a region server to exit without cleaning up + * + * @param serverNumber + */ + public void abortRegionServer(int serverNumber) { + if(serverNumber >= regionServers.length) { + throw new ArrayIndexOutOfBoundsException( + "serverNumber > number of region servers"); + } + this.regionServers[serverNumber].abort(); + } + /** Shut down the HBase cluster */ public void shutdown() { LOG.info("Shutting down the HBase Cluster"); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java (revision 0) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java (revision 0) @@ -0,0 +1,59 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +/** Tests region server failover when a region server exits cleanly */ +public class TestCleanRegionServerExit extends HBaseClusterTestCase { + + private HClient client; + + /** Constructor */ + public TestCleanRegionServerExit() { + super(2); // Start two region servers + client = new HClient(conf); + } + + /** The test */ + public void testCleanRegionServerExit() { + try { + // When the META table can be opened, the region servers are running + + client.openTable(HConstants.META_TABLE_NAME); + + } catch(IOException e) { + e.printStackTrace(); + fail(); + } + + // Shut down a region server cleanly + + this.cluster.stopRegionServer(0); + try { + this.cluster.regionThreads[0].join(); + + } catch(InterruptedException e) { + } + + try { + Thread.sleep(60000); // Wait for cluster to adjust + + } catch(InterruptedException e) { + } + } + +} Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; +/** Test case for get */ public class TestGet extends HBaseTestCase { private static final Text CONTENTS = new Text("contents:"); private static final Text ROW_KEY = new Text(HGlobals.rootRegionInfo.regionName); @@ -59,6 +60,10 @@ } } + /** + * 
The test + * @throws IOException + */ public void testGet() throws IOException { MiniDFSCluster cluster = null; @@ -81,7 +86,7 @@ HLog log = new HLog(fs, new Path(regionDir, "log"), conf); - HRegion r = new HRegion(dir, log, fs, conf, info, null, null); + HRegion r = new HRegion(dir, log, fs, conf, info, null); // Write information to the table @@ -126,7 +131,7 @@ r.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, info, null, null); + r = new HRegion(dir, log, fs, conf, info, null); // Read it back @@ -156,7 +161,7 @@ r.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, info, null, null); + r = new HRegion(dir, log, fs, conf, info, null); // Read it back @@ -165,6 +170,7 @@ // Close region once and for all r.close(); + log.closeAndDelete(); } catch(IOException e) { e.printStackTrace(); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java (working copy) @@ -15,6 +15,7 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; import java.util.TreeMap; import org.apache.hadoop.fs.FileSystem; @@ -24,74 +25,83 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.Reader; +/** JUnit test case for HLog */ public class TestHLog extends HBaseTestCase implements HConstants { - protected void setUp() throws Exception { + @Override + public void setUp() throws Exception { super.setUp(); } - public void testAppend() throws Exception { - Path dir = getUnitTestdir(getName()); - FileSystem fs = FileSystem.get(this.conf); - if (fs.exists(dir)) { - fs.delete(dir); - } - final int COL_COUNT = 10; - final Text regionName = new Text("regionname"); - final Text tableName = new Text("tablename"); - final Text row = new Text("row"); - Reader reader = null; - HLog log = new HLog(fs, 
dir, this.conf); + /** The test */ + public void testAppend() { try { - // Write columns named 1, 2, 3, etc. and then values of single byte - // 1, 2, 3... - TreeMap cols = new TreeMap(); - for (int i = 0; i < COL_COUNT; i++) { - cols.put(new Text(Integer.toString(i)), - new BytesWritable(new byte[] { (byte)(i + '0') })); - } - long timestamp = System.currentTimeMillis(); - log.append(regionName, tableName, row, cols, timestamp); - long logSeqId = log.startCacheFlush(); - log.completeCacheFlush(regionName, tableName, logSeqId); - log.close(); - Path filename = log.computeFilename(log.filenum - 1); - log = null; - // Now open a reader on the log and assert append worked. - reader = new SequenceFile.Reader(fs, filename, conf); - HLogKey key = new HLogKey(); - HLogEdit val = new HLogEdit(); - for (int i = 0; i < COL_COUNT; i++) { - reader.next(key, val); - assertEquals(key.getRegionName(), regionName); - assertEquals(key.getTablename(), tableName); - assertEquals(key.getRow(), row); - assertEquals(val.getVal().get()[0], (byte)(i + '0')); - System.out.println(key + " " + val); - } - while (reader.next(key, val)) { - // Assert only one more row... the meta flushed row. - assertEquals(key.getRegionName(), regionName); - assertEquals(key.getTablename(), tableName); - assertEquals(key.getRow(), HLog.METAROW); - assertEquals(val.getColumn(), HLog.METACOLUMN); - assertEquals(0, val.getVal().compareTo(COMPLETE_CACHEFLUSH)); - System.out.println(key + " " + val); - } - } finally { - if (log != null) { - log.close(); - } - if (reader != null) { - reader.close(); - } + Path dir = getUnitTestdir(getName()); + FileSystem fs = FileSystem.get(this.conf); if (fs.exists(dir)) { fs.delete(dir); } + final int COL_COUNT = 10; + final Text regionName = new Text("regionname"); + final Text tableName = new Text("tablename"); + final Text row = new Text("row"); + Reader reader = null; + HLog log = new HLog(fs, dir, this.conf); + try { + // Write columns named 1, 2, 3, etc. 
and then values of single byte + // 1, 2, 3... + TreeMap cols = new TreeMap(); + for (int i = 0; i < COL_COUNT; i++) { + cols.put(new Text(Integer.toString(i)), + new BytesWritable(new byte[] { (byte)(i + '0') })); + } + long timestamp = System.currentTimeMillis(); + log.append(regionName, tableName, row, cols, timestamp); + long logSeqId = log.startCacheFlush(); + log.completeCacheFlush(regionName, tableName, logSeqId); + log.close(); + Path filename = log.computeFilename(log.filenum - 1); + log = null; + // Now open a reader on the log and assert append worked. + reader = new SequenceFile.Reader(fs, filename, conf); + HLogKey key = new HLogKey(); + HLogEdit val = new HLogEdit(); + for (int i = 0; i < COL_COUNT; i++) { + reader.next(key, val); + assertEquals(regionName, key.getRegionName()); + assertEquals(tableName, key.getTablename()); + assertEquals(row, key.getRow()); + assertEquals((byte)(i + '0'), val.getVal().get()[0]); + System.out.println(key + " " + val); + } + while (reader.next(key, val)) { + // Assert only one more row... the meta flushed row. 
+ assertEquals(regionName, key.getRegionName()); + assertEquals(tableName, key.getTablename()); + assertEquals(HLog.METAROW, key.getRow()); + assertEquals(HLog.METACOLUMN, val.getColumn()); + assertEquals(0, val.getVal().compareTo(COMPLETE_CACHEFLUSH)); + System.out.println(key + " " + val); + } + } finally { + if (log != null) { + log.close(); + } + if (reader != null) { + reader.close(); + } + if (fs.exists(dir)) { + fs.delete(dir); + } + } + } catch(IOException e) { + e.printStackTrace(); + fail(); } } - protected void tearDown() throws Exception { + @Override + public void tearDown() throws Exception { super.tearDown(); } } \ No newline at end of file Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java (working copy) @@ -29,6 +29,7 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; +/** memcache test case */ public class TestHMemcache extends TestCase { private HMemcache hmemcache; @@ -41,6 +42,10 @@ private static final String COLUMN_FAMILY = "column"; + /* (non-Javadoc) + * @see junit.framework.TestCase#setUp() + */ + @Override protected void setUp() throws Exception { super.setUp(); @@ -55,6 +60,10 @@ "org.apache.hadoop.fs.LocalFileSystem"); } + /* (non-Javadoc) + * @see junit.framework.TestCase#tearDown() + */ + @Override protected void tearDown() throws Exception { super.tearDown(); } @@ -117,6 +126,10 @@ return s; } + /** + * Test memcache snapshots + * @throws IOException + */ public void testSnapshotting() throws IOException { final int snapshotCount = 5; final Text tableName = new Text(getName()); @@ -121,20 +134,16 @@ final int snapshotCount = 5; final Text tableName = new Text(getName()); HLog log = getLogfile(); - try { - // Add some rows, run a snapshot. 
Do it a few times. - for (int i = 0; i < snapshotCount; i++) { - addRows(this.hmemcache); - Snapshot s = runSnapshot(this.hmemcache, log); - log.completeCacheFlush(new Text(Integer.toString(i)), - tableName, s.sequenceId); - // Clean up snapshot now we are done with it. - this.hmemcache.deleteSnapshot(); - } - log.close(); - } finally { - log.dir.getFileSystem(this.conf).delete(log.dir); + // Add some rows, run a snapshot. Do it a few times. + for (int i = 0; i < snapshotCount; i++) { + addRows(this.hmemcache); + Snapshot s = runSnapshot(this.hmemcache, log); + log.completeCacheFlush(new Text(Integer.toString(i)), + tableName, s.sequenceId); + // Clean up snapshot now we are done with it. + this.hmemcache.deleteSnapshot(); } + log.closeAndDelete(); } private void isExpectedRow(final int rowIndex, @@ -157,7 +166,8 @@ } } - public void testGetFull() throws IOException { + /** Test getFull from memcache */ + public void testGetFull() { addRows(this.hmemcache); for (int i = 0; i < ROW_COUNT; i++) { HStoreKey hsk = new HStoreKey(getRowName(i)); @@ -166,6 +176,10 @@ } } + /** + * Test memcache scanner + * @throws IOException + */ public void testScanner() throws IOException { addRows(this.hmemcache); long timestamp = System.currentTimeMillis(); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (working copy) @@ -37,7 +37,7 @@ * HRegions or in the HBaseMaster, so only basic testing is possible. 
*/ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListener { - private Logger LOG = Logger.getLogger(this.getClass().getName()); + Logger LOG = Logger.getLogger(this.getClass().getName()); /** Constructor */ public TestHRegion() { @@ -83,10 +83,9 @@ private static FileSystem fs = null; private static Path parentdir = null; private static Path newlogdir = null; - private static Path oldlogfile = null; private static HLog log = null; private static HTableDescriptor desc = null; - private static HRegion region = null; + static HRegion region = null; private static int numInserted = 0; @@ -99,7 +98,6 @@ parentdir = new Path("/hbase"); fs.mkdirs(parentdir); newlogdir = new Path(parentdir, "log"); - oldlogfile = new Path(parentdir, "oldlogfile"); log = new HLog(fs, newlogdir, conf); desc = new HTableDescriptor("test"); @@ -106,7 +104,7 @@ desc.addFamily(new HColumnDescriptor("contents:")); desc.addFamily(new HColumnDescriptor("anchor:")); region = new HRegion(parentdir, log, fs, conf, - new HRegionInfo(1, desc, null, null), null, oldlogfile); + new HRegionInfo(1, desc, null, null), null); } // Test basic functionality. Writes to contents:basic and anchor:anchornum-* @@ -208,6 +206,7 @@ Listthreads = new ArrayList(threadCount); for (int i = 0; i < threadCount; i++) { threads.add(new Thread(Integer.toString(i)) { + @Override public void run() { long [] lockids = new long[lockCount]; // Get locks. 
@@ -822,7 +821,7 @@ f.delete(); } - private void cleanup() throws IOException { + private void cleanup() { // Shut down the mini cluster Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java (revision 0) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java (revision 0) @@ -0,0 +1,54 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +/** Tests region server failover when a region server aborts */ +public class TestRegionServerAbort extends HBaseClusterTestCase { + + private HClient client; + + /** Constructor */ + public TestRegionServerAbort() { + super(2); // Start two region servers + client = new HClient(conf); + } + + /** The test */ + public void testRegionServerAbort() { + try { + // When the META table can be opened, the region servers are running + + client.openTable(HConstants.META_TABLE_NAME); + + } catch(IOException e) { + e.printStackTrace(); + fail(); + } + + // Force a region server to exit "ungracefully" + + this.cluster.abortRegionServer(0); + + try { + Thread.sleep(120000); // Wait for cluster to adjust + + } catch(InterruptedException e) { + } + } + +} Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (working copy) @@ -132,7 +132,9 @@ validateRegionInfo(bytes); } - /** The test! */ + /** The test! + * @throws IOException + */ public void testScanner() throws IOException { MiniDFSCluster cluster = null; FileSystem fs = null; @@ -152,7 +154,7 @@ HLog log = new HLog(fs, new Path(regionDir, "log"), conf); - region = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); + region = new HRegion(dir, log, fs, conf, REGION_INFO, null); // Write information to the meta table @@ -175,7 +177,7 @@ region.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); + region = new HRegion(dir, log, fs, conf, REGION_INFO, null); // Verify we can get the data back now that it is on disk. 
@@ -216,7 +218,7 @@ region.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); + region = new HRegion(dir, log, fs, conf, REGION_INFO, null); // Validate again @@ -252,7 +254,7 @@ region.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); + region = new HRegion(dir, log, fs, conf, REGION_INFO, null); // Validate again @@ -258,6 +260,11 @@ scan(true, address.toString()); getRegionInfo(); + + // clean up + + region.close(); + log.closeAndDelete(); } catch(IOException e) { e.printStackTrace(); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (revision 545790) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (working copy) @@ -1,3 +1,18 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hadoop.hbase; import java.io.IOException; @@ -43,10 +58,10 @@ List newRegions = new ArrayList(2); newRegions.add(HRegion.createHRegion( new HRegionInfo(2L, desc, null, new Text("midway")), - homedir, this.conf, null, null)); + homedir, this.conf, null)); newRegions.add(HRegion.createHRegion( new HRegionInfo(3L, desc, new Text("midway"), null), - homedir, this.conf, null, null)); + homedir, this.conf, null)); for (HRegion r: newRegions) { HRegion.addRegionToMETA(client, HConstants.META_TABLE_NAME, r, this.cluster.getHMasterAddress(), -1L); @@ -52,7 +67,7 @@ this.cluster.getHMasterAddress(), -1L); } regions = scan(client, HConstants.META_TABLE_NAME); - assertEquals("Should be two regions only", regions.size(), 2); + assertEquals("Should be two regions only", 2, regions.size()); } private List scan(final HClient client, final Text table) @@ -68,8 +83,7 @@ HMaster.METACOLUMNS, new Text()); while (true) { TreeMap results = new TreeMap(); - HStoreKey key = new HStoreKey(); - LabelledData[] values = regionServer.next(scannerId, key); + KeyedData[] values = regionServer.next(scannerId); if (values.length == 0) { break; } @@ -78,7 +92,7 @@ byte[] bytes = new byte[values[i].getData().getSize()]; System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length); - results.put(values[i].getLabel(), bytes); + results.put(values[i].getKey().getColumn(), bytes); } HRegionInfo info = HRegion.getRegionInfo(results); @@ -84,10 +98,9 @@ HRegionInfo info = HRegion.getRegionInfo(results); String serverName = HRegion.getServerName(results); long startCode = HRegion.getStartCode(results); - LOG.info(Thread.currentThread().getName() + " scanner: " + - Long.valueOf(scannerId) + " row: " + key + - ": regioninfo: {" + info.toString() + "}, server: " + serverName + - ", startCode: " + startCode); + LOG.info(Thread.currentThread().getName() + " scanner: " + + Long.valueOf(scannerId) + ": regioninfo: {" + info.toString() + + "}, server: " + serverName + ", 
startCode: " + startCode); regions.add(info); } } finally {