Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (revision 559130) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (working copy) @@ -47,7 +47,7 @@ public class HClient implements HConstants { final Log LOG = LogFactory.getLog(this.getClass().getName()); - private static final Text[] META_COLUMNS = { + static final Text[] META_COLUMNS = { COLUMN_FAMILY }; @@ -59,9 +59,9 @@ long pause; int numRetries; - private HMasterInterface master; + HMasterInterface master; private final Configuration conf; - private long currentLockId; + private volatile long currentLockId; private Class serverInterfaceClass; protected class BatchHandler { @@ -221,12 +221,373 @@ return result; } } + + /** encapsulates finding the servers for a table */ + protected class TableServers { + // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress) + private TreeMap> tablesToServers; + + /** constructor */ + public TableServers() { + this.tablesToServers = + new TreeMap>(); + } + + /** + * Gets the servers of the given table out of cache, or calls + * findServersForTable if there is nothing in the cache. + * + * @param tableName - the table to be located + * @return map of startRow -> RegionLocation + * @throws IOException - if the table can not be located after retrying + */ + public synchronized SortedMap + getTableServers(Text tableName) throws IOException { + if(tableName == null || tableName.getLength() == 0) { + throw new IllegalArgumentException( + "table name cannot be null or zero length"); + } + SortedMap serverResult = + tablesToServers.get(tableName); + + if (serverResult == null ) { + if (LOG.isDebugEnabled()) { + LOG.debug("No servers for " + tableName + ". Doing a find..."); + } + // We don't know where the table is. + // Load the information from meta. 
+ serverResult = findServersForTable(tableName); + } + return serverResult; + } + + /* + * Clears the cache of all known information about the specified table and + * locates a table by searching the META or ROOT region (as appropriate) or + * by querying the master for the location of the root region if that is the + * table requested. + * + * @param tableName - name of table to find servers for + * @return - map of first row to table info for all regions in the table + * @throws IOException + */ + private SortedMap findServersForTable(Text tableName) + throws IOException { + + // Wipe out everything we know about this table + + if (this.tablesToServers.containsKey(tableName)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Wiping out all we know of " + tableName); + } + this.tablesToServers.remove(tableName); + } + + SortedMap servers = null; + + if (tableName.equals(ROOT_TABLE_NAME)) { + servers = locateRootRegion(); + + } else if (tableName.equals(META_TABLE_NAME)) { + if (tablesToServers.get(ROOT_TABLE_NAME) == null) { + findServersForTable(ROOT_TABLE_NAME); + } + for (int tries = 0; tries < numRetries; tries++) { + try { + servers = loadMetaFromRoot(); + break; + + } catch (IOException e) { + if (tries < numRetries - 1) { + findServersForTable(ROOT_TABLE_NAME); + continue; + } + throw e; + } + } + } else { + for (int tries = 0; tries < numRetries; tries++) { + boolean success = true; // assume this works + + SortedMap metaServers = + this.tablesToServers.get(META_TABLE_NAME); + if (metaServers == null) { + metaServers = findServersForTable(META_TABLE_NAME); + } + Text firstMetaRegion = metaServers.headMap(tableName).lastKey(); + metaServers = metaServers.tailMap(firstMetaRegion); + + servers = new TreeMap(); + for (RegionLocation t: metaServers.values()) { + try { + servers.putAll(scanOneMetaRegion(t, tableName)); + + } catch (IOException e) { + e.printStackTrace(); + if(tries < numRetries - 1) { + findServersForTable(META_TABLE_NAME); + success = false; + 
break; + } + throw e; + } + } + if (success) { + break; + } + } + } + this.tablesToServers.put(tableName, servers); + if (LOG.isDebugEnabled()) { + if(servers != null) { + for (Map.Entry e: servers.entrySet()) { + LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue() + + " for table " + tableName); + } + } + } + return servers; + } + + /* + * Load the meta table from the root table. + * + * @return map of first row to TableInfo for all meta regions + * @throws IOException + */ + private TreeMap loadMetaFromRoot() throws IOException { + SortedMap rootRegion = + this.tablesToServers.get(ROOT_TABLE_NAME); + return scanOneMetaRegion(rootRegion.get(rootRegion.firstKey()), META_TABLE_NAME); + } + + /* + * Repeatedly try to find the root region by asking the master for where it is + * @return TreeMap for root regin if found + * @throws NoServerForRegionException - if the root region can not be located + * after retrying + * @throws IOException + */ + private TreeMap locateRootRegion() throws IOException { + checkMaster(); + + HServerAddress rootRegionLocation = null; + for(int tries = 0; tries < numRetries; tries++) { + int localTimeouts = 0; + while(rootRegionLocation == null && localTimeouts < numRetries) { + rootRegionLocation = master.findRootRegion(); + if(rootRegionLocation == null) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping. Waiting for root region."); + } + Thread.sleep(pause); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding root region."); + } + } catch(InterruptedException iex) { + // continue + } + localTimeouts++; + } + } + + if(rootRegionLocation == null) { + throw new NoServerForRegionException( + "Timed out trying to locate root region"); + } + + HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation); + + try { + rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName); + break; + } catch(IOException e) { + if(tries == numRetries - 1) { + // Don't bother sleeping. We've run out of retries. 
+ if(e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + + // Sleep and retry finding root region. + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Root region location changed. Sleeping."); + } + Thread.sleep(pause); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding root region."); + } + } catch(InterruptedException iex) { + // continue + } + } + rootRegionLocation = null; + } + + if (rootRegionLocation == null) { + throw new NoServerForRegionException( + "unable to locate root region server"); + } + + TreeMap rootServer = new TreeMap(); + rootServer.put(EMPTY_START_ROW, + new RegionLocation(HGlobals.rootRegionInfo, rootRegionLocation)); + + return rootServer; + } + + /* + * Scans a single meta region + * @param t the meta region we're going to scan + * @param tableName the name of the table we're looking for + * @return returns a map of startingRow to TableInfo + * @throws RegionNotFoundException - if table does not exist + * @throws IllegalStateException - if table is offline + * @throws NoServerForRegionException - if table can not be found after retrying + * @throws IOException + */ + private TreeMap scanOneMetaRegion(final RegionLocation t, + final Text tableName) throws IOException { + HRegionInterface server = getHRegionConnection(t.serverAddress); + TreeMap servers = new TreeMap(); + for(int tries = 0; servers.size() == 0 && tries < numRetries; tries++) { + + long scannerId = -1L; + try { + scannerId = + server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName, + System.currentTimeMillis(), null); + + DataInputBuffer inbuf = new DataInputBuffer(); + while(true) { + HRegionInfo regionInfo = null; + String serverAddress = null; + KeyedData[] values = server.next(scannerId); + if(values.length == 0) { + if(servers.size() == 0) { + // If we didn't find any servers then the table does not exist + throw new TableNotFoundException("table '" + tableName + + "' does 
not exist in " + t); + } + + // We found at least one server for the table and now we're done. + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + servers.size() + " server(s) for " + + "location: " + t + " for tablename " + tableName); + } + break; + } + + byte[] bytes = null; + TreeMap results = new TreeMap(); + for(int i = 0; i < values.length; i++) { + results.put(values[i].getKey().getColumn(), values[i].getData()); + } + regionInfo = new HRegionInfo(); + bytes = results.get(COL_REGIONINFO); + inbuf.reset(bytes, bytes.length); + regionInfo.readFields(inbuf); + + if(!regionInfo.tableDesc.getName().equals(tableName)) { + // We're done + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + servers.size() + " servers for table " + + tableName); + } + break; + } + + if(regionInfo.offLine) { + throw new IllegalStateException("table offline: " + tableName); + } + + bytes = results.get(COL_SERVER); + if(bytes == null || bytes.length == 0) { + // We need to rescan because the table we want is unassigned. + if(LOG.isDebugEnabled()) { + LOG.debug("no server address for " + regionInfo.toString()); + } + servers.clear(); + break; + } + serverAddress = new String(bytes, UTF8_ENCODING); + servers.put(regionInfo.startKey, + new RegionLocation(regionInfo, new HServerAddress(serverAddress))); + } + } catch (IOException e) { + if(tries == numRetries - 1) { // no retries left + if(e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + + } finally { + if(scannerId != -1L) { + try { + server.close(scannerId); + } catch(Exception ex) { + LOG.warn(ex); + } + } + } + + if(servers.size() == 0 && tries == numRetries - 1) { + throw new NoServerForRegionException("failed to find server for " + + tableName + " after " + numRetries + " retries"); + } + + if (servers.size() <= 0) { + // The table is not yet being served. Sleep and retry. + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping. 
Table " + tableName + + " not currently being served."); + } + try { + Thread.sleep(pause); + } catch (InterruptedException ie) { + // continue + } + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding table " + tableName); + } + } + } + return servers; + } + + /** + * Reloads servers for the specified table. + * @param tableName name of table whose servers are to be reloaded + * @return map of start key -> RegionLocation + * @throws IOException + */ + public synchronized SortedMap + reloadTableServers(final Text tableName) + throws IOException { + // Reload information for the whole table + SortedMap servers = findServersForTable(tableName); + + if (LOG.isDebugEnabled()) { + LOG.debug("Result of findTable: " + servers.toString()); + } + + if (tablesToServers.get(tableName) == null) { + throw new TableNotFoundException(tableName.toString()); + } + + return servers; + } + + } - // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress) - private TreeMap> tablesToServers; + protected TableServers tableServers; - // For the "current" table: Map startRow -> (HRegionInfo, HServerAddress) - SortedMap tableServers; + // For the "current" table: Map startRow -> RegionLocation + SortedMap currentTableServers; // Known region HServerAddress.toString() -> HRegionInterface private TreeMap servers; @@ -252,8 +613,8 @@ this.numRetries = conf.getInt("hbase.client.retries.number", 5); this.master = null; - this.tablesToServers = new TreeMap>(); - this.tableServers = null; + this.tableServers = new TableServers(); + this.currentTableServers = null; this.servers = new TreeMap(); // For row mutation operations @@ -263,25 +624,7 @@ this.rand = new Random(); } - /** - * @param tableName Table to check. - * @return True if table exists already. 
- * @throws IOException - */ - public boolean tableExists(final Text tableName) throws IOException { - HTableDescriptor [] tables = listTables(); - boolean result = false; - for (int i = 0; i < tables.length; i++) { - if (tables[i].getName().equals(tableName)) { - result = true; - break; - } - } - return result; - } - - /* Find the address of the master and connect to it - */ + /* Find the address of the master and connect to it */ protected void checkMaster() throws MasterNotRunningException { if (this.master != null) { return; @@ -323,10 +666,6 @@ } } - ////////////////////////////////////////////////////////////////////////////// - // Administrative methods - ////////////////////////////////////////////////////////////////////////////// - /** * @return - true if the master server is running */ @@ -341,8 +680,85 @@ } return true; } + + /** + * Reloads the cached server information for the current table + * + * @param info RegionInfo for a region that is a part of the table + * @throws IOException + */ + protected synchronized void reloadCurrentTable(RegionLocation info) + throws IOException { + this.currentTableServers = tableServers.reloadTableServers( + info.getRegionInfo().getTableDesc().getName()); + } /** + * Find region location hosting passed row using cached info + * @param row Row to find. + * @return Location of row. + */ + protected synchronized RegionLocation getRegionLocation(Text row) { + if(this.currentTableServers == null) { + throw new IllegalStateException("Must open table first"); + } + + // Only one server will have the row we are looking for + Text serverKey = (this.currentTableServers.containsKey(row))? row: + this.currentTableServers.headMap(row).lastKey(); + return this.currentTableServers.get(serverKey); + } + + /** + * Establishes a connection to the region server at the specified address. 
+ * @param regionServer - the server to connect to + * @throws IOException + */ + protected synchronized HRegionInterface getHRegionConnection ( + HServerAddress regionServer) throws IOException { + + getRegionServerInterface(); + + // See if we already have a connection + HRegionInterface server = this.servers.get(regionServer.toString()); + + if (server == null) { // Get a connection + long versionId = 0; + try { + versionId = + serverInterfaceClass.getDeclaredField("versionID").getLong(server); + + } catch (IllegalAccessException e) { + // Should never happen unless visibility of versionID changes + throw new UnsupportedOperationException( + "Unable to open a connection to a " + serverInterfaceClass.getName() + + " server.", e); + + } catch (NoSuchFieldException e) { + // Should never happen unless versionID field name changes in HRegionInterface + throw new UnsupportedOperationException( + "Unable to open a connection to a " + serverInterfaceClass.getName() + + " server.", e); + } + + try { + server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass, + versionId, regionServer.getInetSocketAddress(), this.conf); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + + this.servers.put(regionServer.toString(), server); + } + return server; + } + + // + // Administrative methods + // + + /** * Creates a new table * * @param desc table descriptor for table @@ -359,17 +775,8 @@ throws IOException { createTableAsync(desc); - // Save the current table - SortedMap oldServers = this.tableServers; - try { - // Wait for new table to come on-line - findServersForTable(desc.getName()); - } finally { - if(oldServers != null && oldServers.size() != 0) { - // Restore old current table if there was one - this.tableServers = oldServers; - } - } + // Wait for new table to come on-line + tableServers.getTableServers(desc.getName()); } /** @@ -469,44 +876,6 @@ } /** - * Add a column to an existing table - * - * @param tableName name 
of the table to add column to - * @param column column descriptor of column to be added - * @throws IOException - */ - public synchronized void addColumn(Text tableName, HColumnDescriptor column) - throws IOException { - checkReservedTableName(tableName); - checkMaster(); - try { - this.master.addColumn(tableName, column); - - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - } - - /** - * Delete a column from a table - * - * @param tableName name of table - * @param columnName name of column to be deleted - * @throws IOException - */ - public synchronized void deleteColumn(Text tableName, Text columnName) - throws IOException { - checkReservedTableName(tableName); - checkMaster(); - try { - this.master.deleteColumn(tableName, columnName); - - } catch(RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - } - - /** * Brings a table on-line (enables it) * * @param tableName name of the table @@ -649,41 +1018,96 @@ break; } } - if(disabled) { - break; - } - - } catch(IOException e) { - if(tries == numRetries - 1) { // no more retries - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - - } catch(Exception e) { - LOG.warn(e); - } - } - } - if(LOG.isDebugEnabled()) { - LOG.debug("Sleep. Waiting for first region to be disabled from " + tableName); - } - try { - Thread.sleep(pause); - } catch(InterruptedException e) { - // continue - } - if(LOG.isDebugEnabled()) { - LOG.debug("Wake. 
Waiting for first region to be disabled from " + tableName); - } + if(disabled) { + break; + } + + } catch(IOException e) { + if(tries == numRetries - 1) { // no more retries + if(e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; + } + + } finally { + if(scannerId != -1L) { + try { + server.close(scannerId); + + } catch(Exception e) { + LOG.warn(e); + } + } + } + if(LOG.isDebugEnabled()) { + LOG.debug("Sleep. Waiting for first region to be disabled from " + tableName); + } + try { + Thread.sleep(pause); + } catch(InterruptedException e) { + // continue + } + if(LOG.isDebugEnabled()) { + LOG.debug("Wake. Waiting for first region to be disabled from " + tableName); + } + } + LOG.info("Disabled table " + tableName); + } + + /** + * @param tableName Table to check. + * @return True if table exists already. + * @throws IOException + */ + public boolean tableExists(final Text tableName) throws IOException { + HTableDescriptor [] tables = listTables(); + boolean result = false; + for (int i = 0; i < tables.length; i++) { + if (tables[i].getName().equals(tableName)) { + result = true; + break; + } + } + return result; + } + + /** + * Add a column to an existing table + * + * @param tableName name of the table to add column to + * @param column column descriptor of column to be added + * @throws IOException + */ + public synchronized void addColumn(Text tableName, HColumnDescriptor column) + throws IOException { + checkReservedTableName(tableName); + checkMaster(); + try { + this.master.addColumn(tableName, column); + + } catch (RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); + } + } + + /** + * Delete a column from a table + * + * @param tableName name of table + * @param columnName name of column to be deleted + * @throws IOException + */ + public synchronized void deleteColumn(Text tableName, Text columnName) + throws IOException { + checkReservedTableName(tableName); + 
checkMaster(); + try { + this.master.deleteColumn(tableName, columnName); + + } catch(RemoteException e) { + throw RemoteExceptionHandler.decodeRemoteException(e); } - LOG.info("Disabled table " + tableName); } /** @@ -712,9 +1136,13 @@ } } - private RegionLocation getFirstMetaServerForTable(Text tableName) throws IOException { - SortedMap metaservers = findMetaServersForTable(tableName); - return metaservers.get(metaservers.firstKey()); + private RegionLocation getFirstMetaServerForTable(Text tableName) + throws IOException { + SortedMap metaservers = + tableServers.getTableServers(META_TABLE_NAME); + + return metaservers.get((metaservers.containsKey(tableName)) ? + tableName : metaservers.headMap(tableName).lastKey()); } ////////////////////////////////////////////////////////////////////////////// @@ -734,7 +1162,7 @@ if(this.currentLockId != -1 || batch != null) { throw new IllegalStateException("update in progress"); } - this.tableServers = getTableServers(tableName); + this.currentTableServers = tableServers.getTableServers(tableName); } /** @@ -742,13 +1170,13 @@ * @return Array of region starting row keys */ public synchronized Text[] getStartKeys() { - if(this.tableServers == null) { + if(this.currentTableServers == null) { throw new IllegalStateException("Must open table first"); } - Text[] keys = new Text[tableServers.size()]; + Text[] keys = new Text[currentTableServers.size()]; int i = 0; - for(Text key: tableServers.keySet()){ + for(Text key: currentTableServers.keySet()){ keys[i++] = key; } return keys; @@ -755,349 +1183,6 @@ } /** - * Gets the servers of the given table. 
- * - * @param tableName - the table to be located - * @throws IOException - if the table can not be located after retrying - */ - protected synchronized SortedMap getTableServers(Text tableName) throws IOException { - if(tableName == null || tableName.getLength() == 0) { - throw new IllegalArgumentException("table name cannot be null or zero length"); - } - SortedMap serverResult = tablesToServers.get(tableName); - if (serverResult == null ) { - if (LOG.isDebugEnabled()) { - LOG.debug("No servers for " + tableName + ". Doing a find..."); - } - // We don't know where the table is. - // Load the information from meta. - serverResult = findServersForTable(tableName); - } - return serverResult; - } - - /* - * Locates a table by searching the META region - * - * @param tableName - name of table to find servers for - * @return - map of first row to table info for all regions in the table - * @throws IOException - */ - private SortedMap findServersForTable(Text tableName) - throws IOException { - SortedMap servers = null; - if(tableName.equals(ROOT_TABLE_NAME)) { - servers = locateRootRegion(); - } else if(tableName.equals(META_TABLE_NAME)) { - servers = loadMetaFromRoot(); - } else { - servers = new TreeMap(); - for(RegionLocation t: findMetaServersForTable(tableName).values()) { - servers.putAll(scanOneMetaRegion(t, tableName)); - } - this.tablesToServers.put(tableName, servers); - } - if (LOG.isDebugEnabled()) { - for (Map.Entry e: servers.entrySet()) { - LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue()); - } - } - return servers; - } - - /* - * Finds the meta servers that contain information about the specified table - * @param tableName - the name of the table to get information about - * @return - returns a SortedMap of the meta servers - * @throws IOException - */ - private SortedMap findMetaServersForTable(final Text tableName) - throws IOException { - SortedMap metaServers = - this.tablesToServers.get(META_TABLE_NAME); - if(metaServers == null) 
{ // Don't know where the meta is - metaServers = loadMetaFromRoot(); - } - Text firstMetaRegion = (metaServers.containsKey(tableName)) ? - tableName : metaServers.headMap(tableName).lastKey(); - return metaServers.tailMap(firstMetaRegion); - } - - /* - * Load the meta table from the root table. - * - * @return map of first row to TableInfo for all meta regions - * @throws IOException - */ - private TreeMap loadMetaFromRoot() throws IOException { - SortedMap rootRegion = - this.tablesToServers.get(ROOT_TABLE_NAME); - if(rootRegion == null) { - rootRegion = locateRootRegion(); - } - return scanRoot(rootRegion.get(rootRegion.firstKey())); - } - - /* - * Repeatedly try to find the root region by asking the master for where it is - * @return TreeMap for root regin if found - * @throws NoServerForRegionException - if the root region can not be located - * after retrying - * @throws IOException - */ - private TreeMap locateRootRegion() throws IOException { - checkMaster(); - - HServerAddress rootRegionLocation = null; - for(int tries = 0; tries < numRetries; tries++) { - int localTimeouts = 0; - while(rootRegionLocation == null && localTimeouts < numRetries) { - rootRegionLocation = master.findRootRegion(); - if(rootRegionLocation == null) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping. Waiting for root region."); - } - Thread.sleep(this.pause); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding root region."); - } - } catch(InterruptedException iex) { - // continue - } - localTimeouts++; - } - } - - if(rootRegionLocation == null) { - throw new NoServerForRegionException( - "Timed out trying to locate root region"); - } - - HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation); - - try { - rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName); - break; - } catch(IOException e) { - if(tries == numRetries - 1) { - // Don't bother sleeping. We've run out of retries. 
- if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - // Sleep and retry finding root region. - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Root region location changed. Sleeping."); - } - Thread.sleep(this.pause); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding root region."); - } - } catch(InterruptedException iex) { - // continue - } - } - rootRegionLocation = null; - } - - if (rootRegionLocation == null) { - throw new NoServerForRegionException( - "unable to locate root region server"); - } - - TreeMap rootServer = new TreeMap(); - rootServer.put(EMPTY_START_ROW, - new RegionLocation(HGlobals.rootRegionInfo, rootRegionLocation)); - - this.tablesToServers.put(ROOT_TABLE_NAME, rootServer); - return rootServer; - } - - /* - * Scans the root region to find all the meta regions - * @return - TreeMap of meta region servers - * @throws IOException - */ - private TreeMap scanRoot(RegionLocation rootRegion) - throws IOException { - TreeMap metaservers = - scanOneMetaRegion(rootRegion, META_TABLE_NAME); - this.tablesToServers.put(META_TABLE_NAME, metaservers); - return metaservers; - } - - /* - * Scans a single meta region - * @param t the meta region we're going to scan - * @param tableName the name of the table we're looking for - * @return returns a map of startingRow to TableInfo - * @throws RegionNotFoundException - if table does not exist - * @throws IllegalStateException - if table is offline - * @throws NoServerForRegionException - if table can not be found after retrying - * @throws IOException - */ - private TreeMap scanOneMetaRegion(final RegionLocation t, - final Text tableName) - throws IOException { - HRegionInterface server = getHRegionConnection(t.serverAddress); - TreeMap servers = new TreeMap(); - for(int tries = 0; servers.size() == 0 && tries < this.numRetries; - tries++) { - - long scannerId = -1L; - try { - scannerId = - 
server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName, - System.currentTimeMillis(), null); - - DataInputBuffer inbuf = new DataInputBuffer(); - while(true) { - HRegionInfo regionInfo = null; - String serverAddress = null; - KeyedData[] values = server.next(scannerId); - if(values.length == 0) { - if(servers.size() == 0) { - // If we didn't find any servers then the table does not exist - throw new RegionNotFoundException("table '" + tableName + - "' does not exist in " + t); - } - - // We found at least one server for the table and now we're done. - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + servers.size() + " server(s) for " + - "location: " + t + " for tablename " + tableName); - } - break; - } - - byte[] bytes = null; - TreeMap results = new TreeMap(); - for(int i = 0; i < values.length; i++) { - results.put(values[i].getKey().getColumn(), values[i].getData()); - } - regionInfo = new HRegionInfo(); - bytes = results.get(COL_REGIONINFO); - inbuf.reset(bytes, bytes.length); - regionInfo.readFields(inbuf); - - if(!regionInfo.tableDesc.getName().equals(tableName)) { - // We're done - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + servers.size() + " servers for table " + - tableName); - } - break; - } - - if(regionInfo.offLine) { - throw new IllegalStateException("table offline: " + tableName); - } - - bytes = results.get(COL_SERVER); - if(bytes == null || bytes.length == 0) { - // We need to rescan because the table we want is unassigned. 
- if(LOG.isDebugEnabled()) { - LOG.debug("no server address for " + regionInfo.toString()); - } - servers.clear(); - break; - } - serverAddress = new String(bytes, UTF8_ENCODING); - servers.put(regionInfo.startKey, - new RegionLocation(regionInfo, new HServerAddress(serverAddress))); - } - } catch (IOException e) { - if(tries == numRetries - 1) { // no retries left - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - - } finally { - if(scannerId != -1L) { - try { - server.close(scannerId); - } catch(Exception ex) { - LOG.warn(ex); - } - } - } - - if(servers.size() == 0 && tries == this.numRetries - 1) { - throw new NoServerForRegionException("failed to find server for " - + tableName + " after " + this.numRetries + " retries"); - } - - if (servers.size() <= 0) { - // The table is not yet being served. Sleep and retry. - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping. Table " + tableName + - " not currently being served."); - } - try { - Thread.sleep(this.pause); - } catch (InterruptedException ie) { - // continue - } - if (LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding table " + tableName); - } - } - } - return servers; - } - - /** - * Establishes a connection to the region server at the specified address. 
- * @param regionServer - the server to connect to - * @throws IOException - */ - protected synchronized HRegionInterface getHRegionConnection ( - HServerAddress regionServer) throws IOException { - - getRegionServerInterface(); - - // See if we already have a connection - HRegionInterface server = this.servers.get(regionServer.toString()); - - if (server == null) { // Get a connection - long versionId = 0; - try { - versionId = - serverInterfaceClass.getDeclaredField("versionID").getLong(server); - - } catch (IllegalAccessException e) { - // Should never happen unless visibility of versionID changes - throw new UnsupportedOperationException( - "Unable to open a connection to a " + serverInterfaceClass.getName() - + " server.", e); - - } catch (NoSuchFieldException e) { - // Should never happen unless versionID field name changes in HRegionInterface - throw new UnsupportedOperationException( - "Unable to open a connection to a " + serverInterfaceClass.getName() - + " server.", e); - } - - try { - server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass, - versionId, regionServer.getInetSocketAddress(), this.conf); - - } catch (RemoteException e) { - throw RemoteExceptionHandler.decodeRemoteException(e); - } - - this.servers.put(regionServer.toString(), server); - } - return server; - } - - /** * List all the userspace tables. In other words, scan the META table. 
* * If we wanted this to be really fast, we could implement a special @@ -1112,12 +1197,7 @@ TreeSet uniqueTables = new TreeSet(); SortedMap metaTables = - this.tablesToServers.get(META_TABLE_NAME); - - if(metaTables == null) { - // Meta is not loaded yet so go do that - metaTables = loadMetaFromRoot(); - } + tableServers.getTableServers(META_TABLE_NAME); for (RegionLocation t: metaTables.values()) { HRegionInterface server = getHRegionConnection(t.serverAddress); @@ -1157,46 +1237,6 @@ return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]); } - /* - * Find region location hosting passed row using cached info - * @param row Row to find. - * @return Location of row. - */ - protected synchronized RegionLocation getRegionLocation(Text row) { - if(this.tableServers == null) { - throw new IllegalStateException("Must open table first"); - } - - // Only one server will have the row we are looking for - Text serverKey = (this.tableServers.containsKey(row))? row: - this.tableServers.headMap(row).lastKey(); - return this.tableServers.get(serverKey); - } - - /* - * Clear caches of passed region location, reload servers for the passed - * region's table and then ensure region location can be found. - * @param info Region location to find. 
- * @throws IOException - */ - synchronized void findRegion(final RegionLocation info) throws IOException { - // Wipe out everything we know about this table - if (LOG.isDebugEnabled()) { - LOG.debug("Wiping out all we know of " + info); - } - this.tablesToServers.remove(info.regionInfo.tableDesc.getName()); - this.tableServers.clear(); - - // Reload information for the whole table - this.tableServers = findServersForTable(info.regionInfo.tableDesc.getName()); - if (LOG.isDebugEnabled()) { - LOG.debug("Result of findRegion: " + this.tableServers.toString()); - } - if (this.tableServers.get(info.regionInfo.startKey) == null) { - throw new RegionNotFoundException(info.regionInfo.regionName.toString()); - } - } - /** * Get a single value for the specified row and column * @@ -1221,7 +1261,7 @@ } throw e; } - findRegion(info); + reloadCurrentTable(info); } try { Thread.sleep(this.pause); @@ -1260,7 +1300,7 @@ } throw e; } - findRegion(info); + reloadCurrentTable(info); } try { Thread.sleep(this.pause); @@ -1309,7 +1349,7 @@ } throw e; } - findRegion(info); + reloadCurrentTable(info); } try { Thread.sleep(this.pause); @@ -1354,7 +1394,7 @@ } throw e; } - findRegion(info); + reloadCurrentTable(info); } try { Thread.sleep(this.pause); @@ -1430,7 +1470,7 @@ public synchronized HScannerInterface obtainScanner(Text[] columns, Text startRow, long timestamp, RowFilterInterface filter) throws IOException { - if(this.tableServers == null) { + if(this.currentTableServers == null) { throw new IllegalStateException("Must open table first"); } return new ClientScanner(columns, startRow, timestamp, filter); @@ -1445,7 +1485,7 @@ * Note that in batch mode, calls to commit or abort are ignored. 
*/ public synchronized void startBatchUpdate() { - if(this.tableServers == null) { + if(this.currentTableServers == null) { throw new IllegalStateException("Must open table first"); } @@ -1531,7 +1571,7 @@ } catch (InterruptedException ex) { } try { - findRegion(info); + reloadCurrentTable(info); } catch (IOException ex) { e = ex; @@ -1730,7 +1770,7 @@ private Text startRow; private long scanTime; private boolean closed; - private RegionLocation[] regions; + private volatile RegionLocation[] regions; @SuppressWarnings("hiding") private int currentRegion; private HRegionInterface server; @@ -1740,15 +1780,17 @@ private void loadRegions() { Text firstServer = null; if(this.startRow == null || this.startRow.getLength() == 0) { - firstServer = tableServers.firstKey(); + firstServer = currentTableServers.firstKey(); - } else if(tableServers.containsKey(startRow)) { + } else if(currentTableServers.containsKey(startRow)) { firstServer = startRow; } else { - firstServer = tableServers.headMap(startRow).lastKey(); + firstServer = currentTableServers.headMap(startRow).lastKey(); } - Collection info = tableServers.tailMap(firstServer).values(); + Collection info = + currentTableServers.tailMap(firstServer).values(); + this.regions = info.toArray(new RegionLocation[info.size()]); } @@ -1784,10 +1826,9 @@ return false; } try { - this.server = getHRegionConnection(this.regions[currentRegion].serverAddress); - for(int tries = 0; tries < numRetries; tries++) { RegionLocation info = this.regions[currentRegion]; + this.server = getHRegionConnection(this.regions[currentRegion].serverAddress); try { if (this.filter == null) { @@ -1811,7 +1852,7 @@ } throw e; } - findRegion(info); + reloadCurrentTable(info); loadRegions(); } } @@ -2011,8 +2052,8 @@ DEFAULT_REGION_SERVER_CLASS); try { - this.serverInterfaceClass = (Class) Class - .forName(serverClassName); + this.serverInterfaceClass = + (Class) Class.forName(serverClassName); } catch (ClassNotFoundException e) { throw new 
UnsupportedOperationException( "Unable to find region server interface " + serverClassName, e); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 559130) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -101,8 +101,8 @@ LOG.debug("splitting " + logfiles.length + " log files in " + srcDir.toString()); } - TreeMap logWriters = - new TreeMap(); + HashMap logWriters = + new HashMap(); try { for(int i = 0; i < logfiles.length; i++) { SequenceFile.Reader in = @@ -116,6 +116,9 @@ if (w == null) { Path logfile = new Path(HStoreFile.getHRegionDir(rootDir, regionName), HREGION_OLDLOGFILE_NAME); + if (LOG.isDebugEnabled()) { + LOG.debug("getting new log file writer for path " + logfile); + } w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class, HLogEdit.class); logWriters.put(regionName, w); @@ -205,8 +208,6 @@ // continue; } } - - // Close the current writer (if any), and grab a new one. if(writer != null) { @@ -280,7 +281,6 @@ * @throws IOException */ synchronized void closeAndDelete() throws IOException { - rollWriter(); close(); fs.delete(dir); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (revision 559130) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (working copy) @@ -23,7 +23,7 @@ import java.io.*; -/******************************************************************************* +/** * A Key for an entry in the change log. * * The log intermingles edits to many tables and rows, so each log entry @@ -29,7 +29,7 @@ * The log intermingles edits to many tables and rows, so each log entry * identifies the appropriate table and row. Within a table and row, they're * also sorted. 
- ******************************************************************************/ + */ public class HLogKey implements WritableComparable { Text regionName = new Text(); Text tablename = new Text(); @@ -79,11 +79,17 @@ return logSeqNum; } + /** + * {@inheritDoc} + */ @Override public String toString() { - return tablename + " " + regionName + " " + row + " " + logSeqNum; + return tablename + "," + regionName + "," + row + "," + logSeqNum; } + /** + * {@inheritDoc} + */ @Override public boolean equals(Object obj) { return compareTo(obj) == 0; @@ -89,6 +95,9 @@ return compareTo(obj) == 0; } + /** + * {@inheritDoc} + */ @Override public int hashCode() { int result = this.regionName.hashCode(); @@ -97,12 +106,12 @@ return result; } - ////////////////////////////////////////////////////////////////////////////// + // // Comparable - ////////////////////////////////////////////////////////////////////////////// + // - /* (non-Javadoc) - * @see java.lang.Comparable#compareTo(java.lang.Object) + /** + * {@inheritDoc} */ public int compareTo(Object o) { HLogKey other = (HLogKey) o; @@ -124,12 +133,12 @@ return result; } - ////////////////////////////////////////////////////////////////////////////// + // // Writable - ////////////////////////////////////////////////////////////////////////////// + // - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + /** + * {@inheritDoc} */ public void write(DataOutput out) throws IOException { this.regionName.write(out); @@ -138,8 +147,8 @@ out.writeLong(logSeqNum); } - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + /** + * {@inheritDoc} */ public void readFields(DataInput in) throws IOException { this.regionName.readFields(in); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 559130) +++ 
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -23,9 +23,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.Set; @@ -35,10 +35,10 @@ import java.util.TimerTask; import java.util.TreeMap; import java.util.TreeSet; -import java.util.Vector; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -98,7 +98,7 @@ long metaRescanInterval; - HServerAddress rootRegionLocation; + volatile HServerAddress rootRegionLocation; /** * Columns in the 'meta' ROOT and META tables. @@ -106,10 +106,6 @@ static final Text METACOLUMNS[] = { COLUMN_FAMILY }; - - - boolean rootScanned; - int numMetaRegions; /** * Base HRegion scanner class. Holds utilty common to ROOT and @@ -152,10 +148,10 @@ * The META scanner needs to wake up: *
    *
  1. when a META region comes on line
  2. - * periodically to rescan the known META regions + * periodically to rescan the online META regions *
* - *

A META region is not 'known' until it has been scanned + *

A META region is not 'online' until it has been scanned * once. */ abstract class BaseScanner implements Runnable { @@ -160,21 +156,36 @@ */ abstract class BaseScanner implements Runnable { private final Text FIRST_ROW = new Text(); + protected boolean rootRegion; + protected abstract void initialScan(); + protected abstract void maintenanceScan(); + + /** + * {@inheritDoc} + */ + public void run() { + initialScan(); + while (!closed) { + try { + Thread.sleep(metaRescanInterval); + } catch (InterruptedException e) { + continue; + } + maintenanceScan(); + } + LOG.info(this.getClass().getSimpleName() + " exiting"); + } + /** * @param region Region to scan - * @return True if scan completed. * @throws IOException */ - protected boolean scanRegion(final MetaRegion region) - throws IOException { - boolean scannedRegion = false; + protected void scanRegion(final MetaRegion region) throws IOException { HRegionInterface regionServer = null; long scannerId = -1L; - if (LOG.isDebugEnabled()) { - LOG.debug(Thread.currentThread().getName() + " scanning meta region " + + LOG.info(Thread.currentThread().getName() + " scanning meta region " + region.regionName); - } try { regionServer = client.getHRegionConnection(region.server); @@ -181,6 +192,7 @@ scannerId = regionServer.openScanner(region.regionName, METACOLUMNS, FIRST_ROW, System.currentTimeMillis(), null); + int numberOfRegionsFound = 0; while (true) { TreeMap results = new TreeMap(); KeyedData[] values = regionServer.next(scannerId); @@ -204,21 +216,30 @@ // Note Region has been assigned. checkAssigned(info, serverName, startCode); - scannedRegion = true; + + numberOfRegionsFound += 1; + } + if(rootRegion) { + numberOfMetaRegions.set(numberOfRegionsFound); } - } catch (UnknownScannerException e) { - // Reset scannerId so we do not try closing a scanner the other side - // has lost account of: prevents duplicated stack trace out of the - // below close in the finally. 
- scannerId = -1L; + } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + + if (e instanceof UnknownScannerException) { + // Reset scannerId so we do not try closing a scanner the other side + // has lost account of: prevents duplicated stack trace out of the + // below close in the finally. + scannerId = -1L; + } + } + throw e; } finally { try { - if (scannerId != -1L) { - if (regionServer != null) { + if (scannerId != -1L && regionServer != null) { regionServer.close(scannerId); - } } } catch (IOException e) { if (e instanceof RemoteException) { @@ -227,11 +248,8 @@ LOG.error(e); } } - if (LOG.isDebugEnabled()) { - LOG.debug(Thread.currentThread().getName() + " scan of meta region " + + LOG.info(Thread.currentThread().getName() + " scan of meta region " + region.regionName + " complete"); - } - return scannedRegion; } protected void checkAssigned(final HRegionInfo info, @@ -286,6 +304,8 @@ } } + volatile boolean rootScanned; + /** * Scanner for the ROOT HRegion. */ @@ -290,36 +310,33 @@ * Scanner for the ROOT HRegion. */ class RootScanner extends BaseScanner { - /** - * {@inheritDoc} - */ - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("Running ROOT scanner"); - } + /** Constructor */ + public RootScanner() { + rootRegion = true; + } + + private void scanRoot() { int tries = 0; - while(!closed && tries < numRetries) { - try { + while (!closed && tries < numRetries) { + while(!closed && rootRegionLocation == null) { // rootRegionLocation will be filled in when we get an 'open region' // regionServerReport message from the HRegionServer that has been - // allocated the ROOT region below. If we get back false, then - // HMaster has closed. - if (waitForRootRegionOrClose()) { - continue; + // allocated the ROOT region below. 
+ try { + Thread.sleep(threadWakeFrequency); + } catch (InterruptedException e) { } + } + if (closed) { + continue; + } + + try { synchronized(rootScannerLock) { // Don't interrupt us while we're working - rootScanned = false; - // Make a MetaRegion instance for ROOT region to pass scanRegion. - MetaRegion mr = new MetaRegion(); - mr.regionName = HGlobals.rootRegionInfo.regionName; - mr.server = HMaster.this.rootRegionLocation; - mr.startKey = null; - if (scanRegion(mr)) { - numMetaRegions += 1; - } - rootScanned = true; + scanRegion(new MetaRegion(rootRegionLocation, + HGlobals.rootRegionInfo.regionName, null)); } - tries = 0; + break; } catch (IOException e) { if (e instanceof RemoteException) { @@ -325,29 +342,37 @@ if (e instanceof RemoteException) { try { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - + } catch (IOException ex) { - LOG.warn(ex); + e = ex; } } - tries++; - if(tries < numRetries) { - LOG.warn("ROOT scanner", e); - + tries += 1; + if (tries == 1) { + LOG.warn(e); + } else { - LOG.error("ROOT scanner", e); - closed = true; - break; + LOG.error(e); } } - try { - Thread.sleep(metaRescanInterval); - } catch(InterruptedException e) { - // Catch and go around again. If interrupt, its spurious or we're - // being shutdown. Go back up to the while test. 
+ if (!closed) { + try { + Thread.sleep(threadWakeFrequency); + } catch (InterruptedException e) { + } } - } - LOG.info("ROOT scanner exiting"); + } + } + + @Override + protected void initialScan() { + scanRoot(); + rootScanned = true; + } + + @Override + protected void maintenanceScan() { + scanRoot(); } } @@ -360,6 +385,12 @@ HServerAddress server; Text regionName; Text startKey; + + MetaRegion(HServerAddress server, Text regionName, Text startKey) { + this.server = server; + this.regionName = regionName; + this.startKey = startKey; + } /** * {@inheritDoc} @@ -395,13 +426,18 @@ } } + + /** Set by root scanner to indicate the number of meta regions */ + AtomicInteger numberOfMetaRegions; /** Work for the meta scanner is queued up here */ - Vector metaRegionsToScan; + BlockingQueue metaRegionsToScan; - SortedMap knownMetaRegions; - - boolean allMetaRegionsScanned; + /** These are the online meta regions */ + SortedMap onlineMetaRegions; + + /** Set by meta scanner after initial scan */ + volatile boolean initialMetaScanComplete; /** * MetaScanner META table. @@ -413,30 +449,18 @@ * action would prevent other work from getting done. */ class MetaScanner extends BaseScanner { - /** - * {@inheritDoc} - */ - @SuppressWarnings("null") - public void run() { - while (!closed) { - if (LOG.isDebugEnabled()) { - LOG.debug("Running META scanner"); - } - MetaRegion region = null; - while (region == null && !closed) { - synchronized (metaRegionsToScan) { - if (metaRegionsToScan.size() != 0) { - region = metaRegionsToScan.remove(0); - } - if (region == null) { - try { - metaRegionsToScan.wait(threadWakeFrequency); - } catch (InterruptedException e) { - // Catch and go around again. We've been woken because there - // are new meta regions available or because we are being - // shut down. 
- } - } + /** Constructor */ + public MetaScanner() { + rootRegion = false; + } + + private void scanOneMetaRegion(MetaRegion region) { + int tries = 0; + while (!closed && tries < numRetries) { + while (!closed && !rootScanned && rootRegionLocation == null) { + try { + Thread.sleep(threadWakeFrequency); + } catch (InterruptedException e) { } } if (closed) { @@ -442,56 +466,14 @@ if (closed) { continue; } + try { - synchronized(metaScannerLock) { // Don't interrupt us while we're working + synchronized (metaScannerLock) { + // Don't interrupt us while we're working scanRegion(region); - knownMetaRegions.put(region.startKey, region); - if (rootScanned && knownMetaRegions.size() == numMetaRegions) { - if(LOG.isDebugEnabled()) { - LOG.debug("all meta regions scanned"); - } - allMetaRegionsScanned = true; - metaRegionsScanned(); - } + onlineMetaRegions.put(region.startKey, region); } - - int tries = 0; - do { - try { - Thread.sleep(metaRescanInterval); - } catch(InterruptedException ex) { - // Catch and go around again. 
- } - if(!allMetaRegionsScanned // A meta region must have split - || closed) { // We're shutting down - break; - } - - try { - - // Rescan the known meta regions every so often - synchronized(metaScannerLock) { // Don't interrupt us while we're working - Vector v = new Vector(); - v.addAll(knownMetaRegions.values()); - for(Iterator i = v.iterator(); i.hasNext(); ) { - scanRegion(i.next()); - } - } - tries = 0; - - } catch (IOException e) { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - tries++; - if(tries < numRetries) { - LOG.warn("META scanner", e); - - } else { - throw e; - } - } - } while(true); + break; } catch (IOException e) { if (e instanceof RemoteException) { @@ -496,19 +478,58 @@ } catch (IOException e) { if (e instanceof RemoteException) { try { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } catch (IOException ex) { - LOG.warn(ex); + e = ex; } } - LOG.error("META scanner", e); - closed = true; + tries += 1; + if (tries == 1) { + LOG.warn(e); + + } else { + LOG.error(e); + } + } + if (!closed) { + try { + Thread.sleep(threadWakeFrequency); + } catch (InterruptedException e) { + } } } - LOG.info("META scanner exiting"); } + @Override + protected void initialScan() { + MetaRegion region = null; + while (!closed && region == null && !metaRegionsScanned()) { + try { + region = + metaRegionsToScan.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // continue + } + + if (region != null) { + scanOneMetaRegion(region); + } + } + initialMetaScanComplete = true; + } + + @Override + protected void maintenanceScan() { + ArrayList regions = new ArrayList(); + regions.addAll(onlineMetaRegions.values()); + for (MetaRegion r: regions) { + scanOneMetaRegion(r); + } + metaRegionsScanned(); + } + /** * Called by the meta scanner when it has completed scanning 
all meta * regions. This wakes up any threads that were waiting for this to happen. @@ -513,8 +534,13 @@ * Called by the meta scanner when it has completed scanning all meta * regions. This wakes up any threads that were waiting for this to happen. */ - private synchronized void metaRegionsScanned() { + private synchronized boolean metaRegionsScanned() { + if (!rootScanned || numberOfMetaRegions.get() != onlineMetaRegions.size()) { + return false; + } + LOG.info("all meta regions scanned"); notifyAll(); + return true; } /** @@ -521,12 +547,17 @@ * Other threads call this method to wait until all the meta regions have * been scanned. */ - synchronized boolean waitForMetaScanOrClose() { - while(!closed && !allMetaRegionsScanned) { + synchronized boolean waitForMetaRegionsOrClose() { + while (!closed) { + if (rootScanned + && numberOfMetaRegions.get() == onlineMetaRegions.size()) { + + break; + } + try { wait(threadWakeFrequency); - } catch(InterruptedException e) { - // continue + } catch (InterruptedException e) { } } return closed; @@ -664,14 +695,14 @@ // Scans the meta table - this.numMetaRegions = 0; - this.metaRegionsToScan = new Vector(); + this.numberOfMetaRegions = new AtomicInteger(); + this.metaRegionsToScan = new LinkedBlockingQueue(); - this.knownMetaRegions = + this.onlineMetaRegions = Collections.synchronizedSortedMap(new TreeMap()); - this.allMetaRegionsScanned = false; - + this.initialMetaScanComplete = false; + this.metaScanner = new MetaScanner(); this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner"); @@ -750,7 +781,28 @@ if (LOG.isDebugEnabled()) { LOG.debug("Processing " + op.toString()); } - op.process(); + if (!op.process()) { + // Operation would have blocked because not all meta regions are + // online. This could cause a deadlock, because this thread is waiting + // for the missing meta region(s) to come back online, but since it + // is waiting, it cannot process the meta region online operation it + // is waiting for. 
So put this operation back on the queue for now. + + if (msgQueue.size() == 0) { + // The queue is currently empty so wait for a while to see if what + // we need comes in first + + try { + Thread.sleep(threadWakeFrequency); + } catch (InterruptedException e) { + } + } + try { + msgQueue.put(op); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); + } + } } catch (Exception ex) { if (ex instanceof RemoteException) { @@ -837,24 +889,6 @@ } } - /** - * Wait until rootRegionLocation has been set or until the - * closed flag has been set. - * @return True if rootRegionLocation was populated. - */ - synchronized boolean waitForRootRegionOrClose() { - while (!closed && rootRegionLocation == null) { - try { - wait(threadWakeFrequency); - } catch(InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Wake from wait for root region (or close) (IE)"); - } - } - } - return this.rootRegionLocation == null; - } - ////////////////////////////////////////////////////////////////////////////// // HMasterRegionInterface ////////////////////////////////////////////////////////////////////////////// @@ -885,7 +919,9 @@ serversToServerInfo.put(s, serverInfo); if(!closed) { long serverLabel = getServerLabel(s); - LOG.debug("Created lease for " + serverLabel); + if (LOG.isTraceEnabled()) { + LOG.trace("Created lease for " + serverLabel); + } serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s)); } } @@ -903,20 +939,26 @@ long serverLabel = getServerLabel(s); if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { + // HRegionServer is shutting down. Cancel the server's lease. - LOG.debug("Region server " + s + ": MSG_REPORT_EXITING"); + + LOG.info("Region server " + s + ": MSG_REPORT_EXITING"); cancelLease(s, serverLabel); - // Get all the regions the server was serving reassigned (if we - // are not shutting down). 
+ // Get all the regions the server was serving reassigned + // (if we are not shutting down). + if (!closed) { for (int i = 1; i < msgs.length; i++) { HRegionInfo info = msgs[i].getRegionInfo(); + if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) { rootRegionLocation = null; + } else if (info.tableDesc.getName().equals(META_TABLE_NAME)) { - allMetaRegionsScanned = false; + onlineMetaRegions.remove(info.getStartKey()); } + unassignedRegions.put(info.regionName, info); assignAttempts.put(info.regionName, Long.valueOf(0L)); } @@ -931,7 +973,7 @@ // Tell server to shut down if we are shutting down. This should // happen after check of MSG_REPORT_EXITING above, since region server // will send us one of these messages after it gets MSG_REGIONSERVER_STOP - return HMsg.MSG_REGIONSERVER_STOP_IN_ARRAY; + return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; } HServerInfo storedInfo = serversToServerInfo.get(s); @@ -942,10 +984,10 @@ // The HBaseMaster may have been restarted. // Tell the RegionServer to start over and call regionServerStartup() - HMsg returnMsgs[] = new HMsg[1]; - returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP); - return returnMsgs; - } else if(storedInfo.getStartCode() != serverInfo.getStartCode()) { + + return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)}; + + } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) { // This state is reachable if: // @@ -956,11 +998,12 @@ // // The answer is to ask A to shut down for good. - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("region server race condition detected: " + s); } - return HMsg.MSG_REGIONSERVER_STOP_IN_ARRAY; + return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; + } else { // All's well. Renew the server's lease. @@ -983,7 +1026,7 @@ if (serversToServerInfo.remove(serverName) != null) { // Only cancel lease once. // This method can be called a couple of times during shutdown. 
- LOG.debug("Cancelling lease for " + serverName); + LOG.info("Cancelling lease for " + serverName); serverLeases.cancelLease(serverLabel, serverLabel); } } @@ -990,7 +1033,7 @@ /** Process all the incoming messages from a server that's contacted us. */ private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException { - Vector returnMsgs = new Vector(); + ArrayList returnMsgs = new ArrayList(); TreeMap regionsToKill = killList.remove(info.getServerAddress().toString()); @@ -1020,17 +1063,9 @@ returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); } else { - - if(LOG.isDebugEnabled()) { - LOG.debug(info.getServerAddress().toString() + " serving " - + region.regionName); - } + LOG.info(info.getServerAddress().toString() + " serving " + + region.regionName); - // Note that it has been assigned and is waiting for the meta table - // to be updated. - - pendingRegions.add(region.regionName); - // Remove from unassigned list so we don't assign it to someone else unassignedRegions.remove(region.regionName); @@ -1041,27 +1076,14 @@ // Store the Root Region location (in memory) rootRegionLocation = new HServerAddress(info.getServerAddress()); - - // Wake up threads waiting for the root server - - rootRegionIsAvailable(); break; - - } else if(region.tableDesc.getName().equals(META_TABLE_NAME)) { - - // It's a meta region. Put it on the queue to be scanned. - - MetaRegion r = new MetaRegion(); - r.server = info.getServerAddress(); - r.regionName = region.regionName; - r.startKey = region.startKey; - - synchronized(metaRegionsToScan) { - metaRegionsToScan.add(r); - metaRegionsToScan.notifyAll(); - } } + // Note that the table has been assigned and is waiting for the meta + // table to be updated. + + pendingRegions.add(region.regionName); + // Queue up an update to note the region location. 
try { @@ -1073,10 +1095,8 @@ break; case HMsg.MSG_REPORT_CLOSE: - if(LOG.isDebugEnabled()) { - LOG.debug(info.getServerAddress().toString() + " no longer serving " - + region.regionName); - } + LOG.info(info.getServerAddress().toString() + " no longer serving " + + region.regionName); if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region rootRegionLocation = null; @@ -1111,17 +1131,27 @@ } break; - case HMsg.MSG_NEW_REGION: - if(LOG.isDebugEnabled()) { - LOG.debug("new region " + region.regionName); - } - + case HMsg.MSG_REPORT_SPLIT: // A region has split and the old server is serving the two new regions. + HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo(); + HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo(); + + LOG.info("region " + region.regionName + " split. New regions are: " + + newRegionA.regionName + ", " + newRegionB.regionName); + if(region.tableDesc.getName().equals(META_TABLE_NAME)) { // A meta region has split. - allMetaRegionsScanned = false; + onlineMetaRegions.remove(region.getStartKey()); + onlineMetaRegions.put(newRegionA.getStartKey(), + new MetaRegion(info.getServerAddress(), newRegionA.getRegionName(), + newRegionA.getStartKey())); + onlineMetaRegions.put(newRegionB.getStartKey(), + new MetaRegion(info.getServerAddress(), newRegionB.getRegionName(), + newRegionB.getStartKey())); + + numberOfMetaRegions.incrementAndGet(); } break; @@ -1127,8 +1157,9 @@ break; default: - throw new IOException("Impossible state during msg processing. Instruction: " - + incomingMsgs[i].getMsg()); + throw new IOException( + "Impossible state during msg processing. 
Instruction: " + + incomingMsgs[i].getMsg()); } } @@ -1153,10 +1184,8 @@ HRegionInfo regionInfo = unassignedRegions.get(curRegionName); long assignedTime = assignAttempts.get(curRegionName); if (now - assignedTime > maxRegionOpenTime) { - if(LOG.isDebugEnabled()) { - LOG.debug("assigning region " + regionInfo.regionName + " to server " - + info.getServerAddress().toString()); - } + LOG.info("assigning region " + regionInfo.regionName + " to server " + + info.getServerAddress().toString()); returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); @@ -1172,15 +1201,6 @@ return returnMsgs.toArray(new HMsg[returnMsgs.size()]); } - /** - * Called when the master has received a report from a region server that it - * is now serving the root region. Causes any threads waiting for the root - * region to be available to be woken up. - */ - private synchronized void rootRegionIsAvailable() { - notifyAll(); - } - ////////////////////////////////////////////////////////////////////////////// // Some internal classes to manage msg-passing and client operations ////////////////////////////////////////////////////////////////////////////// @@ -1190,13 +1210,10 @@ COLUMN_FAMILY }; protected final Text startRow = new Text(); - protected long clientId; - PendingOperation() { - this.clientId = rand.nextLong(); - } + PendingOperation() {} - abstract void process() throws IOException; + abstract boolean process() throws IOException; } /** @@ -1208,6 +1225,9 @@ private HServerAddress deadServer; private String deadServerName; private long oldStartCode; + private boolean logSplit; + private boolean rootChecked; + private boolean rootRescanned; private class ToDoEntry { boolean deleteRegion; @@ -1228,6 +1248,9 @@ this.deadServer = serverInfo.getServerAddress(); this.deadServerName = this.deadServer.toString(); this.oldStartCode = serverInfo.getStartCode(); + this.logSplit = false; + this.rootChecked = false; + this.rootRescanned = false; } /** Finds regions that the dead region server 
was serving */ @@ -1234,7 +1257,7 @@ private void scanMetaRegion(HRegionInterface server, long scannerId, Text regionName) throws IOException { - Vector toDoList = new Vector(); + ArrayList toDoList = new ArrayList(); TreeMap regions = new TreeMap(); DataInputBuffer inbuf = new DataInputBuffer(); @@ -1329,13 +1352,10 @@ LOG.error(e); break; } - - if(LOG.isDebugEnabled()) { - LOG.debug(serverName + " was serving " + info.toString()); - } + LOG.info(serverName + " was serving " + info.toString()); if(info.tableDesc.getName().equals(META_TABLE_NAME)) { - allMetaRegionsScanned = false; + onlineMetaRegions.remove(info.getStartKey()); } ToDoEntry todo = new ToDoEntry(row, info); @@ -1383,8 +1403,8 @@ } // Remove server from root/meta entries - for(int i = 0; i < toDoList.size(); i++) { - ToDoEntry e = toDoList.get(i); + long clientId = rand.nextLong(); + for (ToDoEntry e: toDoList) { long lockid = server.startUpdate(regionName, clientId, e.row); if(e.deleteRegion) { server.delete(regionName, clientId, lockid, COL_REGIONINFO); @@ -1400,7 +1420,7 @@ server.delete(regionName, clientId, lockid, COL_STARTCODE); server.commit(regionName, clientId, lockid, System.currentTimeMillis()); } - + // Get regions reassigned for(Map.Entry e: regions.entrySet()) { @@ -1413,69 +1433,89 @@ } @Override - void process() throws IOException { - if(LOG.isDebugEnabled()) { - LOG.debug("server shutdown: " + deadServerName); - } - - // Process the old log file - - HLog.splitLog(dir, new Path(dir, "log" + "_" + deadServer.getBindAddress() - + "_" + deadServer.getPort()), fs, conf); + boolean process() throws IOException { + LOG.info("server shutdown: " + deadServerName); + + if(!logSplit) { + // Process the old log file - if(rootRegionLocation != null - && deadServerName.equals(rootRegionLocation.toString())) { + HLog.splitLog(dir, new Path(dir, "log" + "_" + deadServer.getBindAddress() + + "_" + deadServer.getPort()), fs, conf); - rootRegionLocation = null; - 
unassignedRegions.put(HGlobals.rootRegionInfo.regionName, - HGlobals.rootRegionInfo); - assignAttempts.put(HGlobals.rootRegionInfo.regionName, - Long.valueOf(0L)); + logSplit = true; } - - // Scan the ROOT region - HRegionInterface server = null; - long scannerId = -1L; - for(int tries = 0; tries < numRetries; tries ++) { - if(waitForRootRegionOrClose()) {// Wait until the root region is available - return; // We're shutting down. Forget it. + if(!rootChecked) { + if(rootRegionLocation != null + && deadServer.equals(rootRegionLocation)) { + + rootRegionLocation = null; + unassignedRegions.put(HGlobals.rootRegionInfo.regionName, + HGlobals.rootRegionInfo); + assignAttempts.put(HGlobals.rootRegionInfo.regionName, + Long.valueOf(0L)); } - server = client.getHRegionConnection(rootRegionLocation); - scannerId = -1L; - - try { - LOG.debug("scanning root region"); - scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, - columns, startRow, System.currentTimeMillis(), null); - scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName); - break; - - } catch (IOException e) { - if (tries == numRetries - 1) { - if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + rootChecked = true; + } + + if(!rootRescanned) { + // Scan the ROOT region + + HRegionInterface server = null; + long scannerId = -1L; + for(int tries = 0; tries < numRetries; tries ++) { + if (closed) { + return true; + } + if (rootRegionLocation == null || !rootScanned) { + // We can't proceed until the root region is online and has been + // scanned + return false; + } + server = client.getHRegionConnection(rootRegionLocation); + scannerId = -1L; + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("scanning root region"); + } + scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, + columns, startRow, System.currentTimeMillis(), null); + scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName); + break; + + 
} catch (IOException e) { + if (tries == numRetries - 1) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + throw e; } - throw e; } } + rootRescanned = true; } - // We can not scan every meta region if they have not already been assigned - // and scanned. - - for(int tries = 0; tries < numRetries; tries ++) { + for (int tries = 0; tries < numRetries; tries++) { try { - if(metaScanner.waitForMetaScanOrClose()) { - return; // We're shutting down. Forget it. + if (closed) { + return true; + } + if (!rootScanned + || numberOfMetaRegions.get() != onlineMetaRegions.size()) { + + // We can't proceed because not all of the meta regions are online. + // We can't block either because that would prevent the meta region + // online message from being processed. So return false to have this + // operation requeued. + + return false; } - for(Iterator i = knownMetaRegions.values().iterator(); - i.hasNext(); ) { + for (MetaRegion r: onlineMetaRegions.values()) { - server = null; - scannerId = -1L; - MetaRegion r = i.next(); + HRegionInterface server = null; + long scannerId = -1L; server = client.getHRegionConnection(r.server); @@ -1495,6 +1535,7 @@ } } } + return true; } } @@ -1529,19 +1570,9 @@ } @Override - void process() throws IOException { - for(int tries = 0; tries < numRetries; tries ++) { - - // We can not access any meta region if they have not already been assigned - // and scanned. - - if(metaScanner.waitForMetaScanOrClose()) { - return; // We're shutting down. Forget it. 
- } - - if(LOG.isDebugEnabled()) { - LOG.debug("region closed: " + regionInfo.regionName); - } + boolean process() throws IOException { + for (int tries = 0; tries < numRetries; tries++) { + LOG.info("region closed: " + regionInfo.regionName); // Mark the Region as unavailable in the appropriate meta table @@ -1547,21 +1578,39 @@ Text metaRegionName; HRegionInterface server; + + if (closed) { + return true; + } if (rootRegion) { - metaRegionName = HGlobals.rootRegionInfo.regionName; - if(waitForRootRegionOrClose()) {// Make sure root region available - return; // We're shutting down. Forget it. + if (rootRegionLocation == null || !rootScanned) { + // We can't proceed until the root region is online and has been + // scanned + return false; } + metaRegionName = HGlobals.rootRegionInfo.regionName; server = client.getHRegionConnection(rootRegionLocation); + onlineMetaRegions.remove(regionInfo.getStartKey()); } else { + if (!rootScanned + || numberOfMetaRegions.get() != onlineMetaRegions.size()) { + + // We can't proceed because not all of the meta regions are online. + // We can't block either because that would prevent the meta region + // online message from being processed. So return false to have this + // operation requeued. 
+ + return false; + } + MetaRegion r = null; - if(knownMetaRegions.containsKey(regionInfo.regionName)) { - r = knownMetaRegions.get(regionInfo.regionName); + if (onlineMetaRegions.containsKey(regionInfo.getRegionName())) { + r = onlineMetaRegions.get(regionInfo.getRegionName()); } else { - r = knownMetaRegions.get( - knownMetaRegions.headMap(regionInfo.regionName).lastKey()); + r = onlineMetaRegions.get( + onlineMetaRegions.headMap(regionInfo.getRegionName()).lastKey()); } metaRegionName = r.regionName; server = client.getHRegionConnection(r.server); @@ -1567,8 +1616,11 @@ server = client.getHRegionConnection(r.server); } + long clientId = rand.nextLong(); try { - long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName); + long lockid = server.startUpdate(metaRegionName, clientId, + regionInfo.regionName); + if(deleteRegion) { server.delete(metaRegionName, clientId, lockid, COL_REGIONINFO); @@ -1600,9 +1652,7 @@ } if(reassignRegion) { - if(LOG.isDebugEnabled()) { - LOG.debug("reassign region: " + regionInfo.regionName); - } + LOG.info("reassign region: " + regionInfo.regionName); unassignedRegions.put(regionInfo.regionName, regionInfo); assignAttempts.put(regionInfo.regionName, Long.valueOf(0L)); @@ -1620,6 +1670,7 @@ throw e; } } + return true; } } @@ -1630,8 +1681,8 @@ */ private class PendingOpenReport extends PendingOperation { private boolean rootRegion; - private Text regionName; - private byte [] serverAddress; + private HRegionInfo region; + private HServerAddress serverAddress; private byte [] startCode; PendingOpenReport(HServerInfo info, HRegionInfo region) { @@ -1643,12 +1694,11 @@ // Just an ordinary region. Look for it in the META table. this.rootRegion = false; } - this.regionName = region.regionName; + this.region = region; + this.serverAddress = info.getServerAddress(); try { - this.serverAddress = info.getServerAddress().toString(). - getBytes(UTF8_ENCODING); - this.startCode = String.valueOf(info.getStartCode()). 
- getBytes(UTF8_ENCODING); + this.startCode = + String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING); } catch(UnsupportedEncodingException e) { LOG.error(e); } @@ -1655,20 +1705,10 @@ } @Override - void process() throws IOException { - for(int tries = 0; tries < numRetries; tries ++) { - - // We can not access any meta region if they have not already been assigned - // and scanned. - - if(metaScanner.waitForMetaScanOrClose()) { - return; // We're shutting down. Forget it. - } - - if(LOG.isDebugEnabled()) { - LOG.debug(regionName + " open on " - + new String(this.serverAddress, UTF8_ENCODING)); - } + boolean process() throws IOException { + for (int tries = 0; tries < numRetries; tries++) { + LOG.info(region.getRegionName() + " open on " + + this.serverAddress.toString()); // Register the newly-available Region's location. @@ -1674,21 +1714,37 @@ Text metaRegionName; HRegionInterface server; - if(rootRegion) { - metaRegionName = HGlobals.rootRegionInfo.regionName; - if(waitForRootRegionOrClose()) {// Make sure root region available - return; // We're shutting down. Forget it. + if (closed) { + return true; + } + if (rootRegion) { + if (rootRegionLocation == null || !rootScanned) { + // We can't proceed until the root region is online and has been + // scanned + return false; } + metaRegionName = HGlobals.rootRegionInfo.regionName; server = client.getHRegionConnection(rootRegionLocation); } else { + if (!rootScanned + || numberOfMetaRegions.get() != onlineMetaRegions.size()) { + + // We can't proceed because not all of the meta regions are online. + // We can't block either because that would prevent the meta region + // online message from being processed. So return false to have this + // operation requeued. 
+ + return false; + } + MetaRegion r = null; - if(knownMetaRegions.containsKey(regionName)) { - r = knownMetaRegions.get(regionName); + if (onlineMetaRegions.containsKey(region.getRegionName())) { + r = onlineMetaRegions.get(region.getRegionName()); } else { - r = knownMetaRegions.get( - knownMetaRegions.headMap(regionName).lastKey()); + r = onlineMetaRegions.get( + onlineMetaRegions.headMap(region.getRegionName()).lastKey()); } metaRegionName = r.regionName; server = client.getHRegionConnection(r.server); @@ -1693,12 +1749,15 @@ metaRegionName = r.regionName; server = client.getHRegionConnection(r.server); } - if(LOG.isDebugEnabled()) { - LOG.debug("updating row " + regionName + " in table " + metaRegionName); - } + LOG.info("updating row " + region.getRegionName() + " in table " + + metaRegionName); + + long clientId = rand.nextLong(); try { - long lockid = server.startUpdate(metaRegionName, clientId, regionName); - server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress); + long lockid = server.startUpdate(metaRegionName, clientId, + region.getRegionName()); + server.put(metaRegionName, clientId, lockid, COL_SERVER, + serverAddress.toString().getBytes(UTF8_ENCODING)); server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode); server.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); @@ -1703,10 +1762,31 @@ server.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); + if (region.tableDesc.getName().equals(META_TABLE_NAME)) { + // It's a meta region. + + MetaRegion m = + new MetaRegion(serverAddress, region.regionName, region.startKey); + + if (!initialMetaScanComplete) { + // Put it on the queue to be scanned for the first time. 
+ + try { + metaRegionsToScan.put(m); + } catch (InterruptedException e) { + throw new RuntimeException( + "Putting into metaRegionsToScan was interrupted.", e); + } + } else { + // Add it to the online meta regions + + onlineMetaRegions.put(region.startKey, m); + } + } break; } catch (IOException e) { - if(tries == numRetries - 1) { + if (tries == numRetries - 1) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); } @@ -1713,8 +1793,9 @@ throw e; } } - pendingRegions.remove(regionName); + pendingRegions.remove(region.getRegionName()); } + return true; } } @@ -1757,17 +1838,18 @@ } HRegionInfo newRegion = new HRegionInfo(rand.nextLong(), desc, null, null); - for(int tries = 0; tries < numRetries; tries++) { + for (int tries = 0; tries < numRetries; tries++) { try { // We can not access meta regions if they have not already been // assigned and scanned. If we timeout waiting, just shutdown. - if (metaScanner.waitForMetaScanOrClose()) { - return; + if (metaScanner.waitForMetaRegionsOrClose()) { + break; } - createTable(newRegion); + createTable(newRegion); + LOG.info("created table " + desc.getName()); break; } catch (IOException e) { - if(tries == numRetries - 1) { + if (tries == numRetries - 1) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); } @@ -1775,10 +1857,6 @@ } } } - - if(LOG.isDebugEnabled()) { - LOG.debug("created table " + desc.getName()); - } } /* @@ -1800,9 +1878,9 @@ // table would sit should it exist. Open scanner on it. If a region // for the table we want to create already exists, then table already // created. Throw already-exists exception. - MetaRegion m = (knownMetaRegions.containsKey(newRegion.regionName))? - knownMetaRegions.get(newRegion.regionName): - knownMetaRegions.get(knownMetaRegions. + MetaRegion m = (onlineMetaRegions.containsKey(newRegion.regionName)) ? 
+ onlineMetaRegions.get(newRegion.regionName) : + onlineMetaRegions.get(onlineMetaRegions. headMap(newRegion.getTableDesc().getName()).lastKey()); Text metaRegionName = m.regionName; HRegionInterface connection = client.getHRegionConnection(m.server); @@ -1862,9 +1940,7 @@ */ public void deleteTable(Text tableName) throws IOException { new TableDelete(tableName).process(); - if(LOG.isDebugEnabled()) { - LOG.debug("deleted table: " + tableName); - } + LOG.info("deleted table: " + tableName); } /** @@ -1921,26 +1997,26 @@ // We can not access any meta region if they have not already been // assigned and scanned. - if(metaScanner.waitForMetaScanOrClose()) { - return; // We're shutting down. Forget it. + if (metaScanner.waitForMetaRegionsOrClose()) { + throw new MasterNotRunningException(); // We're shutting down. Forget it. } Text firstMetaRegion = null; - if(knownMetaRegions.size() == 1) { - firstMetaRegion = knownMetaRegions.firstKey(); + if (onlineMetaRegions.size() == 1) { + firstMetaRegion = onlineMetaRegions.firstKey(); - } else if(knownMetaRegions.containsKey(tableName)) { + } else if (onlineMetaRegions.containsKey(tableName)) { firstMetaRegion = tableName; } else { - firstMetaRegion = knownMetaRegions.headMap(tableName).lastKey(); + firstMetaRegion = onlineMetaRegions.headMap(tableName).lastKey(); } - this.metaRegions.addAll(knownMetaRegions.tailMap(firstMetaRegion).values()); + this.metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values()); } void process() throws IOException { - for(int tries = 0; tries < numRetries; tries++) { + for (int tries = 0; tries < numRetries; tries++) { boolean tableExists = false; try { synchronized(metaScannerLock) { // Prevent meta scanner from running @@ -2038,7 +2114,7 @@ } // synchronized(metaScannerLock) } catch (IOException e) { - if(tries == numRetries - 1) { + if (tries == numRetries - 1) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); } Index: 
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (revision 559130) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (working copy) @@ -42,10 +42,7 @@ /** Master tells region server to stop */ public static final byte MSG_REGIONSERVER_STOP = 5; - - public static final HMsg [] MSG_REGIONSERVER_STOP_IN_ARRAY = - {new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; - + /** Stop serving the specified region and don't report back that it's closed */ public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6; @@ -57,10 +54,20 @@ /** region server is no longer serving the specified region */ public static final byte MSG_REPORT_CLOSE = 101; - /** region server is now serving a region produced by a region split */ - public static final byte MSG_NEW_REGION = 103; + /** + * region server split the region associated with this message. + * + * note that this message is immediately followed by two MSG_REPORT_OPEN + * messages, one for each of the new regions resulting from the split + */ + public static final byte MSG_REPORT_SPLIT = 103; - /** region server is shutting down */ + /** + * region server is shutting down + * + * note that this message is followed by MSG_REPORT_CLOSE messages for each + * region the region server was serving. 
+ */ public static final byte MSG_REPORT_EXITING = 104; byte msg; @@ -108,6 +115,9 @@ return info; } + /** + * {@inheritDoc} + */ @Override public String toString() { StringBuilder message = new StringBuilder(); @@ -140,8 +150,8 @@ message.append("MSG_REPORT_CLOSE : "); break; - case MSG_NEW_REGION: - message.append("MSG_NEW_REGION : "); + case MSG_REPORT_SPLIT: + message.append("MSG_REGION_SPLIT : "); break; case MSG_REPORT_EXITING: @@ -162,8 +172,8 @@ // Writable ////////////////////////////////////////////////////////////////////////////// - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) + /** + * {@inheritDoc} */ public void write(DataOutput out) throws IOException { out.writeByte(msg); @@ -170,8 +180,8 @@ info.write(out); } - /* (non-Javadoc) - * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) + /** + * {@inheritDoc} */ public void readFields(DataInput in) throws IOException { this.msg = in.readByte(); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 559130) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -407,6 +407,23 @@ * @throws IOException */ public Vector close() throws IOException { + return close(false); + } + + /** + * Close down this HRegion. Flush the cache unless abort parameter is true, + * Shut down each HStore, don't service any more calls. + * + * This method could take some time to execute, so don't call it from a + * time-sensitive thread. + * + * @param abort true if server is aborting (only during testing) + * @return Vector of all the storage files that the HRegion's component + * HStores make use of. It's a list of HStoreFile objects. 
+ * + * @throws IOException + */ + Vector close(boolean abort) throws IOException { lock.obtainWriteLock(); try { boolean shouldClose = false; @@ -430,7 +447,11 @@ return null; } LOG.info("closing region " + this.regionInfo.regionName); - Vector allHStoreFiles = internalFlushcache(); + Vector allHStoreFiles = null; + if (!abort) { + // Don't flush the cache if we are aborting during a test. + allHStoreFiles = internalFlushcache(); + } for (HStore store: stores.values()) { store.close(); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 559130) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -194,7 +194,7 @@ private void split(final HRegion region, final Text midKey) throws IOException { - final Text oldRegion = region.getRegionName(); + final HRegionInfo oldRegionInfo = region.getRegionInfo(); final HRegion[] newRegions = region.closeAndSplit(midKey, this); // When a region is split, the META table needs to updated if we're @@ -204,9 +204,7 @@ final Text tableToUpdate = region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ? ROOT_TABLE_NAME : META_TABLE_NAME; - if(LOG.isDebugEnabled()) { - LOG.debug("Updating " + tableToUpdate + " with region split info"); - } + LOG.info("Updating " + tableToUpdate + " with region split info"); // Remove old region from META @@ -249,11 +247,11 @@ if (LOG.isDebugEnabled()) { LOG.debug("Reporting region split to master"); } - reportSplit(newRegions[0].getRegionInfo(), newRegions[1]. - getRegionInfo()); + reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(), + newRegions[1].getRegionInfo()); LOG.info("region split, META update, and report to master all" + - " successful. Old region=" + oldRegion + ", new regions: " + - newRegions[0].getRegionName() + ", " + + " successful. 
Old region=" + oldRegionInfo.getRegionName() + + ", new regions: " + newRegions[0].getRegionName() + ", " + newRegions[1].getRegionName()); // Finally, start serving the new regions @@ -262,6 +260,7 @@ try { onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]); onlineRegions.put(newRegions[1].getRegionName(), newRegions[1]); + } finally { lock.writeLock().unlock(); } @@ -461,23 +460,19 @@ address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), false, conf); - // Use configured nameserver & interface to get local hostname. - // 'serverInfo' is sent to master. Should have name of this host rather than - // 'localhost' or 0.0.0.0 or 127.0.0.1 in it. - String localHostname = DNS.getDefaultHost( - conf.get("dfs.datanode.dns.interface","default"), - conf.get("dfs.datanode.dns.nameserver","default")); - InetSocketAddress hostnameAddress = new InetSocketAddress(localHostname, - server.getListenerAddress().getPort()); - this.serverInfo = new HServerInfo(new HServerAddress(hostnameAddress), - this.rand.nextLong()); + // Use interface to get the 'real' IP for this host. + // 'serverInfo' is sent to master. Should have the real IP of this host + // rather than 'localhost' or 0.0.0.0 or 127.0.0.1 in it. 
+ String realIP = DNS.getDefaultIP( + conf.get("dfs.datanode.dns.interface","default")); + + this.serverInfo = new HServerInfo(new HServerAddress( + new InetSocketAddress(realIP, server.getListenerAddress().getPort())), + this.rand.nextLong()); - // Local file paths - String serverName = localHostname + "_" + - this.serverInfo.getServerAddress().getPort(); + Path logdir = new Path(rootDir, "log" + "_" + realIP + "_" + + this.serverInfo.getServerAddress().getPort()); - Path logdir = new Path(rootDir, "log" + "_" + serverName); - // Logging this.fs = FileSystem.get(conf); if(fs.exists(logdir)) { @@ -636,54 +631,48 @@ } try { - HMsg msgs[] = hbaseMaster.regionServerReport(serverInfo, outboundArray); + HMsg msgs[] = + hbaseMaster.regionServerReport(serverInfo, outboundArray); lastMsg = System.currentTimeMillis(); + // Queue up the HMaster's instruction stream for processing - synchronized(toDo) { - boolean restart = false; - for(int i = 0; i < msgs.length && !stopRequested && !restart; i++) { - switch(msgs[i].getMsg()) { - - case HMsg.MSG_CALL_SERVER_STARTUP: - if (LOG.isDebugEnabled()) { - LOG.debug("Got call server startup message"); - } - closeAllRegions(); - restart = true; - break; - - case HMsg.MSG_REGIONSERVER_STOP: - if (LOG.isDebugEnabled()) { - LOG.debug("Got regionserver stop message"); - } - stopRequested = true; - break; - - default: - if (LOG.isDebugEnabled()) { - LOG.debug("Got default message"); - } - try { - toDo.put(new ToDoEntry(msgs[i])); - } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was interrupted.", e); - } + + boolean restart = false; + for(int i = 0; i < msgs.length && !stopRequested && !restart; i++) { + switch(msgs[i].getMsg()) { + + case HMsg.MSG_CALL_SERVER_STARTUP: + if (LOG.isDebugEnabled()) { + LOG.debug("Got call server startup message"); + } + closeAllRegions(); + restart = true; + break; + + case HMsg.MSG_REGIONSERVER_STOP: + if (LOG.isDebugEnabled()) { + LOG.debug("Got regionserver stop 
message"); } - } - - if(restart || stopRequested) { - toDo.clear(); + stopRequested = true; break; - } - - if(toDo.size() > 0) { + + default: if (LOG.isDebugEnabled()) { - LOG.debug("notify on todo"); + LOG.debug("Got default message"); + } + try { + toDo.put(new ToDoEntry(msgs[i])); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } - toDo.notifyAll(); } } + if(restart || stopRequested) { + toDo.clear(); + break; + } + } catch (IOException e) { if (e instanceof RemoteException) { try { @@ -730,7 +719,7 @@ if (abortRequested) { try { - log.rollWriter(); + log.close(); } catch (IOException e) { if (e instanceof RemoteException) { try { @@ -742,10 +731,11 @@ } LOG.warn(e); } + closeAllRegions(); // Don't leave any open file handles LOG.info("aborting server at: " + serverInfo.getServerAddress().toString()); } else { - Vector closedRegions = closeAllRegions(); + ArrayList closedRegions = closeAllRegions(); try { log.closeAndDelete(); } catch (IOException e) { @@ -815,10 +805,12 @@ * updated the meta or root regions, and the master will pick that up on its * next rescan of the root or meta tables. 
*/ - void reportSplit(HRegionInfo newRegionA, HRegionInfo newRegionB) { + void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, + HRegionInfo newRegionB) { synchronized(outboundMsgs) { - outboundMsgs.add(new HMsg(HMsg.MSG_NEW_REGION, newRegionA)); - outboundMsgs.add(new HMsg(HMsg.MSG_NEW_REGION, newRegionB)); + outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_SPLIT, oldRegion)); + outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, newRegionA)); + outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, newRegionB)); } } @@ -859,9 +851,7 @@ continue; } try { - if (LOG.isDebugEnabled()) { - LOG.debug(e.msg.toString()); - } + LOG.info(e.msg.toString()); switch(e.msg.getMsg()) { @@ -942,8 +932,8 @@ } /** Called either when the master tells us to restart or from stop() */ - Vector closeAllRegions() { - Vector regionsToClose = new Vector(); + ArrayList closeAllRegions() { + ArrayList regionsToClose = new ArrayList(); this.lock.writeLock().lock(); try { regionsToClose.addAll(onlineRegions.values()); @@ -956,7 +946,7 @@ LOG.debug("closing region " + region.getRegionName()); } try { - region.close(); + region.close(abortRequested); LOG.debug("region closed " + region.getRegionName()); } catch (IOException e) { if (e instanceof RemoteException) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 559130) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -265,10 +265,11 @@ || !key.getRegionName().equals(this.regionName) || !HStoreKey.extractFamily(column).equals(this.familyName)) { if (LOG.isDebugEnabled()) { - LOG.debug("Passing on edit " + key.getRegionName() + ", " - + column.toString() + ": " + new String(val.getVal()) - + ", my region: " + this.regionName + ", my column: " - + this.familyName); + LOG.debug("Passing on edit " + key.getRegionName() + ", " + + column.toString() + ": " + 
+ new String(val.getVal(), UTF8_ENCODING) + + ", my region: " + this.regionName + ", my column: " + + this.familyName); } continue; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionNotFoundException.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionNotFoundException.java (revision 559130) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/RegionNotFoundException.java (working copy) @@ -1,34 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase; - -import java.io.IOException; - -public class RegionNotFoundException extends IOException { - private static final long serialVersionUID = 993179627856392526L; - - public RegionNotFoundException() { - super(); - } - - public RegionNotFoundException(String s) { - super(s); - } -} \ No newline at end of file Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/TableNotFoundException.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/TableNotFoundException.java (revision 558872) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/TableNotFoundException.java (working copy) @@ -0,0 +1,37 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +/** Thrown when a table can not be located */ +public class TableNotFoundException extends IOException { + private static final long serialVersionUID = 993179627856392526L; + + /** default constructor */ + public TableNotFoundException() { + super(); + } + + /** @param s message */ + public TableNotFoundException(String s) { + super(s); + } +} Index: src/contrib/hbase/src/test/hbase-site.xml =================================================================== --- src/contrib/hbase/src/test/hbase-site.xml (revision 559130) +++ src/contrib/hbase/src/test/hbase-site.xml (working copy) @@ -52,7 +52,7 @@ hbase.regionserver.handler.count - 3 + 5 Count of RPC Server instances spun up on RegionServers Same property is used by the HMaster for count of master handlers. Default is 10. @@ -58,4 +58,21 @@ Default is 10. + + hbase.master.lease.period + 5000 + Length of time the master will wait before timing out a region + server lease. Since region servers report in every second (see above), this + value has been reduced so that the master will notice a dead region server + sooner. The default is 30 seconds. + + + + hbase.master.lease.thread.wakefrequency + 2500 + The interval between checks for expired region server leases. + This value has been reduced due to the other reduced values above so that + the master will notice a dead region server sooner. The default is 15 seconds. 
+ + Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 559130) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -197,6 +197,16 @@ } /** + * Cause a region server to exit without cleaning up + * + * @param serverNumber + */ + public void abortRegionServer(int serverNumber) { + HRegionServer server = this.regionServers.remove(serverNumber); + server.abort(); + } + + /** * Shut down the specified region server cleanly * * @param serverNumber @@ -202,11 +212,8 @@ * @param serverNumber */ public void stopRegionServer(int serverNumber) { - if (serverNumber >= regionServers.size()) { - throw new ArrayIndexOutOfBoundsException( - "serverNumber > number of region servers"); - } - this.regionServers.get(serverNumber).stop(); + HRegionServer server = this.regionServers.remove(serverNumber); + server.stop(); } /** @@ -215,12 +222,9 @@ * @param serverNumber */ public void waitOnRegionServer(int serverNumber) { - if (serverNumber >= regionServers.size()) { - throw new ArrayIndexOutOfBoundsException( - "serverNumber > number of region servers"); - } + Thread regionServerThread = this.regionThreads.remove(serverNumber); try { - this.regionThreads.get(serverNumber).join(); + regionServerThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } @@ -226,19 +230,6 @@ } } - /** - * Cause a region server to exit without cleaning up - * - * @param serverNumber - */ - public void abortRegionServer(int serverNumber) { - if(serverNumber >= this.regionServers.size()) { - throw new ArrayIndexOutOfBoundsException( - "serverNumber > number of region servers"); - } - this.regionServers.get(serverNumber).abort(); - } - /** Shut down the HBase cluster */ public void shutdown() { LOG.info("Shutting down the HBase Cluster"); Index: 
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java (revision 559130) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java (working copy) @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase; import java.io.IOException; +import java.util.TreeMap; + +import org.apache.hadoop.io.Text; /** * Tests region server failover when a region server exits. @@ -26,7 +29,18 @@ */ public class TestCleanRegionServerExit extends HBaseClusterTestCase { private HClient client; + + /** Constructor; lowers the IPC timeout and the client retry counts for this test. */ + public TestCleanRegionServerExit() { + super(); + conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout + conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries + conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries + } + /** + * {@inheritDoc} + */ @Override public void setUp() throws Exception { super.setUp(); @@ -33,51 +47,51 @@ this.client = new HClient(conf); } - public void testCleanRegionServerExit() - throws IOException, InterruptedException { - try { - // When the META table can be opened, the region servers are running - this.client.openTable(HConstants.META_TABLE_NAME); - // Put something into the meta table. - this.client.createTable(new HTableDescriptor(getName())); - // Get current region server instance. - HRegionServer hsr = this.cluster.regionServers.get(0); - Thread hrst = this.cluster.regionThreads.get(0); - // Start up a new one to take over serving of root and meta after we shut - // down the current meta/root host. - this.cluster.startRegionServer(); - // Now shutdown the region server and wait for it to go down. - hsr.stop(); - hrst.join(); - // The recalibration of the client is not working properly. FIX.
- // After above is fixed, add in assertions that we can get data from - // newly located meta table. - } catch(Exception e) { - e.printStackTrace(); - fail(); - } - } - -/* Comment out till recalibration of client is working properly. - - public void testRegionServerAbort() - throws IOException, InterruptedException { + /** + * Writes a row, cleanly stops region server 0 and checks the row is still readable. + * @throws IOException if a client or cluster operation fails + */ + public void testCleanRegionServerExit() throws IOException { // When the META table can be opened, the region servers are running this.client.openTable(HConstants.META_TABLE_NAME); // Put something into the meta table. - this.client.createTable(new HTableDescriptor(getName())); - // Get current region server instance. - HRegionServer hsr = this.cluster.regionServers.get(0); - Thread hrst = this.cluster.regionThreads.get(0); - // Start up a new one to take over serving of root and meta after we shut - // down the current meta/root host. + String tableName = getName(); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); + this.client.createTable(desc); + // put some values in the table + this.client.openTable(new Text(tableName)); + Text row = new Text("row1"); + long lockid = client.startUpdate(row); + client.put(lockid, HConstants.COLUMN_FAMILY, + tableName.getBytes(HConstants.UTF8_ENCODING)); + client.commit(lockid); + // Start up a new region server to take over serving of root and meta + // after we shut down the current meta/root host. this.cluster.startRegionServer(); - // Force a region server to exit "ungracefully" - hsr.abort(); - hrst.join(); - // The recalibration of the client is not working properly. FIX. - // After above is fixed, add in assertions that we can get data from - // newly located meta table. + // Now shutdown the region server and wait for it to go down.
+ this.cluster.stopRegionServer(0); + this.cluster.waitOnRegionServer(0); + + // Verify that the client can find the data after the region has been moved + // to a different server + HScannerInterface scanner = + client.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY, new Text()); + try { + HStoreKey key = new HStoreKey(); + TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>(); + int rowCount = 0; + while (scanner.next(key, results)) { + rowCount++; + assertEquals(row, key.getRow()); + assertEquals(1, results.size()); + byte[] bytes = results.get(HConstants.COLUMN_FAMILY); + assertNotNull(bytes); + assertEquals(tableName, new String(bytes, HConstants.UTF8_ENCODING)); + } + assertEquals(1, rowCount); // an empty scan must fail, not pass vacuously + } finally { + scanner.close(); + } } -*/ } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java (revision 0) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java (revision 0) @@ -0,0 +1,97 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.TreeMap; + +import org.apache.hadoop.io.Text; + +/** + * Tests region server failover when a region server aborts (exits without cleaning up). + */ +public class TestRegionServerAbort extends HBaseClusterTestCase { + private HClient client; + + /** Constructor; lowers the IPC timeout and the client retry counts for this test. */ + public TestRegionServerAbort() { + super(); + conf.setInt("ipc.client.timeout", 5000); // reduce client timeout + conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries + conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries + } + + /** + * {@inheritDoc} + */ + @Override + public void setUp() throws Exception { + super.setUp(); + this.client = new HClient(conf); + } + + /** + * Writes a row, aborts region server 0 and checks the row is still readable. + * @throws IOException if a client or cluster operation fails + */ + public void testRegionServerAbort() throws IOException { + // When the META table can be opened, the region servers are running + this.client.openTable(HConstants.META_TABLE_NAME); + // Put something into the meta table. + String tableName = getName(); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); + this.client.createTable(desc); + // put some values in the table + this.client.openTable(new Text(tableName)); + Text row = new Text("row1"); + long lockid = client.startUpdate(row); + client.put(lockid, HConstants.COLUMN_FAMILY, + tableName.getBytes(HConstants.UTF8_ENCODING)); + client.commit(lockid); + // Start up a new region server to take over serving of root and meta + // after we abort the current meta/root host. + this.cluster.startRegionServer(); + // Now abort the region server and wait for it to go down.
+ this.cluster.abortRegionServer(0); + this.cluster.waitOnRegionServer(0); + + // Verify that the client can find the data after the region has been moved + // to a different server + HScannerInterface scanner = + client.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY, new Text()); + try { + HStoreKey key = new HStoreKey(); + TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>(); + int rowCount = 0; + while (scanner.next(key, results)) { + rowCount++; + assertEquals(row, key.getRow()); + assertEquals(1, results.size()); + byte[] bytes = results.get(HConstants.COLUMN_FAMILY); + assertNotNull(bytes); + assertEquals(tableName, new String(bytes, HConstants.UTF8_ENCODING)); + } + assertEquals(1, rowCount); // an empty scan must fail, not pass vacuously + } finally { + scanner.close(); + } + } +}