Index: src/contrib/hbase/conf/hbase-default.xml
===================================================================
--- src/contrib/hbase/conf/hbase-default.xml	(revision 0)
+++ src/contrib/hbase/conf/hbase-default.xml	(revision 0)
@@ -0,0 +1,48 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+  <property>
+    <name>hbase.master</name>
+    <value>localhost:60000</value>
+    <description>The host and port that the HBase master runs at.
+    TODO: Support 'local' (All running in single context).
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver</name>
+    <value>localhost:60010</value>
+    <description>The host and port a HBase region server runs at.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regiondir</name>
+    <value>${hadoop.tmp.dir}/hbase</value>
+    <description>The directory shared by region servers.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.timeout.length</name>
+    <value>10000</value>
+    <description>Client timeout in milliseconds</description>
+  </property>
+  <property>
+    <name>hbase.client.timeout.number</name>
+    <value>5</value>
+    <description>Try this many timeouts before giving up.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.retries.number</name>
+    <value>2</value>
+    <description>Count of maximum retries fetching the root region from root
+    region server.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.meta.thread.rescanfrequency</name>
+    <value>60000</value>
+    <description>How long the HMaster sleeps (in milliseconds) between scans of
+    the META table.
+    </description>
+  </property>
+</configuration>
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java	(revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java	(working copy)
@@ -60,7 +60,7 @@
       String column = col.toString();
       try {
         int colpos = column.indexOf(":") + 1;
-        if (colpos == 0) {
+        if(colpos == 0) {
           throw new IllegalArgumentException("Column name has no family indicator.");
         }
 
@@ -66,11 +66,11 @@
 
         String columnkey = column.substring(colpos);
 
-        if (columnkey == null || columnkey.length() == 0) {
+        if(columnkey == null || columnkey.length() == 0) {
           this.matchType = MATCH_TYPE.FAMILY_ONLY;
           this.family = column.substring(0, colpos);
 
-        } else if (isRegexPattern.matcher(columnkey).matches()) {
+        } else if(isRegexPattern.matcher(columnkey).matches()) {
           this.matchType = MATCH_TYPE.REGEX;
           this.columnMatcher = Pattern.compile(column);
 
@@ -86,13 +86,13 @@
     // Matching method
 
     boolean matches(Text col) throws IOException {
-      if (this.matchType == MATCH_TYPE.SIMPLE) {
+      if(this.matchType == MATCH_TYPE.SIMPLE) {
         return col.equals(this.col);
 
-      } else if (this.matchType == MATCH_TYPE.FAMILY_ONLY) {
+      } else if(this.matchType == MATCH_TYPE.FAMILY_ONLY) {
         return col.toString().startsWith(this.family);
 
-      } else if (this.matchType == MATCH_TYPE.REGEX) {
+      } else if(this.matchType == MATCH_TYPE.REGEX) {
         return this.columnMatcher.matcher(col.toString()).matches();
 
       } else {
@@ -121,7 +121,7 @@
     for(int i = 0; i < targetCols.length; i++) {
       Text family = HStoreKey.extractFamily(targetCols[i]);
       Vector<ColumnMatcher> matchers = okCols.get(family);
-      if (matchers == null) {
+      if(matchers == null) {
         matchers = new Vector<ColumnMatcher>();
       }
       matchers.add(new ColumnMatcher(targetCols[i]));
@@ -144,11 +144,11 @@
       Text column = keys[i].getColumn();
       Text family = HStoreKey.extractFamily(column);
       Vector<ColumnMatcher> matchers = okCols.get(family);
-      if (matchers == null) {
+      if(matchers == null) {
         return false;
       }
       for(int m = 0; m < matchers.size(); m++) {
-        if (matchers.get(m).matches(column)) {
+        if(matchers.get(m).matches(column)) {
           return true;
         }
       }
@@ -180,7 +180,7 @@
    * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
    */
   public boolean next(HStoreKey key, TreeMap<Text, byte[]> results)
-    throws IOException {
+      throws IOException {
 
     // Find the next row label (and timestamp)
 
@@ -188,12 +188,12 @@
     long chosenTimestamp = -1;
     for(int i = 0; i < keys.length; i++) {
while((keys[i] != null) - && (columnMatch(i)) - && (keys[i].getTimestamp() <= this.timestamp) - && ((chosenRow == null) - || (keys[i].getRow().compareTo(chosenRow) < 0) - || ((keys[i].getRow().compareTo(chosenRow) == 0) - && (keys[i].getTimestamp() > chosenTimestamp)))) { + && (columnMatch(i)) + && (keys[i].getTimestamp() <= this.timestamp) + && ((chosenRow == null) + || (keys[i].getRow().compareTo(chosenRow) < 0) + || ((keys[i].getRow().compareTo(chosenRow) == 0) + && (keys[i].getTimestamp() > chosenTimestamp)))) { chosenRow = new Text(keys[i].getRow()); chosenTimestamp = keys[i].getTimestamp(); @@ -203,7 +203,7 @@ // Grab all the values that match this row/timestamp boolean insertedItem = false; - if (chosenRow != null) { + if(chosenRow != null) { key.setRow(chosenRow); key.setVersion(chosenTimestamp); key.setColumn(new Text("")); @@ -212,10 +212,10 @@ // Fetch the data while((keys[i] != null) - && (keys[i].getRow().compareTo(chosenRow) == 0) - && (keys[i].getTimestamp() == chosenTimestamp)) { + && (keys[i].getRow().compareTo(chosenRow) == 0) + && (keys[i].getTimestamp() == chosenTimestamp)) { - if (columnMatch(i)) { + if(columnMatch(i)) { outbuf.reset(); vals[i].write(outbuf); byte byteresults[] = outbuf.getData(); @@ -226,7 +226,7 @@ insertedItem = true; } - if (!getNext(i)) { + if (! getNext(i)) { closeSubScanner(i); } } @@ -235,9 +235,9 @@ // a valid timestamp, so we're ready next time. while((keys[i] != null) - && ((keys[i].getRow().compareTo(chosenRow) <= 0) - || (keys[i].getTimestamp() > this.timestamp) - || (!columnMatch(i)))) { + && ((keys[i].getRow().compareTo(chosenRow) <= 0) + || (keys[i].getTimestamp() > this.timestamp) + || (! columnMatch(i)))) { getNext(i); } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java (revision 0) @@ -0,0 +1,25 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class HBaseConfiguration extends Configuration {
+  public HBaseConfiguration() {
+    super();
+    addDefaultResource("hbase-default.xml");
+  }
+}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java	(revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java	(working copy)
@@ -15,12 +15,20 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.conf.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
 
-import java.io.*;
-import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.log4j.Logger;
 
 /*******************************************************************************
  * HClient manages a connection to a single HRegionServer.
@@ -25,7 +33,10 @@
 /*******************************************************************************
  * HClient manages a connection to a single HRegionServer.
  ******************************************************************************/
-public class HClient extends HGlobals implements HConstants {
+public class HClient implements HConstants {
+  private final Logger LOG =
+    Logger.getLogger(this.getClass().getName());
+
   private static final Text[] metaColumns = {
     META_COLUMN_FAMILY
   };
@@ -32,8 +43,6 @@
   private static final Text startRow = new Text();
 
   private boolean closed;
-  private Configuration conf;
-  private HServerAddress masterLocation;
   private long clientTimeout;
   private int numTimeouts;
   private int numRetries;
@@ -38,6 +47,7 @@
   private int numTimeouts;
   private int numRetries;
   private HMasterInterface master;
+  private final Configuration conf;
 
   private class TableInfo {
     public HRegionInfo regionInfo;
@@ -72,16 +82,11 @@
   public HClient(Configuration conf) {
     this.closed = false;
     this.conf = conf;
-    
-    // Load config settings
+
+    this.clientTimeout = conf.getLong("hbase.client.timeout.length", 10 * 1000);
+    this.numTimeouts = conf.getInt("hbase.client.timeout.number", 5);
+    this.numRetries = conf.getInt("hbase.client.retries.number", 2);
     
-    this.masterLocation = new HServerAddress(this.conf.get(MASTER_DEFAULT_NAME));
-    this.clientTimeout = this.conf.getLong("hbase.client.timeout.length", 10 * 1000);
-    this.numTimeouts = this.conf.getInt("hbase.client.timeout.number", 5);
-    this.numRetries = this.conf.getInt("hbase.client.retries.number", 2);
-    
-    // Finish initialization
-    this.master = null;
     this.tablesToServers = new TreeMap<Text, TreeMap<Text, TableInfo>>();
     this.tableServers = null;
@@ -94,8 +99,28 @@
     this.rand = new Random();
   }
 
+  public synchronized void createTable(HTableDescriptor desc) throws IOException {
+    if(closed) {
+      throw new IllegalStateException("client is not open");
+    }
+    if(master == null) {
+      locateRootRegion();
+    }
+    master.createTable(desc);
+  }
+
+  public synchronized void deleteTable(Text tableName) throws IOException {
+    if(closed) {
+      throw new IllegalStateException("client is not open");
+    }
+    if(master == null) {
+      locateRootRegion();
+    }
+    master.deleteTable(tableName);
+  }
+
   public synchronized
void openTable(Text tableName) throws IOException { - if (closed) { + if(closed) { throw new IllegalStateException("client is not open"); } @@ -100,7 +125,7 @@ } tableServers = tablesToServers.get(tableName); - if (tableServers == null) { // We don't know where the table is + if(tableServers == null ) { // We don't know where the table is findTableInMeta(tableName); // Load the information from meta } } @@ -108,9 +133,9 @@ private void findTableInMeta(Text tableName) throws IOException { TreeMap metaServers = tablesToServers.get(META_TABLE_NAME); - if (metaServers == null) { // Don't know where the meta is + if(metaServers == null) { // Don't know where the meta is loadMetaFromRoot(tableName); - if (tableName.equals(META_TABLE_NAME) || tableName.equals(ROOT_TABLE_NAME)) { + if(tableName.equals(META_TABLE_NAME) || tableName.equals(ROOT_TABLE_NAME)) { // All we really wanted was the meta or root table return; } @@ -119,7 +144,7 @@ tableServers = new TreeMap(); for(Iterator i = metaServers.tailMap(tableName).values().iterator(); - i.hasNext();) { + i.hasNext(); ) { TableInfo t = i.next(); @@ -133,7 +158,7 @@ */ private void loadMetaFromRoot(Text tableName) throws IOException { locateRootRegion(); - if (tableName.equals(ROOT_TABLE_NAME)) { // All we really wanted was the root + if(tableName.equals(ROOT_TABLE_NAME)) { // All we really wanted was the root return; } scanRoot(); @@ -144,10 +169,12 @@ * could be. */ private void locateRootRegion() throws IOException { - if (master == null) { + if(master == null) { + HServerAddress masterLocation = + new HServerAddress(this.conf.get(MASTER_ADDRESS)); master = (HMasterInterface)RPC.getProxy(HMasterInterface.class, - HMasterInterface.versionID, - masterLocation.getInetSocketAddress(), conf); + HMasterInterface.versionID, + masterLocation.getInetSocketAddress(), conf); } int tries = 0; @@ -157,10 +184,9 @@ while(rootRegionLocation == null && localTimeouts < numTimeouts) { rootRegionLocation = master.findRootRegion(); - if (rootRegionLocation == null) { + if(rootRegionLocation == null) { try { Thread.sleep(clientTimeout); - } catch(InterruptedException iex) { } localTimeouts++; @@ -166,7 +192,7 @@ localTimeouts++; } } - if (rootRegionLocation == null) { + if(rootRegionLocation == null) { throw new IOException("Timed out trying to locate root region"); } @@ -174,9 +200,9 @@ HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation); - if (rootRegion.getRegionInfo(rootRegionInfo.regionName) != null) { + if(rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName) != null) { tableServers = new TreeMap(); - tableServers.put(startRow, new TableInfo(rootRegionInfo, rootRegionLocation)); + tableServers.put(startRow, new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation)); tablesToServers.put(ROOT_TABLE_NAME, tableServers); break; } @@ -184,7 +210,7 @@ } while(rootRegionLocation == null && tries++ < numRetries); - if (rootRegionLocation == null) { + if(rootRegionLocation == null) { closed = true; throw new IOException("unable to locate root region server"); } @@ -202,53 +228,64 @@ /* * Scans a single meta region - * @param t - the table we're going to scan - * @param tableName - the name of the table we're looking for + * @param t the table we're going to scan + * @param tableName the name of the table we're looking for */ private void scanOneMetaRegion(TableInfo t, Text tableName) throws IOException { HRegionInterface server = getHRegionConnection(t.serverAddress); - HScannerInterface scanner = null; + long scannerId = -1L; try { - scanner = 
server.openScanner(t.regionInfo.regionName, metaColumns, tableName); - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); + scannerId = server.openScanner(t.regionInfo.regionName, metaColumns, tableName); + DataInputBuffer inbuf = new DataInputBuffer(); + while(true) { + HStoreKey key = new HStoreKey(); - while(scanner.next(key, results)) { - byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO); - inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length); + LabelledData[] values = server.next(scannerId, key); + if(values.length == 0) { + break; + } + + TreeMap results = new TreeMap(); + for(int i = 0; i < values.length; i++) { + results.put(values[i].getLabel(), values[i].getData().get()); + } HRegionInfo regionInfo = new HRegionInfo(); + byte[] bytes = results.get(META_COL_REGIONINFO); + inbuf.reset(bytes, bytes.length); regionInfo.readFields(inbuf); - - if (!regionInfo.tableDesc.getName().equals(tableName)) { + + if(!regionInfo.tableDesc.getName().equals(tableName)) { // We're done break; } - - byte serverBytes[] = results.get(META_COL_SERVER); - String serverName = new String(serverBytes, UTF8_ENCODING); + + bytes = results.get(META_COL_SERVER); + String serverName = new String(bytes, UTF8_ENCODING); tableServers.put(regionInfo.startKey, - new TableInfo(regionInfo, new HServerAddress(serverName))); + new TableInfo(regionInfo, new HServerAddress(serverName))); - results.clear(); } + } finally { - scanner.close(); + if(scannerId != -1L) { + server.close(scannerId); + } } } - public synchronized HRegionInterface getHRegionConnection(HServerAddress regionServer) - throws IOException { + synchronized HRegionInterface getHRegionConnection(HServerAddress regionServer) + throws IOException { - // See if we already have a connection + // See if we already have a connection HRegionInterface server = servers.get(regionServer.toString()); - if (server == null) { // Get a connection + if(server == null) { // Get a connection server = (HRegionInterface)RPC.waitForProxy(HRegionInterface.class, - HRegionInterface.versionID, regionServer.getInetSocketAddress(), conf); + HRegionInterface.versionID, regionServer.getInetSocketAddress(), conf); servers.put(regionServer.toString(), server); } @@ -257,7 +294,7 @@ /** Close the connection to the HRegionServer */ public synchronized void close() throws IOException { - if (!closed) { + if(! closed) { RPC.stopClient(); closed = true; } @@ -270,11 +307,11 @@ * catalog table that just contains table names and their descriptors. * Right now, it only exists as part of the META table's region info. 
*/ - public HTableDescriptor[] listTables() throws IOException { + public synchronized HTableDescriptor[] listTables() throws IOException { TreeSet uniqueTables = new TreeSet(); TreeMap metaTables = tablesToServers.get(META_TABLE_NAME); - if (metaTables == null) { + if(metaTables == null) { // Meta is not loaded yet so go do that loadMetaFromRoot(META_TABLE_NAME); metaTables = tablesToServers.get(META_TABLE_NAME); @@ -280,31 +317,41 @@ metaTables = tablesToServers.get(META_TABLE_NAME); } - for(Iteratori = metaTables.values().iterator(); i.hasNext();) { - TableInfo t = i.next(); + for(Iteratorit = metaTables.values().iterator(); it.hasNext(); ) { + TableInfo t = it.next(); HRegionInterface server = getHRegionConnection(t.serverAddress); - HScannerInterface scanner = null; + long scannerId = -1L; try { - scanner = server.openScanner(t.regionInfo.regionName, metaColumns, startRow); + scannerId = server.openScanner(t.regionInfo.regionName, metaColumns, startRow); HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); + DataInputBuffer inbuf = new DataInputBuffer(); - while(scanner.next(key, results)) { - byte infoBytes[] = (byte[]) results.get(ROOT_COL_REGIONINFO); - inbuf.reset(infoBytes, infoBytes.length); - HRegionInfo info = new HRegionInfo(); - info.readFields(inbuf); + while(true) { + LabelledData[] values = server.next(scannerId, key); + if(values.length == 0) { + break; + } - // Only examine the rows where the startKey is zero length - - if (info.startKey.getLength() == 0) { - uniqueTables.add(info.tableDesc); + for(int i = 0; i < values.length; i++) { + if(values[i].getLabel().equals(META_COL_REGIONINFO)) { + byte[] bytes = values[i].getData().get(); + inbuf.reset(bytes, bytes.length); + HRegionInfo info = new HRegionInfo(); + info.readFields(inbuf); + + // Only examine the rows where the startKey is zero length + + if(info.startKey.getLength() == 0) { + uniqueTables.add(info.tableDesc); + } + } } - results.clear(); } } finally { - scanner.close(); + if(scannerId != -1L) { + server.close(scannerId); + } } } return (HTableDescriptor[]) uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]); @@ -310,8 +357,8 @@ return (HTableDescriptor[]) uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]); } - private TableInfo getTableInfo(Text row) { - if (tableServers == null) { + private synchronized TableInfo getTableInfo(Text row) { + if(tableServers == null) { throw new IllegalStateException("Must open table first"); } @@ -325,7 +372,7 @@ public byte[] get(Text row, Text column) throws IOException { TableInfo info = getTableInfo(row); return getHRegionConnection(info.serverAddress).get( - info.regionInfo.regionName, row, column).get(); + info.regionInfo.regionName, row, column).get(); } /** Get the specified number of versions of the specified row and column */ @@ -332,10 +379,10 @@ public byte[][] get(Text row, Text column, int numVersions) throws IOException { TableInfo info = getTableInfo(row); BytesWritable[] values = getHRegionConnection(info.serverAddress).get( - info.regionInfo.regionName, row, column, numVersions); + info.regionInfo.regionName, row, column, numVersions); ArrayList bytes = new ArrayList(); - for(int i = 0; i < values.length; i++) { + for(int i = 0 ; i < values.length; i++) { bytes.add(values[i].get()); } return bytes.toArray(new byte[values.length][]); @@ -348,10 +395,10 @@ public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException { TableInfo info = getTableInfo(row); BytesWritable[] values = 
getHRegionConnection(info.serverAddress).get( - info.regionInfo.regionName, row, column, timestamp, numVersions); + info.regionInfo.regionName, row, column, timestamp, numVersions); ArrayList bytes = new ArrayList(); - for(int i = 0; i < values.length; i++) { + for(int i = 0 ; i < values.length; i++) { bytes.add(values[i].get()); } return bytes.toArray(new byte[values.length][]); @@ -361,7 +408,7 @@ public LabelledData[] getRow(Text row) throws IOException { TableInfo info = getTableInfo(row); return getHRegionConnection(info.serverAddress).getRow( - info.regionInfo.regionName, row); + info.regionInfo.regionName, row); } /** @@ -368,8 +415,8 @@ * Get a scanner on the current table starting at the specified row. * Return the specified columns. */ - public HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException { - if (tableServers == null) { + public synchronized HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException { + if(tableServers == null) { throw new IllegalStateException("Must open table first"); } return new ClientScanner(columns, startRow); @@ -462,7 +509,7 @@ private TableInfo[] regions; private int currentRegion; private HRegionInterface server; - private HScannerInterface scanner; + private long scannerId; public ClientScanner(Text[] columns, Text startRow) throws IOException { this.columns = columns; @@ -472,7 +519,7 @@ this.regions = info.toArray(new TableInfo[info.size()]); this.currentRegion = -1; this.server = null; - this.scanner = null; + this.scannerId = -1L; nextScanner(); } @@ -481,11 +528,12 @@ * Returns false if there are no more scanners. */ private boolean nextScanner() throws IOException { - if (scanner != null) { - scanner.close(); + if(scannerId != -1L) { + server.close(scannerId); + scannerId = -1L; } currentRegion += 1; - if (currentRegion == regions.length) { + if(currentRegion == regions.length) { close(); return false; } @@ -491,8 +539,8 @@ } try { server = getHRegionConnection(regions[currentRegion].serverAddress); - scanner = server.openScanner(regions[currentRegion].regionInfo.regionName, - columns, startRow); + scannerId = server.openScanner(regions[currentRegion].regionInfo.regionName, + columns, startRow); } catch(IOException e) { close(); @@ -505,17 +553,18 @@ * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap) */ public boolean next(HStoreKey key, TreeMap results) throws IOException { - if (closed) { + if(closed) { return false; } - boolean status = scanner.next(key, results); - if (!status) { - status = nextScanner(); - if (status) { - status = scanner.next(key, results); - } + LabelledData[] values = null; + do { + values = server.next(scannerId, key); + } while(values.length == 0 && nextScanner()); + + for(int i = 0; i < values.length; i++) { + results.put(values[i].getLabel(), values[i].getData().get()); } - return status; + return values.length != 0; } /* (non-Javadoc) @@ -522,8 +571,8 @@ * @see org.apache.hadoop.hbase.HScannerInterface#close() */ public void close() throws IOException { - if (scanner != null) { - scanner.close(); + if(scannerId != -1L) { + server.close(scannerId); } server = null; closed = true; @@ -529,5 +578,41 @@ closed = true; } } - -} + + private void printUsage() { + System.err.println("Usage: java " + this.getClass().getName() + + " [--master=hostname:port]"); + } + + private int doCommandLine(final String args[]) { + // Process command-line args. 
TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    for (String cmd: args) {
+      if (cmd.equals("-h") || cmd.startsWith("--h")) {
+        printUsage();
+        return 0;
+      }
+      
+      final String masterArgKey = "--master=";
+      if (cmd.startsWith(masterArgKey)) {
+        this.conf.set(MASTER_ADDRESS,
+          cmd.substring(masterArgKey.length()));
+      }
+    }
+    
+    int errCode = -1;
+    try {
+      locateRootRegion();
+      errCode = 0;
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    
+    return errCode;
+  }
+  
+  public static void main(final String args[]) {
+    Configuration c = new HBaseConfiguration();
+    int errCode = (new HClient(c)).doCommandLine(args);
+    System.exit(errCode);
+  }
+}
\ No newline at end of file
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java	(revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -24,7 +24,18 @@
 
   // Configuration parameters
   
-  static final String MASTER_DEFAULT_NAME = "hbase.master.default.name";
+  // TODO: URL for hbase master, like hdfs URLs with host and port.
+  // Or, like jdbc URLs:
+  // jdbc:mysql://[host][,failoverhost...][:port]/[database]
+  // jdbc:mysql://[host][,failoverhost...][:port]/[database][?propertyName1][=propertyValue1][&propertyName2][=propertyValue2]...
+  
+  static final String MASTER_ADDRESS = "hbase.master";
+  // TODO: Support 'local': i.e. default of all running in single
+  // process.  Same for regionserver.
+  static final String DEFAULT_MASTER_ADDRESS = "localhost:60000";
+  static final String REGIONSERVER_ADDRESS = "hbase.regionserver";
+  static final String DEFAULT_REGIONSERVER_ADDRESS =
+    "localhost:60010";
   static final String HREGION_DIR = "hbase.regiondir";
   static final String DEFAULT_HREGION_DIR = "/hbase";
   static final String HREGIONDIR_PREFIX = "hregion_";
@@ -37,10 +48,10 @@
   // Do we ever need to know all the information that we are storing?
 
   static final Text ROOT_TABLE_NAME = new Text("--ROOT--");
-  static final Text ROOT_COLUMN_FAMILY = new Text("info");
-  static final Text ROOT_COL_REGIONINFO = new Text(ROOT_COLUMN_FAMILY + ":" + "regioninfo");
-  static final Text ROOT_COL_SERVER = new Text(ROOT_COLUMN_FAMILY + ":" + "server");
-  static final Text ROOT_COL_STARTCODE = new Text(ROOT_COLUMN_FAMILY + ":" + "serverstartcode");
+  static final Text ROOT_COLUMN_FAMILY = new Text("info:");
+  static final Text ROOT_COL_REGIONINFO = new Text(ROOT_COLUMN_FAMILY + "regioninfo");
+  static final Text ROOT_COL_SERVER = new Text(ROOT_COLUMN_FAMILY + "server");
+  static final Text ROOT_COL_STARTCODE = new Text(ROOT_COLUMN_FAMILY + "serverstartcode");
 
   static final Text META_TABLE_NAME = new Text("--META--");
   static final Text META_COLUMN_FAMILY = new Text(ROOT_COLUMN_FAMILY);
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java	(revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java	(working copy)
@@ -1,90 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -/******************************************************************************* - * HLocking is a set of lock primitives that are pretty helpful in a few places - * around the HBase code. For each independent entity that needs locking, create - * a new HLocking instance. - ******************************************************************************/ -public class HLocking { - Integer readerLock = new Integer(0); - Integer writerLock = new Integer(0); - int numReaders = 0; - int numWriters = 0; - - public HLocking() { - } - - /** Caller needs the nonexclusive read-lock */ - public void obtainReadLock() { - synchronized(readerLock) { - synchronized(writerLock) { - while(numWriters > 0) { - try { - writerLock.wait(); - } catch (InterruptedException ie) { - } - } - numReaders++; - readerLock.notifyAll(); - } - } - } - - /** Caller is finished with the nonexclusive read-lock */ - public void releaseReadLock() { - synchronized(readerLock) { - synchronized(writerLock) { - numReaders--; - readerLock.notifyAll(); - } - } - } - - /** Caller needs the exclusive write-lock */ - public void obtainWriteLock() { - synchronized(readerLock) { - synchronized(writerLock) { - while(numReaders > 0) { - try { - readerLock.wait(); - } catch (InterruptedException ie) { - } - } - while(numWriters > 0) { - try { - writerLock.wait(); - } catch (InterruptedException ie) { - } - } - numWriters++; - writerLock.notifyAll(); - } - } - } - - /** Caller is finished with the write lock */ - public void releaseWriteLock() { - synchronized(readerLock) { - synchronized(writerLock) { - numWriters--; - writerLock.notifyAll(); - } - } - } -} - Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -101,12 +101,12 @@ newlog.close(); } - if (fs.exists(srcDir)) { + if(fs.exists(srcDir)) { - if (!fs.delete(srcDir)) { + if(! fs.delete(srcDir)) { LOG.error("Cannot delete: " + srcDir); - if (!FileUtil.fullyDelete(new File(srcDir.toString()))) { + if(! FileUtil.fullyDelete(new File(srcDir.toString()))) { throw new IOException("Cannot delete: " + srcDir); } } @@ -127,7 +127,7 @@ this.conf = conf; this.logSeqNum = 0; - if (fs.exists(dir)) { + if(fs.exists(dir)) { throw new IOException("Target HLog directory already exists: " + dir); } fs.mkdirs(dir); @@ -154,7 +154,7 @@ Vector toDeleteList = new Vector(); synchronized(this) { - if (closed) { + if(closed) { throw new IOException("Cannot roll log; log is closed"); } @@ -174,10 +174,10 @@ // Close the current writer (if any), and grab a new one. - if (writer != null) { + if(writer != null) { writer.close(); - if (filenum > 0) { + if(filenum > 0) { outputfiles.put(logSeqNum-1, computeFilename(filenum-1)); } } @@ -192,10 +192,10 @@ // over all the regions. 
long oldestOutstandingSeqNum = Long.MAX_VALUE; - for(Iterator it = regionToLastFlush.values().iterator(); it.hasNext();) { + for(Iterator it = regionToLastFlush.values().iterator(); it.hasNext(); ) { long curSeqNum = it.next().longValue(); - if (curSeqNum < oldestOutstandingSeqNum) { + if(curSeqNum < oldestOutstandingSeqNum) { oldestOutstandingSeqNum = curSeqNum; } } @@ -205,10 +205,10 @@ LOG.debug("removing old log files"); - for(Iterator it = outputfiles.keySet().iterator(); it.hasNext();) { + for(Iterator it = outputfiles.keySet().iterator(); it.hasNext(); ) { long maxSeqNum = it.next().longValue(); - if (maxSeqNum < oldestOutstandingSeqNum) { + if(maxSeqNum < oldestOutstandingSeqNum) { Path p = outputfiles.get(maxSeqNum); it.remove(); toDeleteList.add(p); @@ -221,7 +221,7 @@ // Actually delete them, if any! - for(Iterator it = toDeleteList.iterator(); it.hasNext();) { + for(Iterator it = toDeleteList.iterator(); it.hasNext(); ) { Path p = it.next(); fs.delete(p); } @@ -262,7 +262,7 @@ * We need to seize a lock on the writer so that writes are atomic. */ public synchronized void append(Text regionName, Text tableName, Text row, TreeMap columns, long timestamp) throws IOException { - if (closed) { + if(closed) { throw new IOException("Cannot append; log is closed"); } @@ -273,7 +273,7 @@ // that don't have any flush yet, the relevant operation is the // first one that's been added. - if (regionToLastFlush.get(regionName) == null) { + if(regionToLastFlush.get(regionName) == null) { regionToLastFlush.put(regionName, seqNum[0]); } @@ -278,7 +278,7 @@ } int counter = 0; - for(Iterator it = columns.keySet().iterator(); it.hasNext();) { + for(Iterator it = columns.keySet().iterator(); it.hasNext(); ) { Text column = it.next(); byte[] val = columns.get(column); HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum[counter++]); @@ -333,11 +333,11 @@ /** Complete the cache flush */ public synchronized void completeCacheFlush(Text regionName, Text tableName, long logSeqId) throws IOException { - if (closed) { + if(closed) { return; } - if (!insideCacheFlush) { + if(! insideCacheFlush) { throw new IOException("Impossible situation: inside completeCacheFlush(), but 'insideCacheFlush' flag is false"); } @@ -342,7 +342,7 @@ } writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), - new HLogEdit(HLog.METACOLUMN, HStoreKey.COMPLETE_CACHEFLUSH, System.currentTimeMillis())); + new HLogEdit(HLog.METACOLUMN, HStoreKey.COMPLETE_CACHEFLUSH, System.currentTimeMillis())); numEntries++; // Remember the most-recent flush for each region. 
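
The rollWriter() hunks in this HLog diff implement a simple pruning rule: a closed log file may be deleted once every region's last cache flush is newer than any edit in that file. A compact sketch of that rule, assuming the maps the diff shows (regionToLastFlush, mapping each region name to the sequence number of its last flush, and outputfiles, mapping the highest sequence number in each closed log file to its Path); the standalone method wrapper and its name pruneOldLogs are illustrative only, and the types come from java.util, org.apache.hadoop.fs, and org.apache.hadoop.io:

  // Delete closed log files whose newest edit is older than the oldest
  // edit any region still needs (i.e. older than every region's last flush).
  private void pruneOldLogs(SortedMap<Long, Path> outputfiles,
      Map<Text, Long> regionToLastFlush, FileSystem fs) throws IOException {
    long oldestOutstandingSeqNum = Long.MAX_VALUE;
    for(Long curSeqNum: regionToLastFlush.values()) {
      if(curSeqNum < oldestOutstandingSeqNum) {
        oldestOutstandingSeqNum = curSeqNum;
      }
    }
    for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext(); ) {
      long maxSeqNum = it.next();
      if(maxSeqNum < oldestOutstandingSeqNum) {
        fs.delete(outputfiles.get(maxSeqNum));  // every edit in it is flushed
        it.remove();
      }
    }
  }

Note one difference: the diff itself collects the doomed paths into toDeleteList and performs the fs.delete() calls outside the synchronized block; the sketch inlines the deletes for brevity.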
@@ -353,4 +353,4 @@ insideCacheFlush = false; notifyAll(); } -} +} \ No newline at end of file Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java (working copy) @@ -67,5 +67,4 @@ this.val.readFields(in); this.timestamp = in.readLong(); } -} - +} \ No newline at end of file Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (working copy) @@ -80,10 +80,10 @@ HLogKey other = (HLogKey) o; int result = this.regionName.compareTo(other.regionName); - if (result == 0) { + if(result == 0) { result = this.row.compareTo(other.row); - if (result == 0) { + if(result == 0) { if (this.logSeqNum < other.logSeqNum) { result = -1; @@ -113,5 +113,4 @@ this.row.readFields(in); this.logSeqNum = in.readLong(); } -} - +} \ No newline at end of file Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -15,6 +15,9 @@ */ package org.apache.hadoop.hbase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.fs.*; @@ -28,9 +31,21 @@ * HMaster is the "master server" for a HBase. * There is only one HMaster for a single HBase deployment. 
 ******************************************************************************/
-public class HMaster extends HGlobals
-    implements HConstants, HMasterInterface, HMasterRegionInterface {
+public class HMaster implements HConstants, HMasterInterface, HMasterRegionInterface {
+
+  public long getProtocolVersion(String protocol,
+      long clientVersion) throws IOException {
+    if (protocol.equals(HMasterInterface.class.getName())) {
+      return HMasterInterface.versionID;
+    } else if (protocol.equals(HMasterRegionInterface.class.getName())){
+      return HMasterRegionInterface.versionID;
+    } else {
+      throw new IOException("Unknown protocol to master: " + protocol);
+    }
+  }
+
+  private final Log LOG = LogFactory.getLog(this.getClass().getName());
+
   private boolean closed;
   private Path dir;
   private Configuration conf;
@@ -44,6 +59,7 @@
   private Leases serverLeases;
   private Server server;
+  private HServerAddress address;
   private HClient client;
@@ -99,92 +115,114 @@
    *
    */
   private class RootScanner implements Runnable {
+    private final Text cols[] = {
+        ROOT_COLUMN_FAMILY
+    };
+    private final Text firstRow = new Text();
+    private HRegionInterface rootServer;
+
     public RootScanner() {
+      rootServer = null;
     }
     
     public void run() {
-      Text cols[] = {
-          ROOT_COLUMN_FAMILY
-      };
-      Text firstRow = new Text();
-      
       while((!closed)) {
-        int metaRegions = 0;
-        while(rootRegionLocation == null) {
-          try {
-            rootRegionLocation.wait();
-            
-          } catch(InterruptedException e) {
-          }
-        }
-        
-        HRegionInterface server = null;
-        HScannerInterface scanner = null;
+        rootScanned = false;
+        waitForRootRegion();
+
+        rootServer = null;
+        long scannerId = -1L;
        
        try {
-          server = client.getHRegionConnection(rootRegionLocation);
-          scanner = server.openScanner(rootRegionInfo.regionName, cols, firstRow);
+          rootServer = client.getHRegionConnection(rootRegionLocation);
+          scannerId = rootServer.openScanner(HGlobals.rootRegionInfo.regionName, cols, firstRow);
          
        } catch(IOException iex) {
          try {
-            close();
+            iex.printStackTrace();
+            if(scannerId != -1L) {
+              rootServer.close(scannerId);
+            }
            
          } catch(IOException iex2) {
          }
+          closed = true;
          break;
        }
        
        try {
-          HStoreKey key = new HStoreKey();
-          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          LOG.debug("starting root region scan");
+
          DataInputBuffer inbuf = new DataInputBuffer();
-          
-          while(scanner.next(key, results)) {
-            byte hRegionInfoBytes[] = results.get(ROOT_COL_REGIONINFO);
-            inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+          while(true) {
+            TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+            HStoreKey key = new HStoreKey();
+            LabelledData[] values = rootServer.next(scannerId, key);
+            if(values.length == 0) {
+              break;
+            }
+            for(int i = 0; i < values.length; i++) {
+              results.put(values[i].getLabel(), values[i].getData().get());
+            }
+            byte[] bytes = results.get(ROOT_COL_REGIONINFO);
+            if(bytes == null || bytes.length == 0) {
+              LOG.fatal("no value for " + ROOT_COL_REGIONINFO);
+              stop();
+            }
+            inbuf.reset(bytes, bytes.length);
            HRegionInfo info = new HRegionInfo();
            info.readFields(inbuf);
-            
-            byte serverBytes[] = results.get(ROOT_COL_SERVER);
-            String serverName = new String(serverBytes, UTF8_ENCODING);
-            
-            byte startCodeBytes[] = results.get(ROOT_COL_STARTCODE);
-            long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
-            
+
+            String serverName = null;
+            bytes = results.get(ROOT_COL_SERVER);
+            if(bytes != null && bytes.length != 0) {
+              serverName = new String(bytes, UTF8_ENCODING);
+            }
+
+            long startCode = -1L;
+            bytes = results.get(ROOT_COL_STARTCODE);
+            if(bytes != null && bytes.length != 0) {
+              startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+            }
+
            // Note META region to load.
-            
+
            HServerInfo storedInfo = null;
-            synchronized(serversToServerInfo) {
-              storedInfo = serversToServerInfo.get(serverName);
-              if (storedInfo == null
-                  || storedInfo.getStartCode() != startCode) {
-                
-                // The current assignment is no good; load the region.
-                
-                synchronized(unassignedRegions) {
-                  unassignedRegions.put(info.regionName, info);
-                  assignAttempts.put(info.regionName, 0L);
-                }
-              }
+            if(serverName != null) {
+              storedInfo = serversToServerInfo.get(serverName);
+            }
+            if(storedInfo == null
+                || storedInfo.getStartCode() != startCode) {
+
+              // The current assignment is no good; load the region.
+
+              unassignedRegions.put(info.regionName, info);
+              assignAttempts.put(info.regionName, 0L);
+
+              LOG.debug("region unassigned: " + info.regionName);
            }
-            results.clear();
-            metaRegions += 1;
+
+            numMetaRegions += 1;
          }
+
+        } catch(Exception iex) {
+          iex.printStackTrace();
+
+        } finally {
          try {
-            scanner.close();
+            if(scannerId != -1L) {
+              rootServer.close(scannerId);
+            }
            
          } catch(IOException iex2) {
          }
+          scannerId = -1L;
        }
-        rootScanned = true;
-        numMetaRegions = metaRegions;
-        try {
-          Thread.sleep(metaRescanInterval);
+      }
+      rootScanned = true;
+      try {
+        Thread.sleep(metaRescanInterval);

-        } catch(InterruptedException e) {
-        }
+      } catch(InterruptedException e) {
      }
    }
  }
@@ -202,8 +240,9 @@
   /** Work for the meta scanner is queued up here */
   private Vector<MetaRegion> metaRegionsToScan;
-  private TreeMap<Text, MetaRegion> knownMetaRegions;
-  private Boolean allMetaRegionsScanned;
+  private SortedMap<Text, MetaRegion> knownMetaRegions;
+
+  private boolean allMetaRegionsScanned;
 
   /**
    * MetaScanner scans a region either in the META table.
@@ -216,7 +255,7 @@
    */
   private class MetaScanner implements Runnable {
     private final Text cols[] = {
-      META_COLUMN_FAMILY
+        META_COLUMN_FAMILY
     };
     private final Text firstRow = new Text();
 
@@ -225,15 +264,21 @@
     private void scanRegion(MetaRegion region) {
       HRegionInterface server = null;
-      HScannerInterface scanner = null;
+      long scannerId = -1L;
+
+      LOG.debug("scanning meta region: " + region.regionName);
 
       try {
         server = client.getHRegionConnection(region.server);
-        scanner = server.openScanner(region.regionName, cols, firstRow);
+        scannerId = server.openScanner(region.regionName, cols, firstRow);
 
       } catch(IOException iex) {
         try {
-          close();
+          if(scannerId != -1L) {
+            server.close(scannerId);
+            scannerId = -1L;
+          }
+          stop();
 
         } catch(IOException iex2) {
         }
@@ -239,22 +284,43 @@
         }
         return;
       }
+
+      DataInputBuffer inbuf = new DataInputBuffer();
       try {
-        HStoreKey key = new HStoreKey();
-        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-        DataInputBuffer inbuf = new DataInputBuffer();
+        while(true) {
+          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          HStoreKey key = new HStoreKey();
 
-        while(scanner.next(key, results)) {
-          byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
-          inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+          LabelledData[] values = server.next(scannerId, key);
+
+          if(values.length == 0) {
+            break;
+          }
+
+          for(int i = 0; i < values.length; i++) {
+            results.put(values[i].getLabel(), values[i].getData().get());
+          }
+
+          byte bytes[] = results.get(META_COL_REGIONINFO);
+          if(bytes == null || bytes.length == 0) {
+            LOG.fatal("no value for " + META_COL_REGIONINFO);
+            stop();
+          }
+          inbuf.reset(bytes, bytes.length);
           HRegionInfo info = new HRegionInfo();
           info.readFields(inbuf);
-
-          byte serverBytes[] = results.get(META_COL_SERVER);
-          String serverName = new String(serverBytes, UTF8_ENCODING);
 
-          byte startCodeBytes[] = results.get(META_COL_STARTCODE);
-          long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+          String serverName = null;
+          bytes = results.get(META_COL_SERVER);
+          if(bytes != null && bytes.length != 0) {
+            serverName = new String(bytes, UTF8_ENCODING);
+          }
+
+          long startCode = -1L;
+          bytes = results.get(META_COL_STARTCODE);
+          if(bytes != null && bytes.length != 0) {
+            startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+          }
 
           // Note HRegion to load.
@@ -259,28 +325,33 @@
           // Note HRegion to load.
 
           HServerInfo storedInfo = null;
-          synchronized(serversToServerInfo) {
-            storedInfo = serversToServerInfo.get(serverName);
-            if (storedInfo == null
-                || storedInfo.getStartCode() != startCode) {
-              
-              // The current assignment is no good; load the region.
+          if(serverName != null) {
+            storedInfo = serversToServerInfo.get(serverName);
+          }
+          if(storedInfo == null
+              || storedInfo.getStartCode() != startCode) {
+
+            // The current assignment is no good; load the region.
 
-              synchronized(unassignedRegions) {
-                unassignedRegions.put(info.regionName, info);
-                assignAttempts.put(info.regionName, 0L);
-              }
-            }
+            unassignedRegions.put(info.regionName, info);
+            assignAttempts.put(info.regionName, 0L);
+            LOG.debug("region unassigned: " + info.regionName);
           }
-          results.clear();
         }
+
+      } catch(Exception iex) {
+        iex.printStackTrace();
+
+      } finally {
         try {
-          scanner.close();
+          if(scannerId != -1L) {
+            server.close(scannerId);
+          }
 
         } catch(IOException iex2) {
+          iex2.printStackTrace();
         }
+        scannerId = -1L;
       }
     }
 
@@ -290,15 +361,15 @@
       while(region == null) {
         synchronized(metaRegionsToScan) {
-          if (metaRegionsToScan.size() != 0) {
+          if(metaRegionsToScan.size() != 0) {
             region = metaRegionsToScan.remove(0);
           }
-        }
-        if (region == null) {
-          try {
-            metaRegionsToScan.wait();
-            
-          } catch(InterruptedException e) {
+          if(region == null) {
+            try {
+              metaRegionsToScan.wait();
+              
+            } catch(InterruptedException e) {
+            }
           }
         }
       }
@@ -304,13 +375,14 @@
       }
       scanRegion(region);
-      
-      synchronized(knownMetaRegions) {
-        knownMetaRegions.put(region.startKey, region);
-        if (rootScanned && knownMetaRegions.size() == numMetaRegions) {
-          allMetaRegionsScanned = true;
-          allMetaRegionsScanned.notifyAll();
-        }
+      if(closed) {
+        break;
+      }
+      knownMetaRegions.put(region.startKey, region);
+      if(rootScanned && knownMetaRegions.size() == numMetaRegions) {
+        LOG.debug("all meta regions scanned");
+        allMetaRegionsScanned = true;
+        metaRegionsScanned();
       }
 
       do {
@@ -319,8 +391,8 @@
         } catch(InterruptedException ex) {
         }
-        if (!allMetaRegionsScanned) {
-          break;  // A region must have split
+        if(!allMetaRegionsScanned) {
+          break;  // A meta region must have split
         }
 
         // Rescan the known meta regions every so often
@@ -328,7 +400,7 @@
         Vector<MetaRegion> v = new Vector<MetaRegion>();
         v.addAll(knownMetaRegions.values());
 
-        for(Iterator<MetaRegion> i = v.iterator(); i.hasNext();) {
+        for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
          scanRegion(i.next());
        }
      } while(true);
@@ -334,6 +406,20 @@
       } while(true);
     }
+
+    private synchronized void metaRegionsScanned() {
+      notifyAll();
+    }
+
+    public synchronized void waitForMetaScan() {
+      while(!allMetaRegionsScanned) {
+        try {
+          wait();
+
+        } catch(InterruptedException e) {
+        }
+      }
+    }
   }
 
   private MetaScanner metaScanner;
@@ -350,7 +436,7 @@
   // We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
   // set of all known valid regions.
 
-  private TreeMap<Text, HRegionInfo> unassignedRegions;
+  private SortedMap<Text, HRegionInfo> unassignedRegions;
 
   // The 'assignAttempts' table maps from regions to a timestamp that indicates
   // the last time we *tried* to assign the region to a RegionServer.
If the @@ -356,7 +442,7 @@ // the last time we *tried* to assign the region to a RegionServer. If the // timestamp is out of date, then we can try to reassign it. - private TreeMap assignAttempts; + private SortedMap assignAttempts; // 'killList' indicates regions that we hope to close and then never reopen // (because we're merging them, say). @@ -361,11 +447,11 @@ // 'killList' indicates regions that we hope to close and then never reopen // (because we're merging them, say). - private TreeMap> killList; + private SortedMap> killList; // 'serversToServerInfo' maps from the String to its HServerInfo - private TreeMap serversToServerInfo; + private SortedMap serversToServerInfo; /** Build the HMaster out of a raw configuration item. */ public HMaster(Configuration conf) throws IOException { @@ -370,8 +456,8 @@ /** Build the HMaster out of a raw configuration item. */ public HMaster(Configuration conf) throws IOException { this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)), - new HServerAddress(conf.get(MASTER_DEFAULT_NAME)), - conf); + new HServerAddress(conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)), + conf); } /** @@ -391,20 +477,28 @@ // Make sure the root directory exists! - if (!fs.exists(dir)) { + if(! fs.exists(dir)) { fs.mkdirs(dir); } - Path rootRegionDir = HStoreFile.getHRegionDir(dir, rootRegionInfo.regionName); - if (!fs.exists(rootRegionDir)) { + Path rootRegionDir = HStoreFile.getHRegionDir(dir, HGlobals.rootRegionInfo.regionName); + if(! fs.exists(rootRegionDir)) { + LOG.debug("bootstrap: creating root and meta regions"); // Bootstrap! Need to create the root region and the first meta region. - //TODO is the root region self referential? - HRegion root = createNewHRegion(rootTableDesc, 0L); - HRegion meta = createNewHRegion(metaTableDesc, 1L); + try { + HRegion root = createNewHRegion(HGlobals.rootTableDesc, 0L); + HRegion meta = createNewHRegion(HGlobals.metaTableDesc, 1L); - addTableToMeta(root, meta); + addTableToMeta(root, meta); + + root.close(); + meta.close(); + + } catch(IOException e) { + e.printStackTrace(); + } } this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); @@ -409,10 +503,17 @@ this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); this.msgQueue = new Vector(); - this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 15 * 1000), - conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); + this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 30 * 1000), + conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); + this.server = RPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf); + address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf); + + // The rpc-server port can be ephemeral... 
ensure we have the correct info + + this.address = new HServerAddress(server.getListenerAddress()); + conf.set(MASTER_ADDRESS, address.toString()); + this.client = new HClient(conf); this.metaRescanInterval @@ -429,8 +530,12 @@ this.numMetaRegions = 0; this.metaRegionsToScan = new Vector(); - this.knownMetaRegions = new TreeMap(); - this.allMetaRegionsScanned = new Boolean(false); + + this.knownMetaRegions = + Collections.synchronizedSortedMap(new TreeMap()); + + this.allMetaRegionsScanned = false; + this.metaScanner = new MetaScanner(); this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner"); @@ -439,14 +544,22 @@ this.clientProcessor = new ClientProcessor(); this.clientProcessorThread = new Thread(clientProcessor, "HMaster.clientProcessor"); - this.unassignedRegions = new TreeMap(); - this.unassignedRegions.put(rootRegionInfo.regionName, rootRegionInfo); + this.unassignedRegions = + Collections.synchronizedSortedMap(new TreeMap()); + + this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo); - this.assignAttempts = new TreeMap(); - this.assignAttempts.put(rootRegionInfo.regionName, 0L); + this.assignAttempts = + Collections.synchronizedSortedMap(new TreeMap()); + + this.assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L); - this.killList = new TreeMap>(); - this.serversToServerInfo = new TreeMap(); + this.killList = + Collections.synchronizedSortedMap( + new TreeMap>()); + + this.serversToServerInfo = + Collections.synchronizedSortedMap(new TreeMap()); // We're almost open for business @@ -470,10 +583,11 @@ this.closed = true; throw e; } + LOG.info("HMaster started"); } /** Turn off the HMaster. Turn off all the threads, close files, etc. */ - public void close() throws IOException { + public void stop() throws IOException { closed = true; try { @@ -481,9 +595,18 @@ } catch(IOException iex) { } - + server.stop(); + LOG.info("shutting down HMaster"); + } + + /** returns the HMaster server address */ + public HServerAddress getMasterAddress() { + return address; + } + + /** Call this to wait for everything to finish */ + public void join() { try { - server.stop(); server.join(); } catch(InterruptedException iex) { @@ -503,6 +626,7 @@ } catch(Exception iex) { } + LOG.info("HMaster stopped"); } ////////////////////////////////////////////////////////////////////////////// @@ -514,6 +638,8 @@ String server = serverInfo.getServerAddress().toString(); HServerInfo storedInfo = null; + LOG.debug("received start message from: " + server); + // If we get the startup message but there's an old server by that // name, then we can timeout the old one right away and register // the new one. @@ -518,27 +644,23 @@ // name, then we can timeout the old one right away and register // the new one. 
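
The constructor hunk above re-reads the master's address after the RPC server starts ("The rpc-server port can be ephemeral... ensure we have the correct info"). The reason is visible with nothing but a plain socket; this self-contained example is illustrative only and uses java.net.ServerSocket rather than Hadoop's RPC server:

  import java.io.IOException;
  import java.net.ServerSocket;

  public class EphemeralPortExample {
    public static void main(String[] args) throws IOException {
      // Asking for port 0 tells the OS to pick any free port, so the
      // address configured up front is not necessarily the one in use.
      ServerSocket s = new ServerSocket(0);
      // Read back the port actually bound, the way HMaster does with
      // server.getListenerAddress() before conf.set(MASTER_ADDRESS, ...).
      System.out.println("bound to localhost:" + s.getLocalPort());
      s.close();
    }
  }

Publishing the re-read address matters here because the in-process HClient the master constructs immediately afterwards reads MASTER_ADDRESS from that same Configuration object.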
- synchronized(serversToServerInfo) { - storedInfo = serversToServerInfo.get(server); - - if (storedInfo != null) { - serversToServerInfo.remove(server); + storedInfo = serversToServerInfo.get(server); - synchronized(msgQueue) { - msgQueue.add(new PendingServerShutdown(storedInfo)); - msgQueue.notifyAll(); - } + if(storedInfo != null) { + serversToServerInfo.remove(server); + synchronized(msgQueue) { + msgQueue.add(new PendingServerShutdown(storedInfo)); + msgQueue.notifyAll(); } + } - // Either way, record the new server + // Either way, record the new server - serversToServerInfo.put(server, serverInfo); + serversToServerInfo.put(server, serverInfo); - - Text serverLabel = new Text(server); - serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server)); - } + Text serverLabel = new Text(server); + serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server)); } /** HRegionServers call this method repeatedly. */ @@ -545,48 +667,50 @@ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { String server = serverInfo.getServerAddress().toString(); - synchronized(serversToServerInfo) { - HServerInfo storedInfo = serversToServerInfo.get(server); - - if (storedInfo == null) { - - // The HBaseMaster may have been restarted. - // Tell the RegionServer to start over and call regionServerStartup() - - HMsg returnMsgs[] = new HMsg[1]; - returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP); - return returnMsgs; - - } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) { - - // This state is reachable if: - // - // 1) RegionServer A started - // 2) RegionServer B started on the same machine, then - // clobbered A in regionServerStartup. - // 3) RegionServer A returns, expecting to work as usual. - // - // The answer is to ask A to shut down for good. - - HMsg returnMsgs[] = new HMsg[1]; - returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING); - return returnMsgs; - - } else { - - // All's well. Renew the server's lease. - // This will always succeed; otherwise, the fetch of serversToServerInfo - // would have failed above. - - Text serverLabel = new Text(server); - serverLeases.renewLease(serverLabel, serverLabel); + HServerInfo storedInfo = serversToServerInfo.get(server); + + if(storedInfo == null) { + + LOG.debug("received server report from unknown server: " + server); + + // The HBaseMaster may have been restarted. + // Tell the RegionServer to start over and call regionServerStartup() + + HMsg returnMsgs[] = new HMsg[1]; + returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP); + return returnMsgs; + + } else if(storedInfo.getStartCode() != serverInfo.getStartCode()) { + + // This state is reachable if: + // + // 1) RegionServer A started + // 2) RegionServer B started on the same machine, then + // clobbered A in regionServerStartup. + // 3) RegionServer A returns, expecting to work as usual. + // + // The answer is to ask A to shut down for good. + + LOG.debug("region server race condition detected: " + server); + + HMsg returnMsgs[] = new HMsg[1]; + returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING); + return returnMsgs; + + } else { + + // All's well. Renew the server's lease. + // This will always succeed; otherwise, the fetch of serversToServerInfo + // would have failed above. 
+ + Text serverLabel = new Text(server); + serverLeases.renewLease(serverLabel, serverLabel); - // Refresh the info object - serversToServerInfo.put(server, serverInfo); + // Refresh the info object + serversToServerInfo.put(server, serverInfo); - // Next, process messages for this server - return processMsgs(serverInfo, msgs); - } + // Next, process messages for this server + return processMsgs(serverInfo, msgs); } } @@ -597,9 +721,9 @@ // Process the kill list TreeMap regionsToKill = killList.get(info.toString()); - if (regionsToKill != null) { + if(regionsToKill != null) { for(Iterator i = regionsToKill.values().iterator(); - i.hasNext();) { + i.hasNext(); ) { returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_AND_DELETE, i.next())); } @@ -607,81 +731,90 @@ // Get reports on what the RegionServer did. - synchronized(unassignedRegions) { - for(int i = 0; i < incomingMsgs.length; i++) { - HRegionInfo region = incomingMsgs[i].getRegionInfo(); - - switch(incomingMsgs[i].getMsg()) { + for(int i = 0; i < incomingMsgs.length; i++) { + HRegionInfo region = incomingMsgs[i].getRegionInfo(); - case HMsg.MSG_REPORT_OPEN: - HRegionInfo regionInfo = unassignedRegions.get(region.regionName); + switch(incomingMsgs[i].getMsg()) { - if (regionInfo == null) { + case HMsg.MSG_REPORT_OPEN: + HRegionInfo regionInfo = unassignedRegions.get(region.regionName); - // This Region should not have been opened. - // Ask the server to shut it down, but don't report it as closed. - // Otherwise the HMaster will think the Region was closed on purpose, - // and then try to reopen it elsewhere; that's not what we want. + if(regionInfo == null) { - returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); + LOG.debug("region server " + info.getServerAddress().toString() + + "should not have opened region " + region.regionName); - } else { + // This Region should not have been opened. + // Ask the server to shut it down, but don't report it as closed. + // Otherwise the HMaster will think the Region was closed on purpose, + // and then try to reopen it elsewhere; that's not what we want. - // Remove from unassigned list so we don't assign it to someone else + returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); - unassignedRegions.remove(region.regionName); - assignAttempts.remove(region.regionName); + } else { - if (region.regionName.compareTo(rootRegionInfo.regionName) == 0) { + LOG.debug(info.getServerAddress().toString() + " serving " + + region.regionName); - // Store the Root Region location (in memory) + // Remove from unassigned list so we don't assign it to someone else - rootRegionLocation = new HServerAddress(info.getServerAddress()); - - // Wake up the root scanner - - rootRegionLocation.notifyAll(); - break; - - } else if (region.regionName.find(META_TABLE_NAME.toString()) == 0) { + unassignedRegions.remove(region.regionName); + assignAttempts.remove(region.regionName); + + if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { + + // Store the Root Region location (in memory) + + rootRegionLocation = new HServerAddress(info.getServerAddress()); + + // Wake up threads waiting for the root server + + rootRegionIsAvailable(); + break; + + } else if(region.regionName.find(META_TABLE_NAME.toString()) == 0) { + + // It's a meta region. Put it on the queue to be scanned. + + MetaRegion r = new MetaRegion(); + r.server = info.getServerAddress(); + r.regionName = region.regionName; + r.startKey = region.startKey; - // It's a meta region. 
Put it on the queue to be scanned. - - MetaRegion r = new MetaRegion(); - r.server = info.getServerAddress(); - r.regionName = region.regionName; - r.startKey = region.startKey; - - synchronized(metaRegionsToScan) { - metaRegionsToScan.add(r); - metaRegionsToScan.notifyAll(); - } + synchronized(metaRegionsToScan) { + metaRegionsToScan.add(r); + metaRegionsToScan.notifyAll(); } - - // Queue up an update to note the region location. + } + + // Queue up an update to note the region location. - synchronized(msgQueue) { - msgQueue.add(new PendingOpenReport(info, region.regionName)); - msgQueue.notifyAll(); - } + synchronized(msgQueue) { + msgQueue.add(new PendingOpenReport(info, region.regionName)); + msgQueue.notifyAll(); } - break; + } + break; + + case HMsg.MSG_REPORT_CLOSE: + LOG.debug(info.getServerAddress().toString() + " no longer serving " + + region.regionName); - case HMsg.MSG_REPORT_CLOSE: - if (region.regionName.compareTo(rootRegionInfo.regionName) == 0) { // Root region - rootRegionLocation = null; - unassignedRegions.put(region.regionName, region); - assignAttempts.put(region.regionName, 0L); + if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region + rootRegionLocation = null; + unassignedRegions.put(region.regionName, region); + assignAttempts.put(region.regionName, 0L); - } else { - boolean reassignRegion = true; - - if (regionsToKill.containsKey(region.regionName)) { + } else { + boolean reassignRegion = true; + + synchronized(regionsToKill) { + if(regionsToKill.containsKey(region.regionName)) { regionsToKill.remove(region.regionName); - - if (regionsToKill.size() > 0) { + + if(regionsToKill.size() > 0) { killList.put(info.toString(), regionsToKill); - + } else { killList.remove(info.toString()); } @@ -687,66 +820,69 @@ } reassignRegion = false; } - - synchronized(msgQueue) { - msgQueue.add(new PendingCloseReport(region, reassignRegion)); - msgQueue.notifyAll(); - } + } + + synchronized(msgQueue) { + msgQueue.add(new PendingCloseReport(region, reassignRegion)); + msgQueue.notifyAll(); + } + + // NOTE: we cannot put the region into unassignedRegions as that + // could create a race with the pending close if it gets + // reassigned before the close is processed. + + } + break; - // NOTE: we cannot put the region into unassignedRegions as that - // could create a race with the pending close if it gets - // reassigned before the close is processed. + case HMsg.MSG_NEW_REGION: + LOG.debug("new region " + region.regionName); - } - break; + if(region.regionName.find(META_TABLE_NAME.toString()) == 0) { + // A meta region has split. - case HMsg.MSG_NEW_REGION: - if (region.regionName.find(META_TABLE_NAME.toString()) == 0) { - // A meta region has split. - - allMetaRegionsScanned = false; - } - synchronized(unassignedRegions) { - unassignedRegions.put(region.regionName, region); - assignAttempts.put(region.regionName, 0L); - } - break; - - default: - throw new IOException("Impossible state during msg processing. Instruction: " - + incomingMsgs[i].getMsg()); + allMetaRegionsScanned = false; } + unassignedRegions.put(region.regionName, region); + assignAttempts.put(region.regionName, 0L); + break; + + default: + throw new IOException("Impossible state during msg processing. Instruction: " + + incomingMsgs[i].getMsg()); } + } + + // Figure out what the RegionServer ought to do, and write back. - // Figure out what the RegionServer ought to do, and write back. 
+ if(unassignedRegions.size() > 0) { - if (unassignedRegions.size() > 0) { + // Open new regions as necessary - // Open new regions as necessary + int targetForServer = (int) Math.ceil(unassignedRegions.size() + / (1.0 * serversToServerInfo.size())); - int targetForServer = (int) Math.ceil(unassignedRegions.size() - / (1.0 * serversToServerInfo.size())); + int counter = 0; + long now = System.currentTimeMillis(); - int counter = 0; - long now = System.currentTimeMillis(); + for(Iterator it = unassignedRegions.keySet().iterator(); + it.hasNext(); ) { - for(Iterator it = unassignedRegions.keySet().iterator(); - it.hasNext();) { + Text curRegionName = it.next(); + HRegionInfo regionInfo = unassignedRegions.get(curRegionName); + long assignedTime = assignAttempts.get(curRegionName); - Text curRegionName = it.next(); - HRegionInfo regionInfo = unassignedRegions.get(curRegionName); - long assignedTime = assignAttempts.get(curRegionName); + if(now - assignedTime > maxRegionOpenTime) { + LOG.debug("assigning region " + regionInfo.regionName + " to server " + + info.getServerAddress().toString()); - if (now - assignedTime > maxRegionOpenTime) { - returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); + returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); - assignAttempts.put(curRegionName, now); - counter++; - } + assignAttempts.put(curRegionName, now); + counter++; + } - if (counter >= targetForServer) { - break; - } + if(counter >= targetForServer) { + break; } } } @@ -752,6 +888,20 @@ } return (HMsg[]) returnMsgs.toArray(new HMsg[returnMsgs.size()]); } + + private synchronized void rootRegionIsAvailable() { + notifyAll(); + } + + private synchronized void waitForRootRegion() { + while(rootRegionLocation == null) { + try { + wait(); + + } catch(InterruptedException e) { + } + } + } ////////////////////////////////////////////////////////////////////////////// // Some internal classes to manage msg-passing and client operations @@ -773,8 +923,7 @@ } catch(InterruptedException iex) { } } - op = msgQueue.elementAt(msgQueue.size()-1); - msgQueue.removeElementAt(msgQueue.size()-1); + op = msgQueue.remove(msgQueue.size()-1); } try { op.process(); @@ -780,9 +929,7 @@ op.process(); } catch(Exception ex) { - synchronized(msgQueue) { - msgQueue.insertElementAt(op, 0); - } + msgQueue.insertElementAt(op, 0); } } } @@ -788,9 +935,9 @@ } } - abstract class PendingOperation { + private abstract class PendingOperation { protected final Text[] columns = { - META_COLUMN_FAMILY + META_COLUMN_FAMILY }; protected final Text startRow = new Text(); protected long clientId; @@ -802,9 +949,9 @@ public abstract void process() throws IOException; } - class PendingServerShutdown extends PendingOperation { - String deadServer; - long oldStartCode; + private class PendingServerShutdown extends PendingOperation { + private String deadServer; + private long oldStartCode; public PendingServerShutdown(HServerInfo serverInfo) { super(); @@ -812,8 +959,8 @@ this.oldStartCode = serverInfo.getStartCode(); } - private void scanMetaRegion(HRegionInterface server, HScannerInterface scanner, - Text regionName) throws IOException { + private void scanMetaRegion(HRegionInterface server, long scannerId, + Text regionName) throws IOException { Vector toDoList = new Vector(); TreeMap regions = new TreeMap(); @@ -820,14 +967,24 @@ DataInputBuffer inbuf = new DataInputBuffer(); try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); + LabelledData[] values = null; + + while(true) { + HStoreKey key = new 
HStoreKey(); + values = server.next(scannerId, key); + if(values.length == 0) { + break; + } - while(scanner.next(key, results)) { - byte serverBytes[] = results.get(META_COL_SERVER); - String serverName = new String(serverBytes, UTF8_ENCODING); + TreeMap results = new TreeMap(); + for(int i = 0; i < values.length; i++) { + results.put(values[i].getLabel(), values[i].getData().get()); + } + + String serverName = + new String(results.get(META_COL_SERVER), UTF8_ENCODING); - if (deadServer.compareTo(serverName) != 0) { + if(deadServer.compareTo(serverName) != 0) { // This isn't the server you're looking for - move along continue; } @@ -832,10 +989,10 @@ continue; } - byte startCodeBytes[] = results.get(META_COL_STARTCODE); - long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING)); + long startCode = + Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING)); - if (oldStartCode != startCode) { + if(oldStartCode != startCode) { // Close but no cigar continue; } @@ -847,6 +1004,8 @@ HRegionInfo info = new HRegionInfo(); info.readFields(inbuf); + LOG.debug(serverName + " was serving " + info.regionName); + // Add to our to do lists toDoList.add(key); @@ -854,7 +1013,16 @@ } } finally { - scanner.close(); + if(scannerId != -1L) { + try { + server.close(scannerId); + + } catch(IOException e) { + e.printStackTrace(); + + } + } + scannerId = -1L; } // Remove server from root/meta entries @@ -869,7 +1037,7 @@ // Put all the regions we found on the unassigned region list for(Iterator> i = regions.entrySet().iterator(); - i.hasNext();) { + i.hasNext(); ) { Map.Entry e = i.next(); Text region = e.getKey(); @@ -875,10 +1043,8 @@ Text region = e.getKey(); HRegionInfo regionInfo = e.getValue(); - synchronized(unassignedRegions) { - unassignedRegions.put(region, regionInfo); - assignAttempts.put(region, 0L); - } + unassignedRegions.put(region, regionInfo); + assignAttempts.put(region, 0L); } } @@ -883,27 +1049,24 @@ } public void process() throws IOException { + LOG.debug("server shutdown: " + deadServer); + + // Scan the ROOT region + + waitForRootRegion(); // Wait until the root region is available + HRegionInterface server = client.getHRegionConnection(rootRegionLocation); + long scannerId = + server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow); + + scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName); // We can not scan every meta region if they have not already been assigned // and scanned. - - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } - // First scan the ROOT region + metaScanner.waitForMetaScan(); - HRegionInterface server = client.getHRegionConnection(rootRegionLocation); - HScannerInterface scanner = server.openScanner(rootRegionInfo.regionName, - columns, startRow); - - scanMetaRegion(server, scanner, rootRegionInfo.regionName); for(Iterator i = knownMetaRegions.values().iterator(); - i.hasNext();) { + i.hasNext(); ) { MetaRegion r = i.next(); @@ -908,8 +1071,8 @@ MetaRegion r = i.next(); server = client.getHRegionConnection(r.server); - scanner = server.openScanner(r.regionName, columns, startRow); - scanMetaRegion(server, scanner, r.regionName); + scannerId = server.openScanner(r.regionName, columns, startRow); + scanMetaRegion(server, scannerId, r.regionName); } } } @@ -915,10 +1078,10 @@ } /** PendingCloseReport is a close message that is saved in a different thread. 
*/ - class PendingCloseReport extends PendingOperation { - HRegionInfo regionInfo; - boolean reassignRegion; - boolean rootRegion; + private class PendingCloseReport extends PendingOperation { + private HRegionInfo regionInfo; + private boolean reassignRegion; + private boolean rootRegion; public PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion) { super(); @@ -929,7 +1092,7 @@ // If the region closing down is a meta region then we need to update // the ROOT table - if (this.regionInfo.regionName.find(metaTableDesc.getName().toString()) == 0) { + if(this.regionInfo.regionName.find(HGlobals.metaTableDesc.getName().toString()) == 0) { this.rootRegion = true; } else { @@ -938,17 +1101,13 @@ } public void process() throws IOException { - + // We can not access any meta region if they have not already been assigned // and scanned. + + metaScanner.waitForMetaScan(); - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } + LOG.debug("region closed: " + regionInfo.regionName); // Mark the Region as unavailable in the appropriate meta table @@ -954,8 +1113,9 @@ Text metaRegionName; HRegionInterface server; - if (rootRegion) { - metaRegionName = rootRegionInfo.regionName; + if(rootRegion) { + metaRegionName = HGlobals.rootRegionInfo.regionName; + waitForRootRegion(); // Make sure root region available server = client.getHRegionConnection(rootRegionLocation); } else { @@ -969,11 +1129,11 @@ server.delete(metaRegionName, clientId, lockid, META_COL_STARTCODE); server.commit(metaRegionName, clientId, lockid); - if (reassignRegion) { - synchronized(unassignedRegions) { - unassignedRegions.put(regionInfo.regionName, regionInfo); - assignAttempts.put(regionInfo.regionName, 0L); - } + if(reassignRegion) { + LOG.debug("reassign region: " + regionInfo.regionName); + + unassignedRegions.put(regionInfo.regionName, regionInfo); + assignAttempts.put(regionInfo.regionName, 0L); } } } @@ -979,14 +1139,14 @@ } /** PendingOpenReport is an open message that is saved in a different thread. */ - class PendingOpenReport extends PendingOperation { - boolean rootRegion; - Text regionName; - BytesWritable serverAddress; - BytesWritable startCode; + private class PendingOpenReport extends PendingOperation { + private boolean rootRegion; + private Text regionName; + private BytesWritable serverAddress; + private BytesWritable startCode; public PendingOpenReport(HServerInfo info, Text regionName) { - if (regionName.find(metaTableDesc.getName().toString()) == 0) { + if(regionName.find(HGlobals.metaTableDesc.getName().toString()) == 0) { // The region which just came on-line is a META region. // We need to look in the ROOT region for its information. @@ -1003,10 +1163,10 @@ try { this.serverAddress = new BytesWritable( - info.getServerAddress().toString().getBytes(UTF8_ENCODING)); + info.getServerAddress().toString().getBytes(UTF8_ENCODING)); this.startCode = new BytesWritable( - String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING)); + String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING)); } catch(UnsupportedEncodingException e) { } @@ -1017,14 +1177,10 @@ // We can not access any meta region if they have not already been assigned // and scanned. + + metaScanner.waitForMetaScan(); - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } + LOG.debug(regionName + " open on " + serverAddress.toString()); // Register the newly-available Region's location. 
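
Aside on the synchronization idiom running through the pending-operation classes above: the patch retires the old waits on allMetaRegionsScanned (which called wait() without holding that object's monitor and so could never work reliably) in favor of metaScanner.waitForMetaScan() plus the new waitForRootRegion()/rootRegionIsAvailable() pair, a standard guarded wait on the HMaster monitor. A minimal, self-contained sketch of that idiom follows; the class and member names (RootLocationLatch, publish, awaitRoot) are illustrative only and not part of the patch.

// Sketch of the guarded-wait idiom behind waitForRootRegion() and
// rootRegionIsAvailable(); names below are illustrative, not the patch's.
class RootLocationLatch {
  private Object rootLocation = null;      // stands in for rootRegionLocation

  // Publisher side: invoked when MSG_REPORT_OPEN arrives for the root region.
  synchronized void publish(Object location) {
    rootLocation = location;
    notifyAll();                           // wake every blocked operation
  }

  // Consumer side: invoked by pending operations before they read ROOT.
  synchronized Object awaitRoot() {
    while (rootLocation == null) {         // re-check guards against spurious wakeups
      try {
        wait();                            // releases the monitor while blocked
      } catch (InterruptedException e) {
        // swallowed, mirroring the patch's handling
      }
    }
    return rootLocation;
  }
}

The while loop matters: wait() may return spuriously, and one notifyAll() can wake several operations at once, so each must re-verify the condition before touching the root region.
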
@@ -1030,8 +1186,9 @@ Text metaRegionName; HRegionInterface server; - if (rootRegion) { - metaRegionName = rootRegionInfo.regionName; + if(rootRegion) { + metaRegionName = HGlobals.rootRegionInfo.regionName; + waitForRootRegion(); // Make sure root region available server = client.getHRegionConnection(rootRegionLocation); } else { @@ -1056,15 +1213,9 @@ // We can not access any meta region if they have not already been assigned // and scanned. + + metaScanner.waitForMetaScan(); - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } - // 1. Check to see if table already exists Text metaStartRow = knownMetaRegions.headMap(newRegion.regionName).lastKey(); @@ -1074,7 +1225,7 @@ BytesWritable bytes = server.get(metaRegionName, desc.getName(), META_COL_REGIONINFO); - if (bytes != null && bytes.getSize() != 0) { + if(bytes != null && bytes.getSize() != 0) { byte[] infoBytes = bytes.get(); DataInputBuffer inbuf = new DataInputBuffer(); inbuf.reset(infoBytes, infoBytes.length); @@ -1080,7 +1231,7 @@ inbuf.reset(infoBytes, infoBytes.length); HRegionInfo info = new HRegionInfo(); info.readFields(inbuf); - if (info.tableDesc.getName().compareTo(desc.getName()) == 0) { + if(info.tableDesc.getName().compareTo(desc.getName()) == 0) { throw new IOException("table already exists"); } } @@ -1100,15 +1251,19 @@ long clientId = rand.nextLong(); long lockid = server.startUpdate(metaRegionName, clientId, regionName); server.put(metaRegionName, clientId, lockid, META_COL_REGIONINFO, - new BytesWritable(byteValue.toByteArray())); + new BytesWritable(byteValue.toByteArray())); server.commit(metaRegionName, clientId, lockid); - // 4. Get it assigned to a server + // 4. Close the new region to flush it to disk + + r.close(); + + // 5. Get it assigned to a server + + unassignedRegions.put(regionName, info); + assignAttempts.put(regionName, 0L); - synchronized(unassignedRegions) { - unassignedRegions.put(regionName, info); - assignAttempts.put(regionName, 0L); - } + LOG.debug("created table " + desc.getName()); } /** @@ -1122,7 +1277,7 @@ * @throws IOException */ private HRegion createNewHRegion(HTableDescriptor desc, long regionId) - throws IOException { + throws IOException { HRegionInfo info = new HRegionInfo(regionId, desc, null, null); Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName); @@ -1129,7 +1284,7 @@ fs.mkdirs(regionDir); return new HRegion(dir, new HLog(fs, new Path(regionDir, "log"), conf), fs, - conf, info, null, null); + conf, info, null, null); } /** @@ -1152,17 +1307,8 @@ table.getRegionInfo().write(s); - s.writeLong(table.getRegionId()); meta.put(writeid, META_COL_REGIONINFO, bytes.toByteArray()); - bytes.reset(); - new HServerAddress().write(s); - meta.put(writeid, META_COL_SERVER, bytes.toByteArray()); - - bytes.reset(); - s.writeLong(0L); - meta.put(writeid, META_COL_STARTCODE, bytes.toByteArray()); - meta.commit(writeid); } @@ -1168,7 +1314,7 @@ public void deleteTable(Text tableName) throws IOException { Text[] columns = { - META_COLUMN_FAMILY + META_COLUMN_FAMILY }; // We can not access any meta region if they have not already been assigned @@ -1173,17 +1319,11 @@ // We can not access any meta region if they have not already been assigned // and scanned. 
+ + metaScanner.waitForMetaScan(); - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } - - for(Iterator i = knownMetaRegions.tailMap(tableName).values().iterator(); - i.hasNext();) { + for(Iterator it = knownMetaRegions.tailMap(tableName).values().iterator(); + it.hasNext(); ) { // Find all the regions that make up this table @@ -1188,25 +1328,32 @@ // Find all the regions that make up this table long clientId = rand.nextLong(); - MetaRegion m = i.next(); + MetaRegion m = it.next(); HRegionInterface server = client.getHRegionConnection(m.server); + long scannerId = -1L; try { - HScannerInterface scanner - = server.openScanner(m.regionName, columns, tableName); - - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - DataInputBuffer inbuf = new DataInputBuffer(); + scannerId = server.openScanner(m.regionName, columns, tableName); Vector rowsToDelete = new Vector(); - while(scanner.next(key, results)) { - byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO); - inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length); + DataInputBuffer inbuf = new DataInputBuffer(); + while(true) { + LabelledData[] values = null; + HStoreKey key = new HStoreKey(); + values = server.next(scannerId, key); + if(values.length == 0) { + break; + } + TreeMap results = new TreeMap(); + for(int i = 0; i < values.length; i++) { + results.put(values[i].getLabel(), values[i].getData().get()); + } + byte bytes[] = results.get(META_COL_REGIONINFO); + inbuf.reset(bytes, bytes.length); HRegionInfo info = new HRegionInfo(); info.readFields(inbuf); - if (info.tableDesc.getName().compareTo(tableName) > 0) { + if(info.tableDesc.getName().compareTo(tableName) > 0) { break; // Beyond any more entries for this table } @@ -1212,28 +1359,26 @@ // Is it being served? - byte serverBytes[] = results.get(META_COL_SERVER); - String serverName = new String(serverBytes, UTF8_ENCODING); + String serverName = + new String(results.get(META_COL_SERVER), UTF8_ENCODING); - byte startCodeBytes[] = results.get(META_COL_STARTCODE); - long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING)); + long startCode = + Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING)); - synchronized(serversToServerInfo) { - HServerInfo s = serversToServerInfo.get(serverName); - if (s != null && s.getStartCode() == startCode) { - - // It is being served. Tell the server to stop it and not report back - - TreeMap regionsToKill = killList.get(serverName); - if (regionsToKill == null) { - regionsToKill = new TreeMap(); - } - regionsToKill.put(info.regionName, info); - killList.put(serverName, regionsToKill); + HServerInfo s = serversToServerInfo.get(serverName); + if(s != null && s.getStartCode() == startCode) { + + // It is being served. 
Tell the server to stop it and not report back + + TreeMap regionsToKill = killList.get(serverName); + if(regionsToKill == null) { + regionsToKill = new TreeMap(); } + regionsToKill.put(info.regionName, info); + killList.put(serverName, regionsToKill); } } - for(Iterator row = rowsToDelete.iterator(); row.hasNext();) { + for(Iterator row = rowsToDelete.iterator(); row.hasNext(); ) { long lockid = server.startUpdate(m.regionName, clientId, row.next()); server.delete(m.regionName, clientId, lockid, columns[0]); server.commit(m.regionName, clientId, lockid); @@ -1240,8 +1385,21 @@ } } catch(IOException e) { e.printStackTrace(); + + } finally { + if(scannerId != -1L) { + try { + server.close(scannerId); + + } catch(IOException e) { + e.printStackTrace(); + + } + } + scannerId = -1L; } } + LOG.debug("deleted table: " + tableName); } public HServerAddress findRootRegion() { @@ -1252,8 +1410,8 @@ // Managing leases ////////////////////////////////////////////////////////////////////////////// - class ServerExpirer extends LeaseListener { - String server = null; + private class ServerExpirer extends LeaseListener { + private String server; public ServerExpirer(String server) { this.server = new String(server); @@ -1260,11 +1418,9 @@ } public void leaseExpired() { - HServerInfo storedInfo = null; + LOG.debug(server + " lease expired"); - synchronized(serversToServerInfo) { - storedInfo = serversToServerInfo.remove(server); - } + HServerInfo storedInfo = serversToServerInfo.remove(server); synchronized(msgQueue) { msgQueue.add(new PendingServerShutdown(storedInfo)); msgQueue.notifyAll(); @@ -1271,5 +1427,31 @@ } } } + + private static void printUsage() { + System.err.println("Usage: java org.apache.hbase.HMaster " + + "[--bind=hostname:port]"); + } + + public static void main(String [] args) throws IOException { + Configuration conf = new HBaseConfiguration(); + + // Process command-line args. TODO: Better cmd-line processing + // (but hopefully something not as painful as cli options). + for (String cmd: args) { + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(); + return; + } + + final String addressArgKey = "--bind="; + if (cmd.startsWith(addressArgKey)) { + conf.set(MASTER_ADDRESS, + cmd.substring(addressArgKey.length())); + } + } + + new HMaster(conf); + } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (working copy) @@ -15,7 +15,8 @@ */ package org.apache.hadoop.hbase; -import org.apache.hadoop.io.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.VersionedProtocol; import java.io.IOException; @@ -23,7 +24,7 @@ * Clients interact with the HMasterInterface to gain access to meta-level HBase * functionality, like finding an HRegionServer and creating/destroying tables. 
******************************************************************************/ -public interface HMasterInterface { +public interface HMasterInterface extends VersionedProtocol { public static final long versionID = 1L; // initial version ////////////////////////////////////////////////////////////////////////////// Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (working copy) @@ -15,7 +15,9 @@ */ package org.apache.hadoop.hbase; -import java.io.*; +import java.io.IOException; +import org.apache.hadoop.ipc.VersionedProtocol; + /******************************************************************************* * HRegionServers interact with the HMasterRegionInterface to report on local * goings-on and to obtain data-handling instructions from the HMaster. @@ -20,8 +22,8 @@ * HRegionServers interact with the HMasterRegionInterface to report on local * goings-on and to obtain data-handling instructions from the HMaster. *********************************************/ -public interface HMasterRegionInterface { - public static final long versionId = 1L; +public interface HMasterRegionInterface extends VersionedProtocol { + public static final long versionID = 1L; public void regionServerStartup(HServerInfo info) throws IOException; public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[]) throws IOException; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (working copy) @@ -22,6 +22,8 @@ import java.io.*; import java.util.*; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /******************************************************************************* * The HMemcache holds in-memory modifications to the HRegion. This is really a @@ -31,14 +33,14 @@ private static final Log LOG = LogFactory.getLog(HMemcache.class); TreeMap memcache - = new TreeMap(); + = new TreeMap(); Vector> history - = new Vector>(); + = new Vector>(); TreeMap snapshot = null; - HLocking locking = new HLocking(); + ReadWriteLock locker = new ReentrantReadWriteLock(); public HMemcache() { } @@ -52,13 +54,17 @@ } /** - * We want to return a snapshot of the current HMemcache with a known HLog + * Returns a snapshot of the current HMemcache with a known HLog * sequence number at the same time. + * + * We need to prevent any writing to the cache during this time, + * so we obtain a write lock for the duration of the operation. * - * Return both the frozen HMemcache TreeMap, as well as the HLog seq number. - * - * We need to prevent any writing to the cache during this time, so we obtain - * a write lock for the duration of the operation. + *
If this method returns non-null, client must call + * {@link #deleteSnapshot()} to clear 'snapshot-in-progress' + * state when finished with the returned {@link Snapshot}. + * + * @return frozen HMemcache TreeMap and HLog sequence number. */ public Snapshot snapshotMemcacheForLog(HLog log) throws IOException { Snapshot retval = new Snapshot(); @@ -63,12 +69,12 @@ public Snapshot snapshotMemcacheForLog(HLog log) throws IOException { Snapshot retval = new Snapshot(); - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { - if (snapshot != null) { + if(snapshot != null) { throw new IOException("Snapshot in progress!"); } - if (memcache.size() == 0) { + if(memcache.size() == 0) { LOG.debug("memcache empty. Skipping snapshot"); return retval; } @@ -86,7 +92,7 @@ return retval; } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -96,10 +102,10 @@ * Modifying the structure means we need to obtain a writelock. */ public void deleteSnapshot() throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { - if (snapshot == null) { + if(snapshot == null) { throw new IOException("Snapshot not present!"); } LOG.debug("deleting snapshot"); @@ -105,10 +111,10 @@ LOG.debug("deleting snapshot"); for(Iterator> it = history.iterator(); - it.hasNext();) { + it.hasNext(); ) { TreeMap cur = it.next(); - if (snapshot == cur) { + if(snapshot == cur) { it.remove(); break; } @@ -118,7 +124,7 @@ LOG.debug("snapshot deleted"); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -128,9 +134,9 @@ * Operation uses a write lock. */ public void add(Text row, TreeMap columns, long timestamp) { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { - for(Iterator it = columns.keySet().iterator(); it.hasNext();) { + for(Iterator it = columns.keySet().iterator(); it.hasNext(); ) { Text column = it.next(); byte[] val = columns.get(column); @@ -139,7 +145,7 @@ } } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -150,7 +156,7 @@ */ public byte[][] get(HStoreKey key, int numVersions) { Vector results = new Vector(); - locking.obtainReadLock(); + this.locker.readLock().lock(); try { Vector result = get(memcache, key, numVersions-results.size()); results.addAll(0, result); @@ -156,7 +162,7 @@ results.addAll(0, result); for(int i = history.size()-1; i >= 0; i--) { - if (numVersions > 0 && results.size() >= numVersions) { + if(numVersions > 0 && results.size() >= numVersions) { break; } @@ -164,7 +170,7 @@ results.addAll(results.size(), result); } - if (results.size() == 0) { + if(results.size() == 0) { return null; } else { @@ -172,7 +178,7 @@ } } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } @@ -184,7 +190,7 @@ */ public TreeMap getFull(HStoreKey key) throws IOException { TreeMap results = new TreeMap(); - locking.obtainReadLock(); + this.locker.readLock().lock(); try { internalGetFull(memcache, key, results); for(int i = history.size()-1; i >= 0; i--) { @@ -194,7 +200,7 @@ return results; } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } @@ -199,15 +205,15 @@ } void internalGetFull(TreeMap map, HStoreKey key, - TreeMap results) { + TreeMap results) { SortedMap tailMap = map.tailMap(key); - for(Iterator it = tailMap.keySet().iterator(); it.hasNext();) { + for(Iterator it = tailMap.keySet().iterator(); it.hasNext(); ) { HStoreKey itKey = it.next(); Text itCol = itKey.getColumn(); - if (results.get(itCol) == null + 
if(results.get(itCol) == null && key.matchesWithoutColumn(itKey)) { BytesWritable val = tailMap.get(itKey); results.put(itCol, val.get()); @@ -212,7 +218,7 @@ BytesWritable val = tailMap.get(itKey); results.put(itCol, val.get()); - } else if (key.getRow().compareTo(itKey.getRow()) > 0) { + } else if(key.getRow().compareTo(itKey.getRow()) > 0) { break; } } @@ -232,10 +238,10 @@ HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp()); SortedMap tailMap = map.tailMap(curKey); - for(Iterator it = tailMap.keySet().iterator(); it.hasNext();) { + for(Iterator it = tailMap.keySet().iterator(); it.hasNext(); ) { HStoreKey itKey = it.next(); - if (itKey.matchesRowCol(curKey)) { + if(itKey.matchesRowCol(curKey)) { result.add(tailMap.get(itKey).get()); curKey.setVersion(itKey.getTimestamp() - 1); } @@ -240,7 +246,7 @@ curKey.setVersion(itKey.getTimestamp() - 1); } - if (numVersions > 0 && result.size() >= numVersions) { + if(numVersions > 0 && result.size() >= numVersions) { break; } } @@ -251,7 +257,7 @@ * Return a scanner over the keys in the HMemcache */ public HScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow) - throws IOException { + throws IOException { return new HMemcacheScanner(timestamp, targetCols, firstRow); } @@ -267,11 +273,11 @@ @SuppressWarnings("unchecked") public HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow) - throws IOException { + throws IOException { super(timestamp, targetCols); - locking.obtainReadLock(); + locker.readLock().lock(); try { this.backingMaps = new TreeMap[history.size() + 1]; int i = 0; @@ -276,7 +282,7 @@ this.backingMaps = new TreeMap[history.size() + 1]; int i = 0; for(Iterator> it = history.iterator(); - it.hasNext();) { + it.hasNext(); ) { backingMaps[i++] = it.next(); } @@ -290,7 +296,7 @@ HStoreKey firstKey = new HStoreKey(firstRow); for(i = 0; i < backingMaps.length; i++) { - if (firstRow.getLength() != 0) { + if(firstRow.getLength() != 0) { keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator(); } else { @@ -298,10 +304,10 @@ } while(getNext(i)) { - if (!findFirstRow(i, firstRow)) { + if(! findFirstRow(i, firstRow)) { continue; } - if (columnMatch(i)) { + if(columnMatch(i)) { break; } } @@ -331,7 +337,7 @@ * @return - true if there is more data available */ boolean getNext(int i) { - if (!keyIterators[i].hasNext()) { + if(! keyIterators[i].hasNext()) { closeSubScanner(i); return false; } @@ -350,10 +356,10 @@ /** Shut down map iterators, and release the lock */ public void close() throws IOException { - if (!scannerClosed) { + if(! 
scannerClosed) { try { for(int i = 0; i < keys.length; i++) { - if (keyIterators[i] != null) { + if(keyIterators[i] != null) { closeSubScanner(i); } } @@ -359,7 +365,7 @@ } } finally { - locking.releaseReadLock(); + locker.readLock().unlock(); scannerClosed = true; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (working copy) @@ -67,13 +67,13 @@ // Writable ////////////////////////////////////////////////////////////////////////////// - public void write(DataOutput out) throws IOException { - out.writeByte(msg); - info.write(out); - } + public void write(DataOutput out) throws IOException { + out.writeByte(msg); + info.write(out); + } - public void readFields(DataInput in) throws IOException { - this.msg = in.readByte(); - this.info.readFields(in); - } + public void readFields(DataInput in) throws IOException { + this.msg = in.readByte(); + this.info.readFields(in); + } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -61,8 +61,8 @@ // Make sure that srcA comes first; important for key-ordering during // write of the merged file. - if (srcA.getStartKey() == null) { - if (srcB.getStartKey() == null) { + if(srcA.getStartKey() == null) { + if(srcB.getStartKey() == null) { throw new IOException("Cannot merge two regions with null start key"); } // A's start key is null but B's isn't. Assume A comes before B @@ -67,8 +67,8 @@ } // A's start key is null but B's isn't. Assume A comes before B - } else if ((srcB.getStartKey() == null) // A is not null but B is - || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B + } else if((srcB.getStartKey() == null) // A is not null but B is + || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B HRegion tmp = srcA; srcA = srcB; @@ -75,7 +75,7 @@ srcB = tmp; } - if (!srcA.getEndKey().equals(srcB.getStartKey())) { + if (! srcA.getEndKey().equals(srcB.getStartKey())) { throw new IOException("Cannot merge non-adjacent regions"); } @@ -89,7 +89,7 @@ Text endKey = srcB.getEndKey(); Path merges = new Path(srcA.getRegionDir(), MERGEDIR); - if (!fs.exists(merges)) { + if(! fs.exists(merges)) { fs.mkdirs(merges); } @@ -98,7 +98,7 @@ Path newRegionDir = HStoreFile.getHRegionDir(merges, newRegionInfo.regionName); - if (fs.exists(newRegionDir)) { + if(fs.exists(newRegionDir)) { throw new IOException("Cannot merge; target file collision at " + newRegionDir); } @@ -103,9 +103,9 @@ } LOG.info("starting merge of regions: " + srcA.getRegionName() + " and " - + srcB.getRegionName() + " new region start key is '" - + (startKey == null ? "" : startKey) + "', end key is '" - + (endKey == null ? "" : endKey) + "'"); + + srcB.getRegionName() + " new region start key is '" + + (startKey == null ? "" : startKey) + "', end key is '" + + (endKey == null ? "" : endKey) + "'"); // Flush each of the sources, and merge their files into a single // target for each column family. 
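
Stepping back to the HMemcache hunks above: the patch swaps the homegrown HLocking for java.util.concurrent's ReentrantReadWriteLock, so get, getFull and scanners share the read lock while snapshotMemcacheForLog, deleteSnapshot and add take the exclusive write lock. A compressed sketch of that discipline, assuming a placeholder map in place of the real memcache structures (MemcacheSketch and its members are stand-ins, not the patch's actual fields):

import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Placeholder cache showing the lock discipline HMemcache adopts.
class MemcacheSketch {
  private final ReadWriteLock locker = new ReentrantReadWriteLock();
  private final TreeMap<String, byte[]> cache = new TreeMap<String, byte[]>();

  byte[] get(String key) {
    locker.readLock().lock();       // many readers may hold this concurrently
    try {
      return cache.get(key);
    } finally {
      locker.readLock().unlock();   // always release in finally
    }
  }

  void add(String key, byte[] val) {
    locker.writeLock().lock();      // exclusive: blocks readers and writers
    try {
      cache.put(key, val);
    } finally {
      locker.writeLock().unlock();
    }
  }
}

One wrinkle in the real class: HMemcacheScanner acquires the read lock in its constructor and only releases it in close(), so every scanner must be closed or writers and cache flushes will stall behind it.
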
@@ -114,10 +114,10 @@ TreeSet alreadyMerged = new TreeSet(); TreeMap> filesToMerge = new TreeMap>(); - for(Iterator it = srcA.flushcache(true).iterator(); it.hasNext();) { + for(Iterator it = srcA.flushcache(true).iterator(); it.hasNext(); ) { HStoreFile src = it.next(); Vector v = filesToMerge.get(src.getColFamily()); - if (v == null) { + if(v == null) { v = new Vector(); filesToMerge.put(src.getColFamily(), v); } @@ -126,10 +126,10 @@ LOG.debug("flushing and getting file names for region " + srcB.getRegionName()); - for(Iterator it = srcB.flushcache(true).iterator(); it.hasNext();) { + for(Iterator it = srcB.flushcache(true).iterator(); it.hasNext(); ) { HStoreFile src = it.next(); Vector v = filesToMerge.get(src.getColFamily()); - if (v == null) { + if(v == null) { v = new Vector(); filesToMerge.put(src.getColFamily(), v); } @@ -138,11 +138,11 @@ LOG.debug("merging stores"); - for(Iterator it = filesToMerge.keySet().iterator(); it.hasNext();) { + for(Iterator it = filesToMerge.keySet().iterator(); it.hasNext(); ) { Text colFamily = it.next(); Vector srcFiles = filesToMerge.get(colFamily); HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName, - colFamily, Math.abs(rand.nextLong())); + colFamily, Math.abs(rand.nextLong())); dst.mergeStoreFiles(srcFiles, fs, conf); alreadyMerged.addAll(srcFiles); @@ -153,15 +153,15 @@ // of any last-minute inserts LOG.debug("flushing changes since start of merge for region " - + srcA.getRegionName()); + + srcA.getRegionName()); filesToMerge.clear(); - for(Iterator it = srcA.close().iterator(); it.hasNext();) { + for(Iterator it = srcA.close().iterator(); it.hasNext(); ) { HStoreFile src = it.next(); - if (!alreadyMerged.contains(src)) { + if(! alreadyMerged.contains(src)) { Vector v = filesToMerge.get(src.getColFamily()); - if (v == null) { + if(v == null) { v = new Vector(); filesToMerge.put(src.getColFamily(), v); } @@ -170,14 +170,14 @@ } LOG.debug("flushing changes since start of merge for region " - + srcB.getRegionName()); + + srcB.getRegionName()); - for(Iterator it = srcB.close().iterator(); it.hasNext();) { + for(Iterator it = srcB.close().iterator(); it.hasNext(); ) { HStoreFile src = it.next(); - if (!alreadyMerged.contains(src)) { + if(! alreadyMerged.contains(src)) { Vector v = filesToMerge.get(src.getColFamily()); - if (v == null) { + if(v == null) { v = new Vector(); filesToMerge.put(src.getColFamily(), v); } @@ -187,11 +187,11 @@ LOG.debug("merging changes since start of merge"); - for(Iterator it = filesToMerge.keySet().iterator(); it.hasNext();) { + for(Iterator it = filesToMerge.keySet().iterator(); it.hasNext(); ) { Text colFamily = it.next(); Vector srcFiles = filesToMerge.get(colFamily); HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName, - colFamily, Math.abs(rand.nextLong())); + colFamily, Math.abs(rand.nextLong())); dst.mergeStoreFiles(srcFiles, fs, conf); } @@ -199,7 +199,7 @@ // Done HRegion dstRegion = new HRegion(dir, log, fs, conf, newRegionInfo, - newRegionDir, null); + newRegionDir, null); // Get rid of merges directory @@ -284,7 +284,7 @@ * written-to before), then read it from the supplied path. 
*/ public HRegion(Path dir, HLog log, FileSystem fs, Configuration conf, - HRegionInfo regionInfo, Path initialFiles, Path oldLogFile) throws IOException { + HRegionInfo regionInfo, Path initialFiles, Path oldLogFile) throws IOException { this.dir = dir; this.log = log; @@ -303,7 +303,7 @@ // Move prefab HStore files into place (if any) - if (initialFiles != null && fs.exists(initialFiles)) { + if(initialFiles != null && fs.exists(initialFiles)) { fs.rename(initialFiles, regiondir); } @@ -310,11 +310,11 @@ // Load in all the HStores. for(Iterator it = this.regionInfo.tableDesc.families().iterator(); - it.hasNext();) { + it.hasNext(); ) { - Text colFamily = it.next(); + Text colFamily = HStoreKey.extractFamily(it.next()); stores.put(colFamily, new HStore(dir, this.regionInfo.regionName, colFamily, - this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf)); + this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf)); } // Get rid of any splits or merges that were lost in-progress @@ -320,7 +320,7 @@ // Get rid of any splits or merges that were lost in-progress Path splits = new Path(regiondir, SPLITDIR); - if (fs.exists(splits)) { + if(fs.exists(splits)) { fs.delete(splits); } @@ -325,7 +325,7 @@ } Path merges = new Path(regiondir, MERGEDIR); - if (fs.exists(merges)) { + if(fs.exists(merges)) { fs.delete(merges); } @@ -345,6 +345,7 @@ /** Closes and deletes this HRegion. Called when doing a table deletion, for example */ public void closeAndDelete() throws IOException { + LOG.info("deleting region: " + regionInfo.regionName); close(); fs.delete(regiondir); } @@ -362,7 +363,7 @@ public Vector close() throws IOException { boolean shouldClose = false; synchronized(writestate) { - if (writestate.closed) { + if(writestate.closed) { LOG.info("region " + this.regionInfo.regionName + " closed"); return new Vector(); } @@ -376,7 +377,7 @@ shouldClose = true; } - if (!shouldClose) { + if(! shouldClose) { return null; } else { @@ -382,7 +383,7 @@ } else { LOG.info("closing region " + this.regionInfo.regionName); Vector allHStoreFiles = internalFlushcache(); - for(Iterator it = stores.values().iterator(); it.hasNext();) { + for(Iterator it = stores.values().iterator(); it.hasNext(); ) { HStore store = it.next(); store.close(); } @@ -406,8 +407,8 @@ * Returns two brand-new (and open) HRegions */ public HRegion[] closeAndSplit(Text midKey) throws IOException { - if (((regionInfo.startKey.getLength() != 0) - && (regionInfo.startKey.compareTo(midKey) > 0)) + if(((regionInfo.startKey.getLength() != 0) + && (regionInfo.startKey.compareTo(midKey) > 0)) || ((regionInfo.endKey.getLength() != 0) && (regionInfo.endKey.compareTo(midKey) < 0))) { throw new IOException("Region splitkey must lie within region boundaries."); @@ -419,7 +420,7 @@ // or compactions until close() is called. Path splits = new Path(regiondir, SPLITDIR); - if (!fs.exists(splits)) { + if(! 
fs.exists(splits)) { fs.mkdirs(splits); } @@ -425,7 +426,7 @@ long regionAId = Math.abs(rand.nextLong()); HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc, - regionInfo.startKey, midKey); + regionInfo.startKey, midKey); long regionBId = Math.abs(rand.nextLong()); HRegionInfo regionBInfo @@ -434,9 +435,9 @@ Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName); Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName); - if (fs.exists(dirA) || fs.exists(dirB)) { + if(fs.exists(dirA) || fs.exists(dirB)) { throw new IOException("Cannot split; target file collision at " + dirA - + " or " + dirB); + + " or " + dirB); } TreeSet alreadySplit = new TreeSet(); @@ -441,17 +442,17 @@ TreeSet alreadySplit = new TreeSet(); Vector hstoreFilesToSplit = flushcache(true); - for(Iterator it = hstoreFilesToSplit.iterator(); it.hasNext();) { + for(Iterator it = hstoreFilesToSplit.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily() - + "/" + hsf.fileId()); + + "/" + hsf.fileId()); HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, - hsf.getColFamily(), Math.abs(rand.nextLong())); + hsf.getColFamily(), Math.abs(rand.nextLong())); HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, - hsf.getColFamily(), Math.abs(rand.nextLong())); + hsf.getColFamily(), Math.abs(rand.nextLong())); hsf.splitStoreFile(midKey, dstA, dstB, fs, conf); alreadySplit.add(hsf); @@ -461,18 +462,18 @@ // and copy the small remainder hstoreFilesToSplit = close(); - for(Iterator it = hstoreFilesToSplit.iterator(); it.hasNext();) { + for(Iterator it = hstoreFilesToSplit.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); - if (!alreadySplit.contains(hsf)) { + if(! alreadySplit.contains(hsf)) { LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily() - + "/" + hsf.fileId()); + + "/" + hsf.fileId()); HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, - hsf.getColFamily(), Math.abs(rand.nextLong())); + hsf.getColFamily(), Math.abs(rand.nextLong())); HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, - hsf.getColFamily(), Math.abs(rand.nextLong())); + hsf.getColFamily(), Math.abs(rand.nextLong())); hsf.splitStoreFile(midKey, dstA, dstB, fs, conf); } @@ -494,7 +495,7 @@ regions[1] = regionB; LOG.info("region split complete. new regions are: " + regions[0].getRegionName() - + ", " + regions[1].getRegionName()); + + ", " + regions[1].getRegionName()); return regions; } @@ -565,10 +566,10 @@ Text key = new Text(); long maxSize = 0; - for(Iterator i = stores.values().iterator(); i.hasNext();) { + for(Iterator i = stores.values().iterator(); i.hasNext(); ) { long size = i.next().getLargestFileSize(key); - if (size > maxSize) { // Largest so far + if(size > maxSize) { // Largest so far maxSize = size; midKey.set(key); } @@ -593,9 +594,9 @@ public boolean compactStores() throws IOException { boolean shouldCompact = false; synchronized(writestate) { - if ((!writestate.writesOngoing) + if((! writestate.writesOngoing) && writestate.writesEnabled - && (!writestate.closed) + && (! writestate.closed) && recentCommits > MIN_COMMITS_FOR_COMPACTION) { writestate.writesOngoing = true; @@ -603,7 +604,7 @@ } } - if (!shouldCompact) { + if(! 
shouldCompact) { LOG.info("not compacting region " + this.regionInfo.regionName); return false; @@ -610,7 +611,7 @@ } else { try { LOG.info("starting compaction on region " + this.regionInfo.regionName); - for(Iterator it = stores.values().iterator(); it.hasNext();) { + for(Iterator it = stores.values().iterator(); it.hasNext(); ) { HStore store = it.next(); store.compact(); } @@ -632,7 +633,7 @@ * only take if there have been a lot of uncommitted writes. */ public void optionallyFlush() throws IOException { - if (commitsSinceFlush > maxUnflushedEntries) { + if(commitsSinceFlush > maxUnflushedEntries) { flushcache(false); } } @@ -657,9 +658,9 @@ public Vector flushcache(boolean disableFutureWrites) throws IOException { boolean shouldFlush = false; synchronized(writestate) { - if ((!writestate.writesOngoing) + if((! writestate.writesOngoing) && writestate.writesEnabled - && (!writestate.closed)) { + && (! writestate.closed)) { writestate.writesOngoing = true; shouldFlush = true; @@ -664,7 +665,7 @@ writestate.writesOngoing = true; shouldFlush = true; - if (disableFutureWrites) { + if(disableFutureWrites) { writestate.writesEnabled = false; } } @@ -670,7 +671,7 @@ } } - if (!shouldFlush) { + if(! shouldFlush) { LOG.debug("not flushing cache for region " + this.regionInfo.regionName); return null; @@ -731,8 +732,8 @@ HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log); TreeMap memcacheSnapshot = retval.memcacheSnapshot; - if (memcacheSnapshot == null) { - for(Iterator it = stores.values().iterator(); it.hasNext();) { + if(memcacheSnapshot == null) { + for(Iterator it = stores.values().iterator(); it.hasNext(); ) { HStore hstore = it.next(); Vector hstoreFiles = hstore.getAllMapFiles(); allHStoreFiles.addAll(0, hstoreFiles); @@ -746,7 +747,7 @@ LOG.debug("flushing memcache to HStores"); - for(Iterator it = stores.values().iterator(); it.hasNext();) { + for(Iterator it = stores.values().iterator(); it.hasNext(); ) { HStore hstore = it.next(); Vector hstoreFiles = hstore.flushCache(memcacheSnapshot, logCacheFlushId); @@ -762,7 +763,7 @@ LOG.debug("writing flush cache complete to log"); log.completeCacheFlush(this.regionInfo.regionName, - regionInfo.tableDesc.getName(), logCacheFlushId); + regionInfo.tableDesc.getName(), logCacheFlushId); // C. Delete the now-irrelevant memcache snapshot; its contents have been // dumped to disk-based HStores. @@ -784,7 +785,7 @@ /** Fetch a single data item. */ public byte[] get(Text row, Text column) throws IOException { byte results[][] = get(row, column, Long.MAX_VALUE, 1); - if (results == null) { + if(results == null) { return null; } else { @@ -799,9 +800,9 @@ /** Fetch multiple versions of a single data item, with timestamp. 
*/ public byte[][] get(Text row, Text column, long timestamp, int numVersions) - throws IOException { + throws IOException { - if (writestate.closed) { + if(writestate.closed) { throw new IOException("HRegion is closed."); } @@ -808,8 +809,7 @@ // Make sure this is a valid row and valid column checkRow(row); - Text colFamily = HStoreKey.extractFamily(column); - checkFamily(colFamily); + checkColumn(column); // Obtain the row-lock @@ -830,7 +830,7 @@ // Check the memcache byte[][] result = memcache.get(key, numVersions); - if (result != null) { + if(result != null) { return result; } @@ -838,7 +838,7 @@ Text colFamily = HStoreKey.extractFamily(key.getColumn()); HStore targetStore = stores.get(colFamily); - if (targetStore == null) { + if(targetStore == null) { return null; } @@ -859,7 +859,7 @@ HStoreKey key = new HStoreKey(row, System.currentTimeMillis()); TreeMap memResult = memcache.getFull(key); - for(Iterator it = stores.keySet().iterator(); it.hasNext();) { + for(Iterator it = stores.keySet().iterator(); it.hasNext(); ) { Text colFamily = it.next(); HStore targetStore = stores.get(colFamily); targetStore.getFull(key, memResult); @@ -879,7 +879,7 @@ HStore storelist[] = new HStore[families.size()]; int i = 0; - for(Iterator it = families.iterator(); it.hasNext();) { + for(Iterator it = families.iterator(); it.hasNext(); ) { Text family = it.next(); storelist[i++] = stores.get(family); } @@ -911,7 +911,7 @@ /** * Put a cell value into the locked row. The user indicates the row-lock, the * target column, and the desired value. This stuff is set into a temporary - * memory area until the user commits the change, at which pointit's logged + * memory area until the user commits the change, at which point it's logged * and placed into the memcache. * * This method really just tests the input, then calls an internal localput() @@ -918,10 +918,10 @@ * method. */ public void put(long lockid, Text targetCol, byte[] val) throws IOException { - if (val.length == HStoreKey.DELETE_BYTES.length) { + if(val.length == HStoreKey.DELETE_BYTES.length) { boolean matches = true; for(int i = 0; i < val.length; i++) { - if (val[i] != HStoreKey.DELETE_BYTES[i]) { + if(val[i] != HStoreKey.DELETE_BYTES[i]) { matches = false; break; } @@ -927,7 +927,7 @@ } } - if (matches) { + if(matches) { throw new IOException("Cannot insert value: " + val); } } @@ -950,9 +950,11 @@ * (Or until the user's write-lock expires.) */ void localput(long lockid, Text targetCol, byte[] val) throws IOException { + checkColumn(targetCol); + Text row = getRowFromLock(lockid); - if (row == null) { - throw new IOException("No write lock for lockid " + lockid); + if(row == null) { + throw new LockException("No write lock for lockid " + lockid); } // This sync block makes localput() thread-safe when multiple @@ -964,13 +966,13 @@ // This check makes sure that another thread from the client // hasn't aborted/committed the write-operation. 
- if (row != getRowFromLock(lockid)) { - throw new IOException("Locking error: put operation on lock " + lockid - + " unexpected aborted by another thread"); + if(row != getRowFromLock(lockid)) { + throw new LockException("Locking error: put operation on lock " + lockid + + " unexpectedly aborted by another thread"); } TreeMap targets = targetColumns.get(lockid); - if (targets == null) { + if(targets == null) { targets = new TreeMap(); targetColumns.put(lockid, targets); } @@ -985,8 +987,8 @@ */ public void abort(long lockid) throws IOException { Text row = getRowFromLock(lockid); - if (row == null) { - throw new IOException("No write lock for lockid " + lockid); + if(row == null) { + throw new LockException("No write lock for lockid " + lockid); } // This sync block makes abort() thread-safe when multiple @@ -998,9 +1000,9 @@ // This check makes sure another thread from the client // hasn't aborted/committed the write-operation. - if (row != getRowFromLock(lockid)) { - throw new IOException("Locking error: abort() operation on lock " - + lockid + " unexpected aborted by another thread"); + if(row != getRowFromLock(lockid)) { + throw new LockException("Locking error: abort() operation on lock " + + lockid + " unexpectedly aborted by another thread"); } targetColumns.remove(lockid); @@ -1021,8 +1023,8 @@ // that repeated executions won't screw this up. Text row = getRowFromLock(lockid); - if (row == null) { - throw new IOException("No write lock for lockid " + lockid); + if(row == null) { + throw new LockException("No write lock for lockid " + lockid); } // This check makes sure that another thread from the client @@ -1035,7 +1037,7 @@ long commitTimestamp = System.currentTimeMillis(); log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), row, - targetColumns.get(lockid), commitTimestamp); + targetColumns.get(lockid), commitTimestamp); memcache.add(row, targetColumns.get(lockid), commitTimestamp); @@ -1054,8 +1056,8 @@ /** Make sure this is a valid row for the HRegion */ void checkRow(Text row) throws IOException { - if (((regionInfo.startKey.getLength() == 0) - || (regionInfo.startKey.compareTo(row) <= 0)) + if(((regionInfo.startKey.getLength() == 0) + || (regionInfo.startKey.compareTo(row) <= 0)) && ((regionInfo.endKey.getLength() == 0) || (regionInfo.endKey.compareTo(row) > 0))) { // all's well @@ -1062,17 +1064,18 @@ } else { throw new IOException("Requested row out of range for HRegion " - + regionInfo.regionName + ", startKey='" + regionInfo.startKey - + "', endKey='" + regionInfo.endKey + "', row='" + row + "'"); + + regionInfo.regionName + ", startKey='" + regionInfo.startKey + + "', endKey='" + regionInfo.endKey + "', row='" + row + "'"); } } - + /** Make sure this is a valid column for the current table */ - void checkFamily(Text family) throws IOException { - if (!regionInfo.tableDesc.hasFamily(family)) { + void checkColumn(Text columnName) throws IOException { + Text family = new Text(HStoreKey.extractFamily(columnName) + ":"); + if(! regionInfo.tableDesc.hasFamily(family)) { throw new IOException("Requested column family " + family - + " does not exist in HRegion " + regionInfo.regionName - + " for table " + regionInfo.tableDesc.getName()); + + " does not exist in HRegion " + regionInfo.regionName + + " for table " + regionInfo.tableDesc.getName()); } } @@ -1092,6 +1095,8 @@ * which maybe we'll do in the future. 
*/ long obtainLock(Text row) throws IOException { + checkRow(row); + synchronized(rowsToLocks) { while(rowsToLocks.get(row) != null) { try { @@ -1109,6 +1114,8 @@ } Text getRowFromLock(long lockid) throws IOException { + // Pattern is that all access to rowsToLocks and/or to + // locksToRows is via a lock on rowsToLocks. synchronized(rowsToLocks) { return locksToRows.get(lockid); } @@ -1150,7 +1157,7 @@ keys[i] = new HStoreKey(); resultSets[i] = new TreeMap(); - if (!scanners[i].next(keys[i], resultSets[i])) { + if(! scanners[i].next(keys[i], resultSets[i])) { closeScanner(i); } } @@ -1167,7 +1174,7 @@ Text chosenRow = null; long chosenTimestamp = -1; for(int i = 0; i < keys.length; i++) { - if (scanners[i] != null + if(scanners[i] != null && (chosenRow == null || (keys[i].getRow().compareTo(chosenRow) < 0) || ((keys[i].getRow().compareTo(chosenRow) == 0) @@ -1181,7 +1188,7 @@ // Store the key and results for each sub-scanner. Merge them as appropriate. boolean insertedItem = false; - if (chosenTimestamp > 0) { + if(chosenTimestamp > 0) { key.setRow(chosenRow); key.setVersion(chosenTimestamp); key.setColumn(new Text("")); @@ -1188,8 +1195,8 @@ for(int i = 0; i < scanners.length; i++) { while((scanners[i] != null) - && (keys[i].getRow().compareTo(chosenRow) == 0) - && (keys[i].getTimestamp() == chosenTimestamp)) { + && (keys[i].getRow().compareTo(chosenRow) == 0) + && (keys[i].getTimestamp() == chosenTimestamp)) { results.putAll(resultSets[i]); insertedItem = true; @@ -1195,7 +1202,7 @@ insertedItem = true; resultSets[i].clear(); - if (!scanners[i].next(keys[i], resultSets[i])) { + if(! scanners[i].next(keys[i], resultSets[i])) { closeScanner(i); } } @@ -1204,10 +1211,10 @@ // row label, then its timestamp is bad. We need to advance it. while((scanners[i] != null) - && (keys[i].getRow().compareTo(chosenRow) <= 0)) { + && (keys[i].getRow().compareTo(chosenRow) <= 0)) { resultSets[i].clear(); - if (!scanners[i].next(keys[i], resultSets[i])) { + if(! scanners[i].next(keys[i], resultSets[i])) { closeScanner(i); } } @@ -1231,7 +1238,7 @@ /** All done with the scanner. */ public void close() throws IOException { for(int i = 0; i < scanners.length; i++) { - if (scanners[i] != null) { + if(scanners[i] != null) { closeScanner(i); } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (working copy) @@ -38,11 +38,11 @@ } public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey, - Text endKey) throws IllegalArgumentException { + Text endKey) throws IllegalArgumentException { this.regionId = regionId; - if (tableDesc == null) { + if(tableDesc == null) { throw new IllegalArgumentException("tableDesc cannot be null"); } @@ -49,7 +49,7 @@ this.tableDesc = tableDesc; this.startKey = new Text(); - if (startKey != null) { + if(startKey != null) { this.startKey.set(startKey); } @@ -54,7 +54,7 @@ } this.endKey = new Text(); - if (endKey != null) { + if(endKey != null) { this.endKey.set(endKey); } @@ -59,7 +59,7 @@ } this.regionName = new Text(tableDesc.getName() + "_" - + (startKey == null ? "" : startKey.toString()) + "_" + regionId); + + (startKey == null ? 
"" : startKey.toString()) + "_" + regionId); } ////////////////////////////////////////////////////////////////////////////// Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (working copy) @@ -15,7 +15,9 @@ */ package org.apache.hadoop.hbase; -import org.apache.hadoop.io.*; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.VersionedProtocol; import java.io.*; @@ -23,7 +25,7 @@ * Clients interact with HRegionServers using * a handle to the HRegionInterface. ******************************************************************************/ -public interface HRegionInterface { +public interface HRegionInterface extends VersionedProtocol { public static final long versionID = 1L; // initial version // Get metainfo about an HRegion @@ -30,10 +32,6 @@ public HRegionInfo getRegionInfo(Text regionName); - // Start a scanner for a given HRegion. - - public HScannerInterface openScanner(Text regionName, Text[] columns, Text startRow) throws IOException; - // GET methods for an HRegion. public BytesWritable get(Text regionName, Text row, Text column) throws IOException; @@ -58,4 +56,41 @@ public void abort(Text regionName, long clientid, long lockid) throws IOException; public void commit(Text regionName, long clientid, long lockid) throws IOException; public void renewLease(long lockid, long clientid) throws IOException; + + ////////////////////////////////////////////////////////////////////////////// + // remote scanner interface + ////////////////////////////////////////////////////////////////////////////// + + /** + * Opens a remote scanner. 
+ * + * @param regionName - name of region to scan + * @param columns - columns to scan + * @param startRow - starting row to scan + * + * @return - scanner identifier used in other calls + * @throws IOException + */ + public long openScanner(Text regionName, Text[] columns, Text startRow) throws IOException; + + /** + * Get the next set of values + * + * @param scannerId - the scanner id returned by openScanner + * @param key - set to the HStoreKey of the row that was retrieved + * @return - array of column name/value pairs for the row; empty when the scanner is exhausted + * @throws IOException + */ + public LabelledData[] next(long scannerId, HStoreKey key) throws IOException; + + /** + * Close a scanner + * + * @param scannerId - the scanner id returned by openScanner + * @throws IOException + */ + public void close(long scannerId) throws IOException; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -15,6 +15,8 @@ */ package org.apache.hadoop.hbase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.ipc.*; @@ -22,6 +24,8 @@ import java.io.*; import java.util.*; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /******************************************************************************* * HRegionServer makes a set of HRegions available to clients. It checks in with @@ -27,7 +31,20 @@ * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. ******************************************************************************/ -public class HRegionServer implements HConstants, HRegionInterface, Runnable { +public class HRegionServer + implements HConstants, HRegionInterface, Runnable { + + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + if (protocol.equals(HRegionInterface.class.getName())) { + return HRegionInterface.versionID; + } else { + throw new IOException("Unknown protocol to region server: " + protocol); + } + } + + private static final Log LOG = LogFactory.getLog(HRegionServer.class); + private boolean stopRequested; private Path regionDir; private HServerAddress address; @@ -34,7 +51,7 @@ private Configuration conf; private Random rand; private TreeMap regions; // region name -> HRegion - private HLocking locking; + private ReadWriteLock locker; private Vector outboundMsgs; private long threadWakeFrequency; @@ -61,7 +78,7 @@ } public void run() { - while(!stopRequested) { + while(! 
stopRequested) { long startTime = System.currentTimeMillis(); // Grab a list of regions to check @@ -67,7 +84,7 @@ // Grab a list of regions to check Vector checkSplit = new Vector(); - locking.obtainReadLock(); + locker.readLock().lock(); try { checkSplit.addAll(regions.values()); @@ -72,7 +89,7 @@ checkSplit.addAll(regions.values()); } finally { - locking.releaseReadLock(); + locker.readLock().unlock(); } // Check to see if they need splitting @@ -78,7 +95,7 @@ // Check to see if they need splitting Vector toSplit = new Vector(); - for(Iterator it = checkSplit.iterator(); it.hasNext();) { + for(Iterator it = checkSplit.iterator(); it.hasNext(); ) { HRegion cur = it.next(); Text midKey = new Text(); @@ -83,7 +100,7 @@ Text midKey = new Text(); try { - if (cur.needsSplit(midKey)) { + if(cur.needsSplit(midKey)) { toSplit.add(new SplitRegion(cur, midKey)); } @@ -92,12 +109,12 @@ } } - for(Iterator it = toSplit.iterator(); it.hasNext();) { + for(Iterator it = toSplit.iterator(); it.hasNext(); ) { SplitRegion r = it.next(); - locking.obtainWriteLock(); + locker.writeLock().lock(); regions.remove(r.region.getRegionName()); - locking.releaseWriteLock(); + locker.writeLock().unlock(); HRegion[] newRegions = null; try { @@ -103,6 +120,8 @@ try { Text oldRegion = r.region.getRegionName(); + LOG.info("splitting region: " + oldRegion); + newRegions = r.region.closeAndSplit(r.midKey); // When a region is split, the META table needs to updated if we're @@ -111,8 +130,10 @@ Text tableToUpdate = (oldRegion.find(META_TABLE_NAME.toString()) == 0) - ? ROOT_TABLE_NAME : META_TABLE_NAME; + ? ROOT_TABLE_NAME : META_TABLE_NAME; + LOG.debug("region split complete. updating meta"); + client.openTable(tableToUpdate); long lockid = client.startUpdate(oldRegion); client.delete(lockid, META_COL_REGIONINFO); @@ -132,7 +153,14 @@ // Now tell the master about the new regions + LOG.debug("reporting region split to master"); + reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo()); + + LOG.info("region split successful. old region=" + oldRegion + + ", new regions: " + newRegions[0].getRegionName() + ", " + + newRegions[1].getRegionName()); + newRegions[0].close(); newRegions[1].close(); @@ -145,11 +173,15 @@ // Sleep - long endTime = System.currentTimeMillis(); - try { - Thread.sleep(splitCheckFrequency - (endTime - startTime)); - - } catch(InterruptedException iex) { + long waitTime = + splitCheckFrequency - (System.currentTimeMillis() - startTime); + + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + + } catch(InterruptedException iex) { + } } } } @@ -161,7 +193,7 @@ private Thread cacheFlusherThread; private class Flusher implements Runnable { public void run() { - while(!stopRequested) { + while(! 
stopRequested) { long startTime = System.currentTimeMillis(); // Grab a list of items to flush @@ -167,7 +199,7 @@ // Grab a list of items to flush Vector toFlush = new Vector(); - locking.obtainReadLock(); + locker.readLock().lock(); try { toFlush.addAll(regions.values()); @@ -172,7 +204,7 @@ toFlush.addAll(regions.values()); } finally { - locking.releaseReadLock(); + locker.readLock().unlock(); } // Flush them, if necessary @@ -177,7 +209,7 @@ // Flush them, if necessary - for(Iterator it = toFlush.iterator(); it.hasNext();) { + for(Iterator it = toFlush.iterator(); it.hasNext(); ) { HRegion cur = it.next(); try { @@ -190,11 +222,15 @@ // Sleep - long endTime = System.currentTimeMillis(); - try { - Thread.sleep(threadWakeFrequency - (endTime - startTime)); - - } catch(InterruptedException iex) { + long waitTime = + threadWakeFrequency - (System.currentTimeMillis() - startTime); + + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + + } catch(InterruptedException iex) { + } } } } @@ -212,7 +248,7 @@ private Thread logRollerThread; private class LogRoller implements Runnable { public void run() { - while(!stopRequested) { + while(! stopRequested) { // If the number of log entries is high enough, roll the log. This is a // very fast operation, but should not be done too frequently. @@ -217,7 +253,7 @@ // If the number of log entries is high enough, roll the log. This is a // very fast operation, but should not be done too frequently. - if (log.getNumEntries() > maxLogEntries) { + if(log.getNumEntries() > maxLogEntries) { try { log.rollWriter(); @@ -249,8 +285,8 @@ /** Start a HRegionServer at the default location */ public HRegionServer(Configuration conf) throws IOException { this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)), - new HServerAddress(conf.get("hbase.regionserver.default.name")), - conf); + new HServerAddress(conf.get(REGIONSERVER_ADDRESS, "localhost:0")), + conf); } /** Start a HRegionServer at an indicated location */ @@ -255,7 +291,7 @@ /** Start a HRegionServer at an indicated location */ public HRegionServer(Path regionDir, HServerAddress address, Configuration conf) - throws IOException { + throws IOException { // Basic setup @@ -261,12 +297,12 @@ this.stopRequested = false; this.regionDir = regionDir; - this.address = address; this.conf = conf; this.rand = new Random(); this.regions = new TreeMap(); - this.locking = new HLocking(); + this.locker = new ReentrantReadWriteLock(); this.outboundMsgs = new Vector(); + this.scanners = Collections.synchronizedMap(new TreeMap()); // Config'ed params @@ -278,7 +314,7 @@ // Cache flushing this.cacheFlusher = new Flusher(); - this.cacheFlusherThread = new Thread(cacheFlusher); + this.cacheFlusherThread = new Thread(cacheFlusher, "HRegionServer.cacheFlusher"); // Check regions to see if they need to be split @@ -283,21 +319,36 @@ // Check regions to see if they need to be split this.splitChecker = new SplitChecker(); - this.splitCheckerThread = new Thread(splitChecker); + this.splitCheckerThread = new Thread(splitChecker, "HRegionServer.splitChecker"); + + // Process requests from Master + + this.toDo = new Vector(); + this.worker = new Worker(); + this.workerThread = new Thread(worker, "HRegionServer.worker"); try { + + // Server to handle client requests + + this.server = RPC.getServer(this, address.getBindAddress().toString(), + address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf); + + this.address = new HServerAddress(server.getListenerAddress()); + // Local file paths - this.fs = 
FileSystem.get(conf); - Path newlogdir = new Path(regionDir, "log" + "_" + address.toString()); - this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + address.toString()); + String serverName = this.address.getBindAddress() + "_" + this.address.getPort(); + Path newlogdir = new Path(regionDir, "log" + "_" + serverName); + this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName); // Logging + this.fs = FileSystem.get(conf); HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf); this.log = new HLog(fs, newlogdir, conf); this.logRoller = new LogRoller(); - this.logRollerThread = new Thread(logRoller); + this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller"); // Remote HMaster @@ -302,13 +353,14 @@ // Remote HMaster this.hbaseMaster = (HMasterRegionInterface) - RPC.waitForProxy(HMasterRegionInterface.class, - HMasterRegionInterface.versionId, - new HServerAddress(conf.get(MASTER_DEFAULT_NAME)).getInetSocketAddress(), - conf); + RPC.waitForProxy(HMasterRegionInterface.class, + HMasterRegionInterface.versionID, + new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(), + conf); // Threads + this.workerThread.start(); this.cacheFlusherThread.start(); this.splitCheckerThread.start(); this.logRollerThread.start(); @@ -313,12 +365,10 @@ this.splitCheckerThread.start(); this.logRollerThread.start(); this.leases = new Leases(conf.getLong("hbase.hregionserver.lease.period", - 3 * 60 * 1000), threadWakeFrequency); + 3 * 60 * 1000), threadWakeFrequency); // Server - this.server = RPC.getServer(this, address.getBindAddress().toString(), - address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf); this.server.start(); } catch(IOException e) { @@ -325,6 +375,8 @@ this.stopRequested = true; throw e; } + + LOG.info("HRegionServer started at: " + address.toString()); } /** @@ -334,7 +386,7 @@ * processing to cease. */ public void stop() throws IOException { - if (!stopRequested) { + if(! stopRequested) { stopRequested = true; closeAllRegions(); @@ -342,7 +394,7 @@ fs.close(); server.stop(); } - + LOG.info("stopping server at: " + address.toString()); } /** Call join to wait for all the threads to finish */ @@ -348,6 +400,12 @@ /** Call join to wait for all the threads to finish */ public void join() { try { + this.workerThread.join(); + + } catch(InterruptedException iex) { + } + + try { this.logRollerThread.join(); } catch(InterruptedException iex) { @@ -366,7 +424,7 @@ } catch(InterruptedException iex) { } - + LOG.info("server stopped at: " + address.toString()); } /** @@ -375,7 +433,7 @@ * load/unload instructions. */ public void run() { - while(!stopRequested) { + while(! stopRequested) { HServerInfo info = new HServerInfo(address, rand.nextLong()); long lastMsg = 0; long waitTime; @@ -388,10 +446,12 @@ } catch(IOException e) { waitTime = msgInterval - (System.currentTimeMillis() - lastMsg); - try { - Thread.sleep(waitTime); - - } catch(InterruptedException iex) { + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + + } catch(InterruptedException iex) { + } } continue; } @@ -398,8 +458,8 @@ // Now ask the master what it wants us to do and tell it what we have done. - while(!stopRequested) { - if ((System.currentTimeMillis() - lastMsg) >= msgInterval) { + while(! 
stopRequested) { + if((System.currentTimeMillis() - lastMsg) >= msgInterval) { HMsg outboundArray[] = null; synchronized(outboundMsgs) { @@ -411,10 +471,33 @@ HMsg msgs[] = hbaseMaster.regionServerReport(info, outboundArray); lastMsg = System.currentTimeMillis(); - // Process the HMaster's instruction stream + // Queue up the HMaster's instruction stream for processing - if (!processMessages(msgs)) { - break; + synchronized(toDo) { + boolean restartOrStop = false; + for(int i = 0; i < msgs.length; i++) { + switch(msgs[i].getMsg()) { + + case HMsg.MSG_CALL_SERVER_STARTUP: + closeAllRegions(); + restartOrStop = true; + break; + + case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: + stop(); + restartOrStop = true; + break; + + default: + toDo.add(msgs[i]); + } + } + if(toDo.size() > 0) { + toDo.notifyAll(); + } + if(restartOrStop) { + break; + } } } catch(IOException e) { @@ -424,53 +507,14 @@ waitTime = msgInterval - (System.currentTimeMillis() - lastMsg); - try { - Thread.sleep(waitTime); - } catch(InterruptedException iex) { + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + } catch(InterruptedException iex) { + } } - - } - } - } - - private boolean processMessages(HMsg[] msgs) throws IOException { - for(int i = 0; i < msgs.length; i++) { - switch(msgs[i].getMsg()) { - - case HMsg.MSG_REGION_OPEN: // Open a region - openRegion(msgs[i].getRegionInfo()); - break; - - case HMsg.MSG_REGION_CLOSE: // Close a region - closeRegion(msgs[i].getRegionInfo(), true); - break; - - case HMsg.MSG_REGION_MERGE: // Merge two regions - //TODO ??? - throw new IOException("TODO: need to figure out merge"); - //break; - - case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart - closeAllRegions(); - return false; - - case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: // Go away - stop(); - return false; - - case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply - closeRegion(msgs[i].getRegionInfo(), false); - break; - - case HMsg.MSG_REGION_CLOSE_AND_DELETE: - closeAndDeleteRegion(msgs[i].getRegionInfo()); - break; - - default: - throw new IOException("Impossible state during msg processing. Instruction: " + msgs[i]); } } - return true; } /** Add to the outbound message buffer */ @@ -508,9 +552,68 @@ // HMaster-given operations ////////////////////////////////////////////////////////////////////////////// + private Vector toDo; + private Worker worker; + private Thread workerThread; + private class Worker implements Runnable { + public void run() { + while(!stopRequested) { + HMsg msg = null; + synchronized(toDo) { + while(toDo.size() == 0) { + try { + toDo.wait(); + + } catch(InterruptedException e) { + } + } + msg = toDo.remove(0); + } + try { + switch(msg.getMsg()) { + + case HMsg.MSG_REGION_OPEN: // Open a region + openRegion(msg.getRegionInfo()); + break; + + case HMsg.MSG_REGION_CLOSE: // Close a region + closeRegion(msg.getRegionInfo(), true); + break; + + case HMsg.MSG_REGION_MERGE: // Merge two regions + //TODO ??? + throw new IOException("TODO: need to figure out merge"); + //break; + + case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart + closeAllRegions(); + continue; + + case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: // Go away + stop(); + continue; + + case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply + closeRegion(msg.getRegionInfo(), false); + break; + + case HMsg.MSG_REGION_CLOSE_AND_DELETE: + closeAndDeleteRegion(msg.getRegionInfo()); + break; + + default: + throw new IOException("Impossible state during msg processing. 
Instruction: " + msg); + } + } catch(IOException e) { + e.printStackTrace(); + } + } + } + } + private void openRegion(HRegionInfo regionInfo) throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { HRegion region = new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile); @@ -518,7 +621,7 @@ reportOpen(region); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -523,16 +626,16 @@ } private void closeRegion(HRegionInfo info, boolean reportWhenCompleted) - throws IOException { + throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { HRegion region = regions.remove(info.regionName); - if (region != null) { + if(region != null) { region.close(); - if (reportWhenCompleted) { + if(reportWhenCompleted) { reportClose(region); } } @@ -538,7 +641,7 @@ } } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -544,11 +647,11 @@ private void closeAndDeleteRegion(HRegionInfo info) throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { HRegion region = regions.remove(info.regionName); - if (region != null) { + if(region != null) { region.closeAndDelete(); } @@ -553,7 +656,7 @@ } } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -559,9 +662,9 @@ /** Called either when the master tells us to restart or from stop() */ private void closeAllRegions() throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { - for(Iterator it = regions.values().iterator(); it.hasNext();) { + for(Iterator it = regions.values().iterator(); it.hasNext(); ) { HRegion region = it.next(); region.close(); } @@ -568,7 +671,7 @@ regions.clear(); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -580,24 +683,24 @@ * * For now, we do not do merging. Splits are driven by the HRegionServer. ****************************************************************************/ - /* - private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException { +/* + private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException { locking.obtainWriteLock(); try { - HRegion srcA = regions.remove(regionNameA); - HRegion srcB = regions.remove(regionNameB); - HRegion newRegion = HRegion.closeAndMerge(srcA, srcB); - regions.put(newRegion.getRegionName(), newRegion); + HRegion srcA = regions.remove(regionNameA); + HRegion srcB = regions.remove(regionNameB); + HRegion newRegion = HRegion.closeAndMerge(srcA, srcB); + regions.put(newRegion.getRegionName(), newRegion); - reportClose(srcA); - reportClose(srcB); - reportOpen(newRegion); + reportClose(srcA); + reportClose(srcB); + reportOpen(newRegion); } finally { - locking.releaseWriteLock(); - } + locking.releaseWriteLock(); } - */ + } +*/ ////////////////////////////////////////////////////////////////////////////// // HRegionInterface @@ -606,7 +709,7 @@ /** Obtain a table descriptor for the given region */ public HRegionInfo getRegionInfo(Text regionName) { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { return null; } return region.getRegionInfo(); @@ -612,21 +715,10 @@ return region.getRegionInfo(); } - /** Start a scanner for a given HRegion. 
*/ - public HScannerInterface openScanner(Text regionName, Text[] cols, - Text firstRow) throws IOException { - - HRegion r = getRegion(regionName); - if (r == null) { - throw new IOException("Not serving region " + regionName); - } - return r.getScanner(cols, firstRow); - } - /** Get the indicated row/column */ public BytesWritable get(Text regionName, Text row, Text column) throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -631,7 +723,7 @@ } byte results[] = region.get(row, column); - if (results != null) { + if(results != null) { return new BytesWritable(results); } return null; @@ -639,10 +731,10 @@ /** Get multiple versions of the indicated row/col */ public BytesWritable[] get(Text regionName, Text row, Text column, - int numVersions) throws IOException { + int numVersions) throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -647,10 +739,10 @@ } byte results[][] = region.get(row, column, numVersions); - if (results != null) { + if(results != null) { BytesWritable realResults[] = new BytesWritable[results.length]; for(int i = 0; i < realResults.length; i++) { - if (results[i] != null) { + if(results[i] != null) { realResults[i] = new BytesWritable(results[i]); } } @@ -661,10 +753,10 @@ /** Get multiple timestamped versions of the indicated row/col */ public BytesWritable[] get(Text regionName, Text row, Text column, - long timestamp, int numVersions) throws IOException { + long timestamp, int numVersions) throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -669,10 +761,10 @@ } byte results[][] = region.get(row, column, timestamp, numVersions); - if (results != null) { + if(results != null) { BytesWritable realResults[] = new BytesWritable[results.length]; for(int i = 0; i < realResults.length; i++) { - if (results[i] != null) { + if(results[i] != null) { realResults[i] = new BytesWritable(results[i]); } } @@ -684,7 +776,7 @@ /** Get all the columns (along with their names) for a given row. */ public LabelledData[] getRow(Text regionName, Text row) throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -691,7 +783,7 @@ TreeMap map = region.getFull(row); LabelledData result[] = new LabelledData[map.size()]; int counter = 0; - for(Iterator it = map.keySet().iterator(); it.hasNext();) { + for(Iterator it = map.keySet().iterator(); it.hasNext(); ) { Text colname = it.next(); byte val[] = map.get(colname); result[counter++] = new LabelledData(colname, val); @@ -723,10 +815,10 @@ } public long startUpdate(Text regionName, long clientid, Text row) - throws IOException { + throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -732,8 +824,8 @@ long lockid = region.startUpdate(row); leases.createLease(new Text(String.valueOf(clientid)), - new Text(String.valueOf(lockid)), - new RegionListener(region, lockid)); + new Text(String.valueOf(lockid)), + new RegionListener(region, lockid)); return lockid; } @@ -740,10 +832,10 @@ /** Add something to the HBase. 
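   * The caller must hold the row lock identified by lockid; the matching
   * lease is renewed on every call so the lock does not expire mid-update.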
*/ public void put(Text regionName, long clientid, long lockid, Text column, - BytesWritable val) throws IOException { + BytesWritable val) throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -748,7 +840,7 @@ } leases.renewLease(new Text(String.valueOf(clientid)), - new Text(String.valueOf(lockid))); + new Text(String.valueOf(lockid))); region.put(lockid, column, val.get()); } @@ -755,10 +847,10 @@ /** Remove a cell from the HBase. */ public void delete(Text regionName, long clientid, long lockid, Text column) - throws IOException { + throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -763,7 +855,7 @@ } leases.renewLease(new Text(String.valueOf(clientid)), - new Text(String.valueOf(lockid))); + new Text(String.valueOf(lockid))); region.delete(lockid, column); } @@ -770,10 +862,10 @@ /** Abandon the transaction */ public void abort(Text regionName, long clientid, long lockid) - throws IOException { + throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -778,7 +870,7 @@ } leases.cancelLease(new Text(String.valueOf(clientid)), - new Text(String.valueOf(lockid))); + new Text(String.valueOf(lockid))); region.abort(lockid); } @@ -785,10 +877,10 @@ /** Confirm the transaction */ public void commit(Text regionName, long clientid, long lockid) - throws IOException { + throws IOException { HRegion region = getRegion(regionName); - if (region == null) { + if(region == null) { throw new IOException("Not serving region " + regionName); } @@ -793,7 +885,7 @@ } leases.cancelLease(new Text(String.valueOf(clientid)), - new Text(String.valueOf(lockid))); + new Text(String.valueOf(lockid))); region.commit(lockid); } @@ -801,7 +893,7 @@ /** Don't let the client's lease expire just yet... */ public void renewLease(long lockid, long clientid) throws IOException { leases.renewLease(new Text(String.valueOf(clientid)), - new Text(String.valueOf(lockid))); + new Text(String.valueOf(lockid))); } /** Private utility method for safely obtaining an HRegion handle. */ @@ -806,7 +898,7 @@ /** Private utility method for safely obtaining an HRegion handle. */ private HRegion getRegion(Text regionName) { - locking.obtainReadLock(); + this.locker.readLock().lock(); try { return regions.get(regionName); @@ -811,8 +903,121 @@ return regions.get(regionName); } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); + } + } + + ////////////////////////////////////////////////////////////////////////////// + // remote scanner interface + ////////////////////////////////////////////////////////////////////////////// + + private Map scanners; + private class ScannerListener extends LeaseListener { + private Text scannerName; + + public ScannerListener(Text scannerName) { + this.scannerName = scannerName; + } + + public void leaseExpired() { + HScannerInterface s = scanners.remove(scannerName); + if(s != null) { + try { + s.close(); + + } catch(IOException e) { + e.printStackTrace(); + } + } + } + } + + /** Start a scanner for a given HRegion. 
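+   * A lease is registered under the scanner id, so a scanner abandoned by
+   * its client is closed automatically once its lease expires (see
+   * ScannerListener above).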
*/ + public long openScanner(Text regionName, Text[] cols, Text firstRow) + throws IOException { + + HRegion r = getRegion(regionName); + if(r == null) { + throw new IOException("Not serving region " + regionName); + } + + long scannerId = -1L; + try { + HScannerInterface s = r.getScanner(cols, firstRow); + scannerId = rand.nextLong(); + Text scannerName = new Text(String.valueOf(scannerId)); + scanners.put(scannerName, s); + leases.createLease(scannerName, scannerName, new ScannerListener(scannerName)); + + } catch(IOException e) { + e.printStackTrace(); + throw e; + } + return scannerId; + } + + public LabelledData[] next(long scannerId, HStoreKey key) throws IOException { + + Text scannerName = new Text(String.valueOf(scannerId)); + HScannerInterface s = scanners.get(scannerName); + if(s == null) { + throw new IOException("unknown scanner"); + } + leases.renewLease(scannerName, scannerName); + TreeMap results = new TreeMap(); + ArrayList values = new ArrayList(); + if(s.next(key, results)) { + for(Iterator> it = results.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry e = it.next(); + values.add(new LabelledData(e.getKey(), e.getValue())); + } + } + return values.toArray(new LabelledData[values.size()]); + } + + public void close(long scannerId) throws IOException { + Text scannerName = new Text(String.valueOf(scannerId)); + HScannerInterface s = scanners.remove(scannerName); + if(s == null) { + throw new IOException("unknown scanner"); } + try { + s.close(); + + } catch(IOException ex) { + ex.printStackTrace(); + } + leases.cancelLease(scannerName, scannerName); } + ////////////////////////////////////////////////////////////////////////////// + // Main program + ////////////////////////////////////////////////////////////////////////////// + + private static void printUsage() { + System.err.println("Usage: java " + + "org.apache.hadoop.hbase.HRegionServer [--bind=hostname:port]"); + } + + public static void main(String [] args) throws IOException { + Configuration conf = new HBaseConfiguration(); + + // Process command-line args. TODO: Better cmd-line processing + // (but hopefully something not as painful as cli options). 
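+    // Recognized arguments: -h (or anything starting with --h) prints usage
+    // and exits; --bind=host:port overrides the region server address.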
+ for (String cmd: args) { + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(); + return; + } + + final String addressArgKey = "--bind="; + if (cmd.startsWith(addressArgKey)) { + conf.set(REGIONSERVER_ADDRESS, + cmd.substring(addressArgKey.length())); + } + } + + new HRegionServer(conf); + } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (working copy) @@ -33,9 +33,15 @@ this.stringValue = null; } + public HServerAddress(InetSocketAddress address) { + this.address = address; + this.stringValue = new String(address.getAddress().getHostAddress() + + ":" + address.getPort()); + } + public HServerAddress(String hostAndPort) { int colonIndex = hostAndPort.indexOf(':'); - if (colonIndex < 0) { + if(colonIndex < 0) { throw new IllegalArgumentException("Not a host:port pair: " + hostAndPort); } String host = hostAndPort.substring(0, colonIndex); @@ -80,7 +86,7 @@ String bindAddress = in.readUTF(); int port = in.readInt(); - if (bindAddress == null || bindAddress.length() == 0) { + if(bindAddress == null || bindAddress.length() == 0) { address = null; stringValue = null; @@ -91,7 +97,7 @@ } public void write(DataOutput out) throws IOException { - if (address == null) { + if(address == null) { out.writeUTF(""); out.writeInt(0); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -15,14 +15,26 @@ */ package org.apache.hadoop.hbase; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.fs.*; - -import java.io.*; -import java.util.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; /******************************************************************************* * HStore maintains a bunch of data files. It is responsible for maintaining @@ -53,7 +65,7 @@ Integer compactLock = new Integer(0); Integer flushLock = new Integer(0); - HLocking locking = new HLocking(); + ReadWriteLock locker = new ReentrantReadWriteLock(); TreeMap maps = new TreeMap(); TreeMap mapFiles = new TreeMap(); @@ -88,7 +100,7 @@ * will be deleted (by whoever has instantiated the HStore). 
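   * Updates in the reconstruction log whose sequence ids postdate the newest
   * on-disk MapFile are replayed into a cache and flushed out before the
   * store is opened for business.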
*/ public HStore(Path dir, Text regionName, Text colFamily, int maxVersions, - FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException { + FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException { this.dir = dir; this.regionName = regionName; @@ -110,7 +122,7 @@ this.compactdir = new Path(dir, COMPACTION_DIR); Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily); - if (fs.exists(curCompactStore)) { + if(fs.exists(curCompactStore)) { processReadyCompaction(); fs.delete(curCompactStore); } @@ -123,7 +135,7 @@ Vector hstoreFiles = HStoreFile.loadHStoreFiles(conf, dir, regionName, colFamily, fs); - for(Iterator it = hstoreFiles.iterator(); it.hasNext();) { + for(Iterator it = hstoreFiles.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); mapFiles.put(hsf.loadInfo(fs), hsf); } @@ -138,11 +150,11 @@ // contain any updates also contained in the log. long maxSeqID = -1; - for(Iterator it = hstoreFiles.iterator(); it.hasNext();) { + for(Iterator it = hstoreFiles.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); long seqid = hsf.loadInfo(fs); - if (seqid > 0) { - if (seqid > maxSeqID) { + if(seqid > 0) { + if(seqid > maxSeqID) { maxSeqID = seqid; } } @@ -157,7 +169,7 @@ LOG.debug("reading reconstructionLog"); - if (reconstructionLog != null && fs.exists(reconstructionLog)) { + if(reconstructionLog != null && fs.exists(reconstructionLog)) { long maxSeqIdInLog = -1; TreeMap reconstructedCache = new TreeMap(); @@ -170,11 +182,11 @@ HLogEdit val = new HLogEdit(); while(login.next(key, val)) { maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum()); - if (key.getLogSeqNum() <= maxSeqID) { + if(key.getLogSeqNum() <= maxSeqID) { continue; } reconstructedCache.put(new HStoreKey(key.getRow(), val.getColumn(), - val.getTimestamp()), val.getVal()); + val.getTimestamp()), val.getVal()); } } finally { @@ -181,7 +193,7 @@ login.close(); } - if (reconstructedCache.size() > 0) { + if(reconstructedCache.size() > 0) { // We create a "virtual flush" at maxSeqIdInLog+1. @@ -195,7 +207,7 @@ // should be "timeless"; that is, it should not have an associated seq-ID, // because all log messages have been reflected in the TreeMaps at this point. - if (mapFiles.size() >= 1) { + if(mapFiles.size() >= 1) { compactHelper(true); } @@ -204,7 +216,7 @@ LOG.debug("starting map readers"); - for(Iterator it = mapFiles.keySet().iterator(); it.hasNext();) { + for(Iterator it = mapFiles.keySet().iterator(); it.hasNext(); ) { Long key = it.next().longValue(); HStoreFile hsf = mapFiles.get(key); @@ -218,11 +230,11 @@ /** Turn off all the MapFile readers */ public void close() throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily); try { - for(Iterator it = maps.values().iterator(); it.hasNext();) { + for(Iterator it = maps.values().iterator(); it.hasNext(); ) { MapFile.Reader map = it.next(); map.close(); } @@ -232,7 +244,7 @@ LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -252,7 +264,7 @@ * Return the entire list of HStoreFiles currently used by the HStore. 
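   * The cache snapshot is written to a brand-new MapFile; only cells that
   * belong to this store's column family are kept.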
*/ public Vector flushCache(TreeMap inputCache, - long logCacheFlushId) throws IOException { + long logCacheFlushId) throws IOException { return flushCacheHelper(inputCache, logCacheFlushId, true); } @@ -258,7 +270,7 @@ } Vector flushCacheHelper(TreeMap inputCache, - long logCacheFlushId, boolean addToAvailableMaps) throws IOException { + long logCacheFlushId, boolean addToAvailableMaps) throws IOException { synchronized(flushLock) { LOG.debug("flushing HStore " + this.regionName + "/" + this.colFamily); @@ -270,12 +282,12 @@ Path mapfile = flushedFile.getMapFilePath(); MapFile.Writer out = new MapFile.Writer(conf, fs, mapfile.toString(), - HStoreKey.class, BytesWritable.class); + HStoreKey.class, BytesWritable.class); try { - for(Iterator it = inputCache.keySet().iterator(); it.hasNext();) { + for(Iterator it = inputCache.keySet().iterator(); it.hasNext(); ) { HStoreKey curkey = it.next(); - if (this.colFamily.equals(HStoreKey.extractFamily(curkey.getColumn()))) { + if(this.colFamily.equals(HStoreKey.extractFamily(curkey.getColumn()))) { BytesWritable val = inputCache.get(curkey); out.append(curkey, val); } @@ -294,8 +306,8 @@ // C. Finally, make the new MapFile available. - if (addToAvailableMaps) { - locking.obtainWriteLock(); + if(addToAvailableMaps) { + this.locker.writeLock().lock(); try { maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf)); @@ -303,7 +315,7 @@ LOG.debug("HStore available for " + this.regionName + "/" + this.colFamily); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } return getAllMapFiles(); @@ -312,7 +324,7 @@ public Vector getAllMapFiles() { Vector flushedFiles = new Vector(); - for(Iterator it = mapFiles.values().iterator(); it.hasNext();) { + for(Iterator it = mapFiles.values().iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); flushedFiles.add(hsf); } @@ -355,7 +367,7 @@ // Grab a list of files to compact. 
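    // The snapshot is taken under the write lock so that compaction works
    // against a stable set of MapFiles.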
Vector toCompactFiles = null; - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { toCompactFiles = new Vector(mapFiles.values()); @@ -360,7 +372,7 @@ toCompactFiles = new Vector(mapFiles.values()); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps @@ -366,11 +378,11 @@ // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps long maxSeenSeqID = -1; - for(Iterator it = toCompactFiles.iterator(); it.hasNext();) { + for(Iterator it = toCompactFiles.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); long seqid = hsf.loadInfo(fs); - if (seqid > 0) { - if (seqid > maxSeenSeqID) { + if(seqid > 0) { + if(seqid > maxSeenSeqID) { maxSeenSeqID = seqid; } } @@ -380,11 +392,11 @@ HStoreFile compactedOutputFile = new HStoreFile(conf, compactdir, regionName, colFamily, -1); - if (toCompactFiles.size() == 1) { + if(toCompactFiles.size() == 1) { LOG.debug("nothing to compact for " + this.regionName + "/" + this.colFamily); HStoreFile hsf = toCompactFiles.elementAt(0); - if (hsf.loadInfo(fs) == -1) { + if(hsf.loadInfo(fs) == -1) { return; } } @@ -392,8 +404,8 @@ // Step through them, writing to the brand-new TreeMap MapFile.Writer compactedOut = new MapFile.Writer(conf, fs, - compactedOutputFile.getMapFilePath().toString(), HStoreKey.class, - BytesWritable.class); + compactedOutputFile.getMapFilePath().toString(), HStoreKey.class, + BytesWritable.class); try { @@ -414,7 +426,7 @@ BytesWritable[] vals = new BytesWritable[toCompactFiles.size()]; boolean[] done = new boolean[toCompactFiles.size()]; int pos = 0; - for(Iterator it = toCompactFiles.iterator(); it.hasNext();) { + for(Iterator it = toCompactFiles.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); readers[pos] = new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf); keys[pos] = new HStoreKey(); @@ -431,8 +443,8 @@ int numDone = 0; for(int i = 0; i < readers.length; i++) { readers[i].reset(); - done[i] = !readers[i].next(keys[i], vals[i]); - if (done[i]) { + done[i] = ! readers[i].next(keys[i], vals[i]); + if(done[i]) { numDone++; } } @@ -446,15 +458,15 @@ int smallestKey = -1; for(int i = 0; i < readers.length; i++) { - if (done[i]) { + if(done[i]) { continue; } - if (smallestKey < 0) { + if(smallestKey < 0) { smallestKey = i; } else { - if (keys[i].compareTo(keys[smallestKey]) < 0) { + if(keys[i].compareTo(keys[smallestKey]) < 0) { smallestKey = i; } } @@ -463,7 +475,7 @@ // Reflect the current key/val in the output HStoreKey sk = keys[smallestKey]; - if (lastRow.equals(sk.getRow()) + if(lastRow.equals(sk.getRow()) && lastColumn.equals(sk.getColumn())) { timesSeen++; @@ -472,7 +484,7 @@ timesSeen = 1; } - if (timesSeen <= maxVersions) { + if(timesSeen <= maxVersions) { // Keep old versions until we have maxVersions worth. // Then just skip them. @@ -477,7 +489,7 @@ // Keep old versions until we have maxVersions worth. // Then just skip them. - if (sk.getRow().getLength() != 0 + if(sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) { // Only write out objects which have a non-zero length key and value @@ -499,7 +511,7 @@ // Advance the smallest key. If that reader's all finished, then // mark it as done. - if (!readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) { + if(! 
readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) { done[smallestKey] = true; readers[smallestKey].close(); numDone++; @@ -516,7 +528,7 @@ // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. - if ((!deleteSequenceInfo) && maxSeenSeqID >= 0) { + if((! deleteSequenceInfo) && maxSeenSeqID >= 0) { compactedOutputFile.writeInfo(fs, maxSeenSeqID); } else { @@ -529,7 +541,7 @@ DataOutputStream out = new DataOutputStream(fs.create(filesToReplace)); try { out.writeInt(toCompactFiles.size()); - for(Iterator it = toCompactFiles.iterator(); it.hasNext();) { + for(Iterator it = toCompactFiles.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); hsf.write(out); } @@ -583,11 +595,11 @@ // 1. Acquiring the write-lock - locking.obtainWriteLock(); + this.locker.writeLock().lock(); Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily); try { Path doneFile = new Path(curCompactStore, COMPACTION_DONE); - if (!fs.exists(doneFile)) { + if(! fs.exists(doneFile)) { // The last execution didn't finish the compaction, so there's nothing // we can do. We'll just have to redo it. Abandon it and return. @@ -622,10 +634,10 @@ // 3. Unload all the replaced MapFiles. Iterator it2 = mapFiles.values().iterator(); - for(Iterator it = maps.values().iterator(); it.hasNext();) { + for(Iterator it = maps.values().iterator(); it.hasNext(); ) { MapFile.Reader curReader = it.next(); HStoreFile curMapFile = it2.next(); - if (toCompactFiles.contains(curMapFile)) { + if(toCompactFiles.contains(curMapFile)) { curReader.close(); it.remove(); } @@ -631,9 +643,9 @@ } } - for(Iterator it = mapFiles.values().iterator(); it.hasNext();) { + for(Iterator it = mapFiles.values().iterator(); it.hasNext(); ) { HStoreFile curMapFile = it.next(); - if (toCompactFiles.contains(curMapFile)) { + if(toCompactFiles.contains(curMapFile)) { it.remove(); } } @@ -645,7 +657,7 @@ // 4. Delete all the old files, no longer needed - for(Iterator it = toCompactFiles.iterator(); it.hasNext();) { + for(Iterator it = toCompactFiles.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); fs.delete(hsf.getMapFilePath()); fs.delete(hsf.getInfoFilePath()); @@ -683,7 +695,7 @@ mapFiles.put(orderVal, finalCompactedFile); maps.put(orderVal, new MapFile.Reader(fs, - finalCompactedFile.getMapFilePath().toString(), conf)); + finalCompactedFile.getMapFilePath().toString(), conf)); } finally { @@ -689,7 +701,7 @@ // 7. Releasing the write-lock - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -705,7 +717,7 @@ * The returned object should map column names to byte arrays (byte[]). */ public void getFull(HStoreKey key, TreeMap results) throws IOException { - locking.obtainReadLock(); + this.locker.readLock().lock(); try { MapFile.Reader[] maparray = maps.values().toArray(new MapFile.Reader[maps.size()]); @@ -720,7 +732,7 @@ do { Text readcol = readkey.getColumn(); - if (results.get(readcol) == null + if(results.get(readcol) == null && key.matchesWithoutColumn(readkey)) { results.put(new Text(readcol), readval.get()); readval = new BytesWritable(); @@ -725,7 +737,7 @@ results.put(new Text(readcol), readval.get()); readval = new BytesWritable(); - } else if (key.getRow().compareTo(readkey.getRow()) > 0) { + } else if(key.getRow().compareTo(readkey.getRow()) > 0) { break; } @@ -734,7 +746,7 @@ } } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } @@ -745,7 +757,7 @@ * If 'numVersions' is negative, the method returns all available versions. 
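   * The read lock is held while the MapFiles are searched, so a concurrent
   * flush or compaction cannot swap files out from under the lookup.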
*/ public byte[][] get(HStoreKey key, int numVersions) throws IOException { - if (numVersions == 0) { + if(numVersions == 0) { throw new IllegalArgumentException("Must request at least one value."); } @@ -750,7 +762,7 @@ } Vector results = new Vector(); - locking.obtainReadLock(); + this.locker.readLock().lock(); try { MapFile.Reader[] maparray = maps.values().toArray(new MapFile.Reader[maps.size()]); @@ -763,7 +775,7 @@ map.reset(); HStoreKey readkey = (HStoreKey)map.getClosest(key, readval); - if (readkey.matchesRowCol(key)) { + if(readkey.matchesRowCol(key)) { results.add(readval.get()); readval = new BytesWritable(); @@ -768,7 +780,7 @@ readval = new BytesWritable(); while(map.next(readkey, readval) && readkey.matchesRowCol(key)) { - if (numVersions > 0 && (results.size() >= numVersions)) { + if(numVersions > 0 && (results.size() >= numVersions)) { break; } else { @@ -778,7 +790,7 @@ } } } - if (results.size() >= numVersions) { + if(results.size() >= numVersions) { break; } } @@ -783,7 +795,7 @@ } } - if (results.size() == 0) { + if(results.size() == 0) { return null; } else { @@ -791,7 +803,7 @@ } } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } @@ -804,6 +816,11 @@ */ public long getLargestFileSize(Text midKey) throws IOException { long maxSize = 0L; + if (this.mapFiles.size() <= 0) { + return maxSize; + } + + long mapIndex = 0L; // Iterate through all the MapFiles @@ -809,7 +826,7 @@ // Iterate through all the MapFiles for(Iterator> it = mapFiles.entrySet().iterator(); - it.hasNext();) { + it.hasNext(); ) { Map.Entry e = it.next(); HStoreFile curHSF = e.getValue(); @@ -815,7 +832,7 @@ HStoreFile curHSF = e.getValue(); long size = fs.getLength(new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME)); - if (size > maxSize) { // This is the largest one so far + if(size > maxSize) { // This is the largest one so far maxSize = size; mapIndex = e.getKey(); } @@ -850,7 +867,7 @@ * These should be closed after the user is done with them. */ public HScannerInterface getScanner(long timestamp, Text targetCols[], - Text firstRow) throws IOException { + Text firstRow) throws IOException { return new HStoreScanner(timestamp, targetCols, firstRow); } @@ -867,11 +884,11 @@ public HStoreScanner(long timestamp, Text targetCols[], Text firstRow) throws IOException { super(timestamp, targetCols); - locking.obtainReadLock(); + locker.readLock().lock(); try { this.readers = new MapFile.Reader[mapFiles.size()]; int i = 0; - for(Iterator it = mapFiles.values().iterator(); it.hasNext();) { + for(Iterator it = mapFiles.values().iterator(); it.hasNext(); ) { HStoreFile curHSF = it.next(); readers[i++] = new MapFile.Reader(fs, curHSF.getMapFilePath().toString(), conf); } @@ -885,8 +902,8 @@ keys[i] = new HStoreKey(); vals[i] = new BytesWritable(); - if (firstRow.getLength() != 0) { - if (findFirstRow(i, firstRow)) { + if(firstRow.getLength() != 0) { + if(findFirstRow(i, firstRow)) { continue; } } @@ -892,7 +909,7 @@ } while(getNext(i)) { - if (columnMatch(i)) { + if(columnMatch(i)) { break; } } @@ -915,7 +932,7 @@ HStoreKey firstKey = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), vals[i]); - if (firstKey == null) { + if(firstKey == null) { // Didn't find it. Close the scanner and return TRUE @@ -935,7 +952,7 @@ * @return - true if there is more data available */ boolean getNext(int i) throws IOException { - if (!readers[i].next(keys[i], vals[i])) { + if(! 
readers[i].next(keys[i], vals[i])) { closeSubScanner(i); return false; } @@ -945,7 +962,7 @@ /** Close down the indicated reader. */ void closeSubScanner(int i) throws IOException { try { - if (readers[i] != null) { + if(readers[i] != null) { readers[i].close(); } @@ -958,10 +975,10 @@ /** Shut it down! */ public void close() throws IOException { - if (!scannerClosed) { + if(! scannerClosed) { try { for(int i = 0; i < readers.length; i++) { - if (readers[i] != null) { + if(readers[i] != null) { readers[i].close(); } } @@ -967,7 +984,7 @@ } } finally { - locking.releaseReadLock(); + locker.readLock().unlock(); scannerClosed = true; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (working copy) @@ -61,7 +61,7 @@ } public HStoreFile(Configuration conf, Path dir, Text regionName, - Text colFamily, long fileId) { + Text colFamily, long fileId) { this.conf = conf; this.dir = dir; @@ -92,7 +92,7 @@ public Path getMapFilePath() { return new Path(HStoreFile.getMapDir(dir, regionName, colFamily), - HSTORE_DATFILE_PREFIX + fileId); + HSTORE_DATFILE_PREFIX + fileId); } public Path getInfoFilePath() { @@ -97,7 +97,7 @@ public Path getInfoFilePath() { return new Path(HStoreFile.getInfoDir(dir, regionName, colFamily), - HSTORE_INFOFILE_PREFIX + fileId); + HSTORE_INFOFILE_PREFIX + fileId); } // Static methods to build partial paths to internal directories. Useful for @@ -105,7 +105,7 @@ public static Path getMapDir(Path dir, Text regionName, Text colFamily) { return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName, - new Path(colFamily.toString(), HSTORE_DATFILE_DIR))); + new Path(colFamily.toString(), HSTORE_DATFILE_DIR))); } public static Path getInfoDir(Path dir, Text regionName, Text colFamily) { @@ -110,7 +110,7 @@ public static Path getInfoDir(Path dir, Text regionName, Text colFamily) { return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName, - new Path(colFamily.toString(), HSTORE_INFO_DIR))); + new Path(colFamily.toString(), HSTORE_INFO_DIR))); } public static Path getHStoreDir(Path dir, Text regionName, Text colFamily) { @@ -115,7 +115,7 @@ public static Path getHStoreDir(Path dir, Text regionName, Text colFamily) { return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName, - colFamily.toString())); + colFamily.toString())); } public static Path getHRegionDir(Path dir, Text regionName) { @@ -127,7 +127,7 @@ * filesystem if the file already exists. */ static HStoreFile obtainNewHStoreFile(Configuration conf, Path dir, - Text regionName, Text colFamily, FileSystem fs) throws IOException { + Text regionName, Text colFamily, FileSystem fs) throws IOException { Path mapdir = HStoreFile.getMapDir(dir, regionName, colFamily); long fileId = Math.abs(rand.nextLong()); @@ -149,7 +149,7 @@ * If only one exists, we'll delete it. 
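   * A data file without an info file (or vice versa) is taken to be the
   * remnant of an interrupted operation and is removed during the scan.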
*/ static Vector loadHStoreFiles(Configuration conf, Path dir, - Text regionName, Text colFamily, FileSystem fs) throws IOException { + Text regionName, Text colFamily, FileSystem fs) throws IOException { Vector results = new Vector(); Path mapdir = HStoreFile.getMapDir(dir, regionName, colFamily); @@ -158,7 +158,7 @@ for(int i = 0; i < datfiles.length; i++) { String name = datfiles[i].getName(); - if (name.startsWith(HSTORE_DATFILE_PREFIX)) { + if(name.startsWith(HSTORE_DATFILE_PREFIX)) { Long fileId = Long.parseLong(name.substring(HSTORE_DATFILE_PREFIX.length())); HStoreFile curfile = new HStoreFile(conf, dir, regionName, colFamily, fileId); Path mapfile = curfile.getMapFilePath(); @@ -164,7 +164,7 @@ Path mapfile = curfile.getMapFilePath(); Path infofile = curfile.getInfoFilePath(); - if (fs.exists(infofile)) { + if(fs.exists(infofile)) { results.add(curfile); } else { @@ -178,7 +178,7 @@ for(int i = 0; i < infofiles.length; i++) { String name = infofiles[i].getName(); - if (name.startsWith(HSTORE_INFOFILE_PREFIX)) { + if(name.startsWith(HSTORE_INFOFILE_PREFIX)) { long fileId = Long.parseLong(name.substring(HSTORE_INFOFILE_PREFIX.length())); HStoreFile curfile = new HStoreFile(conf, dir, regionName, colFamily, fileId); Path mapfile = curfile.getMapFilePath(); @@ -183,7 +183,7 @@ HStoreFile curfile = new HStoreFile(conf, dir, regionName, colFamily, fileId); Path mapfile = curfile.getMapFilePath(); - if (!fs.exists(mapfile)) { + if(! fs.exists(mapfile)) { fs.delete(curfile.getInfoFilePath()); } } @@ -200,7 +200,7 @@ * brand-new HRegions. */ public void splitStoreFile(Text midKey, HStoreFile dstA, HStoreFile dstB, - FileSystem fs, Configuration conf) throws IOException { + FileSystem fs, Configuration conf) throws IOException { // Copy the appropriate tuples to one MapFile or the other. @@ -207,11 +207,11 @@ MapFile.Reader in = new MapFile.Reader(fs, getMapFilePath().toString(), conf); try { MapFile.Writer outA = new MapFile.Writer(conf, fs, - dstA.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class); + dstA.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class); try { MapFile.Writer outB = new MapFile.Writer(conf, fs, - dstB.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class); + dstB.getMapFilePath().toString(), HStoreKey.class, BytesWritable.class); try { HStoreKey readkey = new HStoreKey(); @@ -220,7 +220,7 @@ while(in.next(readkey, readval)) { Text key = readkey.getRow(); - if (key.compareTo(midKey) < 0) { + if(key.compareTo(midKey) < 0) { outA.append(readkey, readval); } else { @@ -252,7 +252,7 @@ * We are merging multiple regions into a single new one. */ public void mergeStoreFiles(Vector srcFiles, FileSystem fs, - Configuration conf) throws IOException { + Configuration conf) throws IOException { // Copy all the source MapFile tuples into this HSF's MapFile @@ -257,10 +257,10 @@ // Copy all the source MapFile tuples into this HSF's MapFile MapFile.Writer out = new MapFile.Writer(conf, fs, getMapFilePath().toString(), - HStoreKey.class, BytesWritable.class); + HStoreKey.class, BytesWritable.class); try { - for(Iterator it = srcFiles.iterator(); it.hasNext();) { + for(Iterator it = srcFiles.iterator(); it.hasNext(); ) { HStoreFile src = it.next(); MapFile.Reader in = new MapFile.Reader(fs, src.getMapFilePath().toString(), conf); @@ -283,11 +283,11 @@ // Build a unified InfoFile from the source InfoFiles. 
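    // The merged store advertises the largest sequence id found among its
    // source files.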
long unifiedSeqId = -1; - for(Iterator it = srcFiles.iterator(); it.hasNext();) { + for(Iterator it = srcFiles.iterator(); it.hasNext(); ) { HStoreFile hsf = it.next(); long curSeqId = hsf.loadInfo(fs); - if (curSeqId > unifiedSeqId) { + if(curSeqId > unifiedSeqId) { unifiedSeqId = curSeqId; } } @@ -301,7 +301,7 @@ try { byte flag = in.readByte(); - if (flag == INFO_SEQ_NUM) { + if(flag == INFO_SEQ_NUM) { return in.readLong(); } else { @@ -352,17 +352,17 @@ public int compareTo(Object o) { HStoreFile other = (HStoreFile) o; int result = this.dir.compareTo(other.dir); - if (result == 0) { + if(result == 0) { this.regionName.compareTo(other.regionName); } - if (result == 0) { + if(result == 0) { result = this.colFamily.compareTo(other.colFamily); } - if (result == 0) { - if (this.fileId < other.fileId) { + if(result == 0) { + if(this.fileId < other.fileId) { result = -1; - } else if (this.fileId > other.fileId) { + } else if(this.fileId > other.fileId) { result = 1; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (working copy) @@ -29,7 +29,7 @@ public static Text extractFamily(Text col) throws IOException { String column = col.toString(); int colpos = column.indexOf(":"); - if (colpos < 0) { + if(colpos < 0) { throw new IllegalArgumentException("Illegal column name has no family indicator: " + column); } return new Text(column.substring(0, colpos)); @@ -93,8 +93,13 @@ return timestamp; } + /** + * @param other Key to compare against. Compares row and column. + * @return True if same row and column. + * @see #matchesWithoutColumn(HStoreKey) + */ public boolean matchesRowCol(HStoreKey other) { - if (this.row.compareTo(other.row) == 0 && + if(this.row.compareTo(other.row) == 0 && this.column.compareTo(other.column) == 0) { return true; @@ -103,8 +108,15 @@ } } + /** + * @param other Key to compare against. Compares row and + * timestamp. + * @return True if same row and this key's timestamp is greater than + * or equal to other's + * @see #matchesRowCol(HStoreKey) + */ public boolean matchesWithoutColumn(HStoreKey other) { - if ((this.row.compareTo(other.row) == 0) && + if((this.row.compareTo(other.row) == 0) && (this.timestamp >= other.getTimestamp())) { return true; @@ -124,14 +136,14 @@ public int compareTo(Object o) { HStoreKey other = (HStoreKey) o; int result = this.row.compareTo(other.row); - if (result == 0) { + if(result == 0) { result = this.column.compareTo(other.column); - if (result == 0) { - if (this.timestamp < other.timestamp) { + if(result == 0) { + if(this.timestamp < other.timestamp) { result = 1; - } else if (this.timestamp > other.timestamp) { + } else if(this.timestamp > other.timestamp) { result = -1; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (working copy) @@ -54,7 +54,7 @@ /** Do we contain a given column? 
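   * Family names here are expected to include the trailing ':' delimiter,
   * as produced by HRegion.checkColumn, which appends it before calling
   * this method.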
*/ public boolean hasFamily(Text family) { - if (families.contains(family)) { + if(families.contains(family)) { return true; } else { @@ -75,7 +75,7 @@ name.write(out); out.writeInt(maxVersions); out.writeInt(families.size()); - for(Iterator it = families.iterator(); it.hasNext();) { + for(Iterator it = families.iterator(); it.hasNext(); ) { it.next().write(out); } } @@ -99,21 +99,21 @@ public int compareTo(Object o) { HTableDescriptor htd = (HTableDescriptor) o; int result = name.compareTo(htd.name); - if (result == 0) { + if(result == 0) { result = maxVersions - htd.maxVersions; } - if (result == 0) { + if(result == 0) { result = families.size() - htd.families.size(); } - if (result == 0) { + if(result == 0) { Iterator it2 = htd.families.iterator(); - for(Iterator it = families.iterator(); it.hasNext();) { + for(Iterator it = families.iterator(); it.hasNext(); ) { Text family1 = it.next(); Text family2 = it2.next(); result = family1.compareTo(family2); - if (result != 0) { + if(result != 0) { return result; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (working copy) @@ -32,7 +32,7 @@ } public LabelledData(Text label, byte[] data) { - this.label.set(label); + this.label = new Text(label); this.data = new BytesWritable(data); } @@ -40,7 +40,7 @@ return label; } - public BytesWritable getDat() { + public BytesWritable getData() { return data; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (revision 530954) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (working copy) @@ -77,7 +77,7 @@ synchronized(sortedLeases) { Lease lease = new Lease(holderId, resourceId, listener); Text leaseId = lease.getLeaseId(); - if (leases.get(leaseId) != null) { + if(leases.get(leaseId) != null) { throw new IOException("Impossible state for createLease(): Lease for holderId " + holderId + " and resourceId " + resourceId + " is still held."); } leases.put(leaseId, lease); @@ -92,7 +92,7 @@ synchronized(sortedLeases) { Text leaseId = createLeaseId(holderId, resourceId); Lease lease = leases.get(leaseId); - if (lease == null) { + if(lease == null) { // It's possible that someone tries to renew the lease, but // it just expired a moment ago. So fail. @@ -113,7 +113,7 @@ synchronized(sortedLeases) { Text leaseId = createLeaseId(holderId, resourceId); Lease lease = leases.get(leaseId); - if (lease == null) { + if(lease == null) { // It's possible that someone tries to renew the lease, but // it just expired a moment ago. So fail. 
@@ -137,9 +137,9 @@ synchronized(sortedLeases) { Lease top; while((sortedLeases.size() > 0) - && ((top = sortedLeases.first()) != null)) { + && ((top = sortedLeases.first()) != null)) { - if (top.shouldExpire()) { + if(top.shouldExpire()) { leases.remove(top.getLeaseId()); sortedLeases.remove(top); @@ -205,10 +205,10 @@ public int compareTo(Object o) { Lease other = (Lease) o; - if (this.lastUpdate < other.lastUpdate) { + if(this.lastUpdate < other.lastUpdate) { return -1; - } else if (this.lastUpdate > other.lastUpdate) { + } else if(this.lastUpdate > other.lastUpdate) { return 1; } else { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java (revision 0) @@ -0,0 +1,28 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class LockException extends IOException { + public LockException() { + super(); + } + + public LockException(String s) { + super(s); + } +} Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java (revision 530954) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/Environment.java (working copy) @@ -29,7 +29,7 @@ String value = null; value = System.getenv("DEBUGGING"); - if (value != null && value.equalsIgnoreCase("TRUE")) { + if(value != null && value.equalsIgnoreCase("TRUE")) { debugging = true; } @@ -34,22 +34,22 @@ } value = System.getenv("LOGGING_LEVEL"); - if (value != null && value.length() != 0) { - if (value.equalsIgnoreCase("ALL")) { + if(value != null && value.length() != 0) { + if(value.equalsIgnoreCase("ALL")) { logLevel = Level.ALL; - } else if (value.equalsIgnoreCase("DEBUG")) { + } else if(value.equalsIgnoreCase("DEBUG")) { logLevel = Level.DEBUG; - } else if (value.equalsIgnoreCase("ERROR")) { + } else if(value.equalsIgnoreCase("ERROR")) { logLevel = Level.ERROR; - } else if (value.equalsIgnoreCase("FATAL")) { + } else if(value.equalsIgnoreCase("FATAL")) { logLevel = Level.FATAL; - } else if (value.equalsIgnoreCase("INFO")) { + } else if(value.equalsIgnoreCase("INFO")) { logLevel = Level.INFO; - } else if (value.equalsIgnoreCase("OFF")) { + } else if(value.equalsIgnoreCase("OFF")) { logLevel = Level.OFF; - } else if (value.equalsIgnoreCase("TRACE")) { + } else if(value.equalsIgnoreCase("TRACE")) { logLevel = Level.TRACE; - } else if (value.equalsIgnoreCase("WARN")) { + } else if(value.equalsIgnoreCase("WARN")) { logLevel = Level.WARN; } } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- 
src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 0) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 0) @@ -0,0 +1,298 @@ +/** + * Copyright 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * This class creates a single process HBase cluster for junit testing. + * One thread is created for each server. + */ +public class MiniHBaseCluster implements HConstants { + private Configuration conf; + private MiniDFSCluster cluster; + private FileSystem fs; + private Path parentdir; + private HMasterRunner master; + private Thread masterThread; + private HRegionServerRunner[] regionServers; + private Thread[] regionThreads; + + public MiniHBaseCluster(Configuration conf, int nRegionNodes) { + this.conf = conf; + + try { + try { + if(System.getProperty("test.build.data") == null) { + File testDir = new File(new File("").getAbsolutePath(), + "build/contrib/hbase/test"); + + String dir = testDir.getAbsolutePath(); + System.out.println(dir); + System.setProperty("test.build.data", dir); + } + + // To run using configured filesystem, comment out this + // line below that starts up the MiniDFSCluster. + this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null); + this.fs = FileSystem.get(conf); + this.parentdir = + new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)); + fs.mkdirs(parentdir); + + } catch(Throwable e) { + System.err.println("Mini DFS cluster failed to start"); + e.printStackTrace(); + throw e; + } + + if(this.conf.get(MASTER_ADDRESS) == null) { + this.conf.set(MASTER_ADDRESS, "localhost:0"); + } + + // Create the master + + this.master = new HMasterRunner(); + this.masterThread = new Thread(master, "HMaster"); + + // Start up the master + + masterThread.start(); + while(! master.isCrashed() && ! master.isInitialized()) { + try { + System.err.println("Waiting for HMaster to initialize..."); + Thread.sleep(1000); + + } catch(InterruptedException e) { + } + if(master.isCrashed()) { + throw new RuntimeException("HMaster crashed"); + } + } + + // Set the master's port for the HRegionServers + + this.conf.set(MASTER_ADDRESS, master.getHMasterAddress().toString()); + + // Start the HRegionServers + + if(this.conf.get(REGIONSERVER_ADDRESS) == null) { + this.conf.set(REGIONSERVER_ADDRESS, "localhost:0"); + } + + startRegionServers(this.conf, nRegionNodes); + + // Wait for things to get started + + while(! master.isCrashed() && ! 
master.isUp()) { + try { + System.err.println("Waiting for Mini HBase cluster to start..."); + Thread.sleep(1000); + + } catch(InterruptedException e) { + } + if(master.isCrashed()) { + throw new RuntimeException("HMaster crashed"); + } + } + + } catch(Throwable e) { + + // Delete all DFS files + + deleteFile(new File(System.getProperty("test.build.data"), "dfs")); + + throw new RuntimeException("Mini HBase cluster did not start"); + } + } + + private void startRegionServers(Configuration conf, int nRegionNodes) { + this.regionServers = new HRegionServerRunner[nRegionNodes]; + this.regionThreads = new Thread[nRegionNodes]; + + for(int i = 0; i < nRegionNodes; i++) { + regionServers[i] = new HRegionServerRunner(conf); + regionThreads[i] = new Thread(regionServers[i], "HRegionServer-" + i); + regionThreads[i].start(); + } + } + + /** + * Returns the rpc address actually used by the master server, because the + * supplied port is not necessarily the actual port used. + */ + public HServerAddress getHMasterAddress() { + return master.getHMasterAddress(); + } + + /** Shut down the HBase cluster */ + public void shutdown() { + System.out.println("Shutting down the HBase Cluster"); + for(int i = 0; i < regionServers.length; i++) { + regionServers[i].shutdown(); + } + master.shutdown(); + + for(int i = 0; i < regionServers.length; i++) { + try { + regionThreads[i].join(); + + } catch(InterruptedException e) { + } + } + try { + masterThread.join(); + + } catch(InterruptedException e) { + } + + System.out.println("Shutting down Mini DFS cluster"); + if (cluster != null) { + cluster.shutdown(); + } + + // Delete all DFS files + + deleteFile(new File(System.getProperty("test.build.data"), "dfs")); + + } + + private void deleteFile(File f) { + if(f.isDirectory()) { + File[] children = f.listFiles(); + for(int i = 0; i < children.length; i++) { + deleteFile(children[i]); + } + } + f.delete(); + } + + private class HMasterRunner implements Runnable { + private HMaster master = null; + private volatile boolean isInitialized = false; + private boolean isCrashed = false; + private boolean isRunning = true; + + public HServerAddress getHMasterAddress() { + return master.getMasterAddress(); + } + + public synchronized boolean isInitialized() { + return isInitialized; + } + + public synchronized boolean isCrashed() { + return isCrashed; + } + + public boolean isUp() { + if(master == null) { + return false; + } + synchronized(this) { + return isInitialized; + } + } + + /** Create the HMaster and run it */ + public void run() { + try { + synchronized(this) { + if(isRunning) { + master = new HMaster(conf); + } + isInitialized = true; + } + } catch(Throwable e) { + shutdown(); + System.err.println("HMaster crashed:"); + e.printStackTrace(); + synchronized(this) { + isCrashed = true; + } + } + } + + /** Shut down the HMaster and wait for it to finish */ + public synchronized void shutdown() { + isRunning = false; + if(master != null) { + try { + master.stop(); + + } catch(IOException e) { + System.err.println("Master crashed during stop"); + e.printStackTrace(); + + } finally { + master.join(); + master = null; + } + } + } + } + + private class HRegionServerRunner implements Runnable { + private HRegionServer server = null; + private boolean isRunning = true; + private Configuration conf; + + public HRegionServerRunner(Configuration conf) { + this.conf = conf; + } + + /** Start up the HRegionServer */ + public void run() { + try { + synchronized(this) { + if(isRunning) { + server = new HRegionServer(conf); + } + } 
+ server.run(); + + } catch(Throwable e) { + shutdown(); + System.err.println("HRegionServer crashed:"); + e.printStackTrace(); + } + } + + /** Shut down the HRegionServer */ + public synchronized void shutdown() { + isRunning = false; + if(server != null) { + try { + server.stop(); + + } catch(IOException e) { + System.err.println("HRegionServer crashed during stop"); + e.printStackTrace(); + + } finally { + server.join(); + server = null; + } + } + } + } +} Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java (revision 0) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java (revision 0) @@ -0,0 +1,191 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.TreeMap; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HMemcache.Snapshot; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +public class TestHMemcache extends TestCase { + private final Logger LOG = + Logger.getLogger(this.getClass().getName()); + + private HMemcache hmemcache; + + private Configuration conf; + + private static final int ROW_COUNT = 3; + + private static final int COLUMNS_COUNT = 3; + + private static final String COLUMN_FAMILY = "column"; + + protected void setUp() throws Exception { + super.setUp(); + + this.hmemcache = new HMemcache(); + + // Set up a configuration that supplies a local file + // filesystem implementation. + this.conf = new HBaseConfiguration(); + // The test hadoop-site.xml doesn't have a default file fs + // implementation. Remove the line below when one gets added. + this.conf.set("fs.file.impl", + "org.apache.hadoop.fs.LocalFileSystem"); + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + + private Text getRowName(final int index) { + return new Text("row" + Integer.toString(index)); + } + + private Text getColumnName(final int rowIndex, + final int colIndex) { + return new Text(COLUMN_FAMILY + ":" + + Integer.toString(rowIndex) + ";" + + Integer.toString(colIndex)); + } + + /** + * Adds {@link #ROW_COUNT} rows, each with {@link #COLUMNS_COUNT} columns. + * @param hmc Instance to add rows to. + */ + private void addRows(final HMemcache hmc) { + for (int i = 0; i < ROW_COUNT; i++) { + TreeMap columns = new TreeMap(); + for (int ii = 0; ii < COLUMNS_COUNT; ii++) { + Text k = getColumnName(i, ii); + columns.put(k, k.toString().getBytes()); + } + hmc.add(getRowName(i), columns, System.currentTimeMillis()); + } + } + + private HLog getLogfile() throws IOException { + // Create a log file.
+ Path testDir = new Path(conf.get("hadoop.tmp.dir", System + .getProperty("java.io.tmpdir")), "hbase"); + Path logFile = new Path(testDir, this.getName()); + FileSystem fs = testDir.getFileSystem(conf); + // Cleanup any old log file. + if (fs.exists(logFile)) { + fs.delete(logFile); + } + return new HLog(fs, logFile, this.conf); + } + + private Snapshot runSnapshot(final HMemcache hmc, final HLog log) + throws IOException { + // Save off old state. + int oldHistorySize = hmc.history.size(); + TreeMap oldMemcache = hmc.memcache; + // Run snapshot. + Snapshot s = hmc.snapshotMemcacheForLog(log); + // Make some assertions about what just happened. + assertEquals("Snapshot equals old memcache", hmc.snapshot, + oldMemcache); + assertEquals("Returned snapshot holds old memcache", + s.memcacheSnapshot, oldMemcache); + assertEquals("History has been incremented", + oldHistorySize + 1, hmc.history.size()); + assertEquals("History holds old snapshot", + hmc.history.get(oldHistorySize), oldMemcache); + return s; + } + + public void testSnapshotting() throws IOException { + final int snapshotCount = 5; + final Text tableName = new Text(getName()); + HLog log = getLogfile(); + try { + // Add some rows, run a snapshot. Do it a few times. + for (int i = 0; i < snapshotCount; i++) { + addRows(this.hmemcache); + Snapshot s = runSnapshot(this.hmemcache, log); + log.completeCacheFlush(new Text(Integer.toString(i)), + tableName, s.sequenceId); + // Clean up snapshot now we are done with it. + this.hmemcache.deleteSnapshot(); + } + log.close(); + } finally { + log.dir.getFileSystem(this.conf).delete(log.dir); + } + } + + private void isExpectedRow(final int rowIndex, + TreeMap row) { + int i = 0; + for (Text colname: row.keySet()) { + String expectedColname = + getColumnName(rowIndex, i++).toString(); + String colnameStr = colname.toString(); + assertEquals("Column name", colnameStr, expectedColname); + // Value is column name as bytes. The result is usually at + // least 100 bytes since that is the default buffer size + // for BytesWritable. For comparison, convert bytes to + // String and trim to remove trailing null bytes. + String colvalueStr = + new String(row.get(colname)).trim(); + assertEquals("Content", colnameStr, colvalueStr); + } + } + + public void testGetFull() throws IOException { + addRows(this.hmemcache); + for (int i = 0; i < ROW_COUNT; i++) { + HStoreKey hsk = new HStoreKey(getRowName(i)); + TreeMap all = this.hmemcache.getFull(hsk); + isExpectedRow(i, all); + } + } + + public void testScanner() throws IOException { + addRows(this.hmemcache); + long timestamp = System.currentTimeMillis(); + Text [] cols = new Text[COLUMNS_COUNT * ROW_COUNT]; + for (int i = 0; i < ROW_COUNT; i++) { + for (int ii = 0; ii < COLUMNS_COUNT; ii++) { + cols[(ii + (i * COLUMNS_COUNT))] = getColumnName(i, ii); + } + } + HScannerInterface scanner = + this.hmemcache.getScanner(timestamp, cols, new Text()); + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + for (int i = 0; scanner.next(key, results); i++) { + assertTrue("Row name", + key.toString().startsWith(getRowName(i).toString())); + assertEquals("Count of columns", COLUMNS_COUNT, + results.size()); + isExpectedRow(i, results); + // Clear out set. Otherwise row results accumulate.
+ results.clear(); + } + } +} Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (revision 530954) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (working copy) @@ -21,7 +21,10 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Enumeration; import java.util.Iterator; +import java.util.List; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; @@ -30,6 +33,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.log4j.Appender; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Layout; import org.apache.log4j.Logger; import org.apache.log4j.Level; import org.apache.log4j.PatternLayout; @@ -41,6 +47,7 @@ * HRegions or in the HBaseMaster, so only basic testing is possible. */ public class TestHRegion extends TestCase { + private Logger LOG = Logger.getLogger(this.getClass().getName()); /** Constructor */ public TestHRegion(String name) { @@ -51,6 +58,8 @@ public static Test suite() { TestSuite suite = new TestSuite(); suite.addTest(new TestHRegion("testSetup")); + suite.addTest(new TestHRegion("testLocks")); + suite.addTest(new TestHRegion("testBadPuts")); suite.addTest(new TestHRegion("testBasic")); suite.addTest(new TestHRegion("testScan")); suite.addTest(new TestHRegion("testBatchWrite")); @@ -61,7 +70,7 @@ } - private static final int FIRST_ROW = 0; + private static final int FIRST_ROW = 1; private static final int N_ROWS = 1000000; private static final int NUM_VALS = 1000; private static final Text CONTENTS_BASIC = new Text("contents:basic"); @@ -88,9 +97,10 @@ // Set up environment, start mini cluster, etc. 
+ @SuppressWarnings("unchecked") public void testSetup() throws IOException { try { - if (System.getProperty("test.build.data") == null) { + if(System.getProperty("test.build.data") == null) { String dir = new File(new File("").getAbsolutePath(), "build/contrib/hbase/test").getAbsolutePath(); System.out.println(dir); System.setProperty("test.build.data", dir); @@ -95,21 +105,34 @@ System.out.println(dir); System.setProperty("test.build.data", dir); } - conf = new Configuration(); + conf = new HBaseConfiguration(); Environment.getenv(); - if (Environment.debugging) { + if(Environment.debugging) { Logger rootLogger = Logger.getRootLogger(); rootLogger.setLevel(Level.WARN); + + ConsoleAppender consoleAppender = null; + for(Enumeration e = (Enumeration)rootLogger.getAllAppenders(); + e.hasMoreElements();) { - PatternLayout consoleLayout - = (PatternLayout)rootLogger.getAppender("console").getLayout(); - consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n"); - + Appender a = e.nextElement(); + if(a instanceof ConsoleAppender) { + consoleAppender = (ConsoleAppender)a; + break; + } + } + if(consoleAppender != null) { + Layout layout = consoleAppender.getLayout(); + if(layout instanceof PatternLayout) { + PatternLayout consoleLayout = (PatternLayout)layout; + consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n"); + } + } Logger.getLogger("org.apache.hadoop.hbase").setLevel(Environment.logLevel); } - cluster = new MiniDFSCluster(conf, 2, true, null); + cluster = new MiniDFSCluster(conf, 2, true, (String[])null); fs = cluster.getFileSystem(); parentdir = new Path("/hbase"); fs.mkdirs(parentdir); @@ -118,10 +141,10 @@ log = new HLog(fs, newlogdir, conf); desc = new HTableDescriptor("test", 3); - desc.addFamily(new Text("contents")); - desc.addFamily(new Text("anchor")); + desc.addFamily(new Text("contents:")); + desc.addFamily(new Text("anchor:")); region = new HRegion(parentdir, log, fs, conf, - new HRegionInfo(1, desc, null, null), null, oldlogfile); + new HRegionInfo(1, desc, null, null), null, oldlogfile); } catch(IOException e) { failures = true; @@ -133,7 +156,7 @@ // Test basic functionality. Writes to contents:basic and anchor:anchornum-* public void testBasic() throws IOException { - if (!initialized) { + if(!initialized) { throw new IllegalStateException(); } @@ -138,10 +161,11 @@ } try { + long startTime = System.currentTimeMillis(); // Write out a bunch of values - for (int k = 0; k < NUM_VALS; k++) { + for (int k = FIRST_ROW; k <= NUM_VALS; k++) { long writeid = region.startUpdate(new Text("row_" + k)); region.put(writeid, CONTENTS_BASIC, (CONTENTSTR + k).getBytes()); region.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes()); @@ -147,12 +171,24 @@ region.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes()); region.commit(writeid); } + System.out.println("Write " + NUM_VALS + " rows. 
Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + + // Flush cache + + startTime = System.currentTimeMillis(); + region.flushcache(false); + + System.out.println("Cache flush elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); // Read them back in + startTime = System.currentTimeMillis(); + Text collabel = null; - for (int k = 0; k < NUM_VALS; k++) { + for (int k = FIRST_ROW; k <= NUM_VALS; k++) { Text rowlabel = new Text("row_" + k); byte bodydata[] = region.get(rowlabel, CONTENTS_BASIC); @@ -160,8 +196,8 @@ String bodystr = new String(bodydata).toString().trim(); String teststr = CONTENTSTR + k; assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC - + "), expected: '" + teststr + "' got: '" + bodystr + "'", - bodystr, teststr); + + "), expected: '" + teststr + "' got: '" + bodystr + "'", + bodystr, teststr); collabel = new Text(ANCHORNUM + k); bodydata = region.get(rowlabel, collabel); bodystr = new String(bodydata).toString().trim(); @@ -167,21 +203,13 @@ bodystr = new String(bodydata).toString().trim(); teststr = ANCHORSTR + k; assertEquals("Incorrect value for key: (" + rowlabel + "," + collabel - + "), expected: '" + teststr + "' got: '" + bodystr + "'", - bodystr, teststr); - /* - // Check to make sure that null values are actually null - for (int j = 0; j < Math.min(15, NUM_VALS); j++) { - if (k != j) { - collabel = new Text(ANCHORNUM + j); - byte results[] = region.get(rowlabel, collabel); - if (results != null) { - throw new IOException("Found incorrect value at [" + rowlabel + ", " + collabel + "] == " + new String(results).toString().trim()); - } - } - } - */ + + "), expected: '" + teststr + "' got: '" + bodystr + "'", + bodystr, teststr); } + + System.out.println("Read " + NUM_VALS + " rows. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + } catch(IOException e) { failures = true; throw e; @@ -187,6 +215,97 @@ throw e; } } + + public void testBadPuts() throws IOException { + if(!initialized) { + throw new IllegalStateException(); + } + + // Try put with bad lockid. + boolean exceptionThrown = false; + try { + region.put(-1, CONTENTS_BASIC, "bad input".getBytes()); + } catch (LockException e) { + exceptionThrown = true; + } + assertTrue("Bad lock id", exceptionThrown); + + // Try column name not registered in the table. + exceptionThrown = false; + long lockid = -1; + try { + lockid = region.startUpdate(new Text("Some old key")); + String unregisteredColName = "FamilyGroup:FamilyLabel"; + region.put(lockid, new Text(unregisteredColName), + unregisteredColName.getBytes()); + } catch (IOException e) { + exceptionThrown = true; + } finally { + if (lockid != -1) { + region.abort(lockid); + } + } + assertTrue("Bad family", exceptionThrown); + } + + /** + * Test getting and releasing locks. + */ + public void testLocks() { + final int threadCount = 10; + final int lockCount = 10; + + Listthreads = new ArrayList(threadCount); + for (int i = 0; i < threadCount; i++) { + threads.add(new Thread(Integer.toString(i)) { + public void run() { + long [] lockids = new long[lockCount]; + // Get locks. 
+ for (int i = 0; i < lockCount; i++) { + try { + Text rowid = new Text(Integer.toString(i)); + lockids[i] = region.obtainLock(rowid); + assertTrue("Lock row", rowid.equals(region.getRowFromLock(lockids[i]))); + LOG.debug(getName() + " locked " + rowid.toString()); + } catch (IOException e) { + e.printStackTrace(); + } + } + LOG.debug(getName() + " set " + + Integer.toString(lockCount) + " locks"); + + // Abort outstanding locks. + for (int i = lockCount - 1; i >= 0; i--) { + try { + region.abort(lockids[i]); + LOG.debug(getName() + " unlocked " + + Integer.toString(i)); + } catch (IOException e) { + e.printStackTrace(); + } + } + LOG.debug(getName() + " released " + + Integer.toString(lockCount) + " locks"); + } + }); + } + + // Start up all our threads. + for (Thread t : threads) { + t.start(); + } + + // Now wait around till all are done. + for (Thread t: threads) { + while (t.isAlive()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + // Go around again. + } + } + } + } // Test scanners. Writes contents:firstcol and anchor:secondcol @@ -191,7 +310,7 @@ // Test scanners. Writes contents:firstcol and anchor:secondcol public void testScan() throws IOException { - if (!initialized) { + if(!initialized) { throw new IllegalStateException(); } @@ -196,8 +315,8 @@ } Text cols[] = new Text[] { - CONTENTS_FIRSTCOL, - ANCHOR_SECONDCOL + CONTENTS_FIRSTCOL, + ANCHOR_SECONDCOL }; // Test the Scanner!!! @@ -207,6 +326,9 @@ } // 1. Insert a bunch of values + + long startTime = System.currentTimeMillis(); + for(int k = 0; k < vals1.length / 2; k++) { String kLabel = String.format("%1$03d", k); @@ -217,7 +339,13 @@ numInserted += 2; } - // 2. Scan + System.out.println("Write " + (vals1.length / 2) + " rows. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + + // 2. Scan from cache + + startTime = System.currentTimeMillis(); + HScannerInterface s = region.getScanner(cols, new Text()); int numFetched = 0; try { @@ -225,7 +353,7 @@ TreeMap curVals = new TreeMap(); int k = 0; while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { Text col = it.next(); byte val[] = curVals.get(col); int curval = Integer.parseInt(new String(val).trim()); @@ -231,10 +359,10 @@ int curval = Integer.parseInt(new String(val).trim()); for(int j = 0; j < cols.length; j++) { - if (col.compareTo(cols[j]) == 0) { + if(col.compareTo(cols[j]) == 0) { assertEquals("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp() - + ", Value for " + col + " should be: " + k - + ", but was fetched as: " + curval, k, curval); + + ", Value for " + col + " should be: " + k + + ", but was fetched as: " + curval, k, curval); numFetched++; } } @@ -247,10 +375,23 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); + System.out.println("Scanned " + (vals1.length / 2) + + " rows from cache. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + // 3. Flush to disk + + startTime = System.currentTimeMillis(); + region.flushcache(false); - // 4. Scan + System.out.println("Cache flush elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + + // 4.
Scan from disk + + startTime = System.currentTimeMillis(); + s = region.getScanner(cols, new Text()); numFetched = 0; try { @@ -258,7 +399,7 @@ TreeMap curVals = new TreeMap(); int k = 0; while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { Text col = it.next(); byte val[] = curVals.get(col); int curval = Integer.parseInt(new String(val).trim()); @@ -264,10 +405,10 @@ int curval = Integer.parseInt(new String(val).trim()); for(int j = 0; j < cols.length; j++) { - if (col.compareTo(cols[j]) == 0) { + if(col.compareTo(cols[j]) == 0) { assertEquals("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp() - + ", Value for " + col + " should be: " + k - + ", but was fetched as: " + curval, k, curval); + + ", Value for " + col + " should be: " + k + + ", but was fetched as: " + curval, k, curval); numFetched++; } } @@ -280,7 +421,14 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); + System.out.println("Scanned " + (vals1.length / 2) + + " rows from disk. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + // 5. Insert more values + + startTime = System.currentTimeMillis(); + for(int k = vals1.length/2; k < vals1.length; k++) { String kLabel = String.format("%1$03d", k); @@ -291,7 +439,13 @@ numInserted += 2; } - // 6. Scan + System.out.println("Write " + (vals1.length / 2) + " rows. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + + // 6. Scan from cache and disk + + startTime = System.currentTimeMillis(); + s = region.getScanner(cols, new Text()); numFetched = 0; try { @@ -299,7 +453,7 @@ TreeMap curVals = new TreeMap(); int k = 0; while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { Text col = it.next(); byte val[] = curVals.get(col); int curval = Integer.parseInt(new String(val).trim()); @@ -305,10 +459,10 @@ int curval = Integer.parseInt(new String(val).trim()); for(int j = 0; j < cols.length; j++) { - if (col.compareTo(cols[j]) == 0) { + if(col.compareTo(cols[j]) == 0) { assertEquals("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp() - + ", Value for " + col + " should be: " + k - + ", but was fetched as: " + curval, k, curval); + + ", Value for " + col + " should be: " + k + + ", but was fetched as: " + curval, k, curval); numFetched++; } } @@ -321,10 +475,23 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); + System.out.println("Scanned " + vals1.length + + " rows from cache and disk. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + // 7. Flush to disk + + startTime = System.currentTimeMillis(); + region.flushcache(false); - // 8. Scan + System.out.println("Cache flush elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + + // 8. 
Scan from disk + + startTime = System.currentTimeMillis(); + s = region.getScanner(cols, new Text()); numFetched = 0; try { @@ -332,7 +499,7 @@ TreeMap curVals = new TreeMap(); int k = 0; while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { Text col = it.next(); byte val[] = curVals.get(col); int curval = Integer.parseInt(new String(val).trim()); @@ -340,7 +507,7 @@ for (int j = 0; j < cols.length; j++) { if (col.compareTo(cols[j]) == 0) { assertEquals("Value for " + col + " should be: " + k - + ", but was fetched as: " + curval, curval, k); + + ", but was fetched as: " + curval, curval, k); numFetched++; } } @@ -353,8 +520,14 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); + System.out.println("Scanned " + vals1.length + + " rows from disk. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + // 9. Scan with a starting point + startTime = System.currentTimeMillis(); + s = region.getScanner(cols, new Text("row_vals1_500")); numFetched = 0; try { @@ -362,7 +535,7 @@ TreeMap curVals = new TreeMap(); int k = 500; while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { Text col = it.next(); byte val[] = curVals.get(col); int curval = Integer.parseInt(new String(val).trim()); @@ -370,7 +543,7 @@ for (int j = 0; j < cols.length; j++) { if (col.compareTo(cols[j]) == 0) { assertEquals("Value for " + col + " should be: " + k - + ", but was fetched as: " + curval, curval, k); + + ", but was fetched as: " + curval, curval, k); numFetched++; } } @@ -383,6 +556,9 @@ } assertEquals("Should have fetched " + (numInserted / 2) + " values, but fetched " + numFetched, (numInserted / 2), numFetched); + System.out.println("Scanned " + (numFetched / 2) + + " rows from disk with specified start point. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); } // Do a large number of writes. Disabled if not debugging because it takes a @@ -390,10 +566,10 @@ // Creates contents:body public void testBatchWrite() throws IOException { - if (!initialized || failures) { + if(!initialized || failures) { throw new IllegalStateException(); } - if (!Environment.debugging) { + if(! 
Environment.debugging) { return; } @@ -406,7 +582,7 @@ // 1M writes int valsize = 1000; - for (int k = FIRST_ROW; k < N_ROWS; k++) { + for (int k = FIRST_ROW; k <= N_ROWS; k++) { // Come up with a random 1000-byte string String randstr1 = "" + System.currentTimeMillis(); StringBuffer buf1 = new StringBuffer("val_" + k + "__"); @@ -437,7 +613,7 @@ } } long startCompact = System.currentTimeMillis(); - if (region.compactStores()) { + if(region.compactStores()) { totalCompact = System.currentTimeMillis() - startCompact; System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); @@ -457,7 +633,8 @@ System.out.println("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0))); System.out.println("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)); System.out.println("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0))); - + System.out.println(); + } catch(IOException e) { failures = true; throw e; @@ -467,7 +644,7 @@ // NOTE: This test depends on testBatchWrite succeeding public void testSplitAndMerge() throws IOException { - if (!initialized || failures) { + if(!initialized || failures) { throw new IllegalStateException(); } @@ -474,7 +651,7 @@ try { Text midKey = new Text(); - if (region.needsSplit(midKey)) { + if(region.needsSplit(midKey)) { System.out.println("Needs split"); } @@ -482,7 +659,14 @@ Text midkey = new Text("row_" + (Environment.debugging ? (N_ROWS / 2) : (NUM_VALS/2))); Path oldRegionPath = region.getRegionDir(); + + long startTime = System.currentTimeMillis(); + HRegion subregions[] = region.closeAndSplit(midkey); + + System.out.println("Split region elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + assertEquals("Number of subregions", subregions.length, 2); // Now merge it back together @@ -489,8 +673,14 @@ Path oldRegion1 = subregions[0].getRegionDir(); Path oldRegion2 = subregions[1].getRegionDir(); + + startTime = System.currentTimeMillis(); + region = HRegion.closeAndMerge(subregions[0], subregions[1]); + System.out.println("Merge regions elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + fs.delete(oldRegionPath); fs.delete(oldRegion1); fs.delete(oldRegion2); @@ -504,7 +694,7 @@ // This test verifies that everything is still there after splitting and merging public void testRead() throws IOException { - if (!initialized || failures) { + if(!initialized || failures) { throw new IllegalStateException(); } @@ -511,10 +701,12 @@ // First verify the data written by testBasic() Text[] cols = new Text[] { - new Text(ANCHORNUM + "[0-9]+"), - new Text(CONTENTS_BASIC) + new Text(ANCHORNUM + "[0-9]+"), + new Text(CONTENTS_BASIC) }; + long startTime = System.currentTimeMillis(); + HScannerInterface s = region.getScanner(cols, new Text()); try { @@ -525,7 +717,7 @@ TreeMap curVals = new TreeMap(); int k = 0; while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { Text col = it.next(); byte val[] = curVals.get(col); String curval = new String(val).trim(); @@ -530,18 +722,18 @@ byte val[] = curVals.get(col); String curval = new String(val).trim(); - if (col.compareTo(CONTENTS_BASIC) == 0) { + if(col.compareTo(CONTENTS_BASIC) == 0) { assertTrue("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp() - + ", Value for " + col + " should start with: " + CONTENTSTR - + ", 
but was fetched as: " + curval, - curval.startsWith(CONTENTSTR)); + + ", Value for " + col + " should start with: " + CONTENTSTR + + ", but was fetched as: " + curval, + curval.startsWith(CONTENTSTR)); contentsFetched++; - } else if (col.toString().startsWith(ANCHORNUM)) { + } else if(col.toString().startsWith(ANCHORNUM)) { assertTrue("Error at:" + curKey.getRow() + "/" + curKey.getTimestamp() - + ", Value for " + col + " should start with: " + ANCHORSTR - + ", but was fetched as: " + curval, - curval.startsWith(ANCHORSTR)); + + ", Value for " + col + " should start with: " + ANCHORSTR + + ", but was fetched as: " + curval, + curval.startsWith(ANCHORSTR)); anchorFetched++; } else { @@ -554,6 +746,10 @@ assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched); assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched); + System.out.println("Scanned " + NUM_VALS + + " rows from disk. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + } finally { s.close(); } @@ -561,9 +757,11 @@ // Verify testScan data cols = new Text[] { - CONTENTS_FIRSTCOL, - ANCHOR_SECONDCOL + CONTENTS_FIRSTCOL, + ANCHOR_SECONDCOL }; + + startTime = System.currentTimeMillis(); s = region.getScanner(cols, new Text()); try { @@ -572,7 +770,7 @@ TreeMap curVals = new TreeMap(); int k = 0; while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { Text col = it.next(); byte val[] = curVals.get(col); int curval = Integer.parseInt(new String(val).trim()); @@ -580,7 +778,7 @@ for (int j = 0; j < cols.length; j++) { if (col.compareTo(cols[j]) == 0) { assertEquals("Value for " + col + " should be: " + k - + ", but was fetched as: " + curval, curval, k); + + ", but was fetched as: " + curval, curval, k); numFetched++; } } @@ -590,6 +788,10 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); + System.out.println("Scanned " + (numFetched / 2) + + " rows from disk. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + } finally { s.close(); } @@ -596,7 +798,9 @@ // Verify testBatchWrite data - if (Environment.debugging) { + if(Environment.debugging) { + startTime = System.currentTimeMillis(); + s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text()); try { int numFetched = 0; @@ -604,7 +808,7 @@ TreeMap curVals = new TreeMap(); int k = 0; while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { Text col = it.next(); byte val[] = curVals.get(col); @@ -617,6 +821,10 @@ } assertEquals("Inserted " + N_ROWS + " values, but fetched " + numFetched, N_ROWS, numFetched); + System.out.println("Scanned " + N_ROWS + + " rows from disk. 
Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + } finally { s.close(); } @@ -625,9 +833,11 @@ // Test a scanner which only specifies the column family name cols = new Text[] { - new Text("anchor:") + new Text("anchor:") }; + startTime = System.currentTimeMillis(); + s = region.getScanner(cols, new Text()); try { @@ -635,7 +845,7 @@ HStoreKey curKey = new HStoreKey(); TreeMap curVals = new TreeMap(); while(s.next(curKey, curVals)) { - for(Iterator it = curVals.keySet().iterator(); it.hasNext();) { + for(Iterator it = curVals.keySet().iterator(); it.hasNext(); ) { it.next(); fetched++; } @@ -643,6 +853,10 @@ } assertEquals("Inserted " + (NUM_VALS + numInserted/2) + " values, but fetched " + fetched, (NUM_VALS + numInserted/2), fetched); + System.out.println("Scanned " + fetched + + " rows from disk. Elapsed time: " + + ((System.currentTimeMillis() - startTime) / 1000.0)); + } finally { s.close(); } @@ -650,7 +864,7 @@ private static void deleteFile(File f) { - if (f.isDirectory()) { + if(f.isDirectory()) { File[] children = f.listFiles(); for(int i = 0; i < children.length; i++) { deleteFile(children[i]); @@ -660,7 +874,7 @@ } public void testCleanup() throws IOException { - if (!initialized) { + if(!initialized) { throw new IllegalStateException(); } @@ -672,5 +886,5 @@ deleteFile(new File(System.getProperty("test.build.data"), "dfs")); - } + } }