Index: src/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/HTable.java	(revision 666283)
+++ src/java/org/apache/hadoop/hbase/client/HTable.java	(working copy)
@@ -333,7 +333,7 @@
     return this.tableName;
   }
 
-  protected HConnection getConnection() {
+  public HConnection getConnection() {
     return this.connection;
   }
 
@@ -1232,7 +1232,7 @@
    * If there are multiple regions in a table, this scanner will iterate
    * through them all.
    */
-  private class ClientScanner implements Scanner {
+  protected class ClientScanner implements Scanner {
     private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
     private byte[][] columns;
     private byte [] startRow;
@@ -1270,6 +1270,18 @@
       }
       nextScanner();
     }
+
+    protected byte[][] getColumns() {
+      return columns;
+    }
+
+    protected long getTimestamp() {
+      return scanTime;
+    }
+
+    protected RowFilterInterface getFilter() {
+      return filter;
+    }
 
     /*
      * Gets a scanner for the next region.
@@ -1308,8 +1320,7 @@
       }
 
       try {
-        callable = new ScannerCallable(getConnection(), getTableName(), columns,
-          localStartKey, scanTime, filter);
+        callable = getScannerCallable(localStartKey);
         // open a scanner on the region server starting at the
         // beginning of the region
         getConnection().getRegionServerWithRetries(callable);
@@ -1320,6 +1331,11 @@
       }
       return true;
     }
+
+    protected ScannerCallable getScannerCallable(byte [] localStartKey) {
+      return new ScannerCallable(getConnection(), getTableName(), columns,
+        localStartKey, scanTime, filter);
+    }
 
     /**
      * @param endKey
Index: src/java/org/apache/hadoop/hbase/client/ScannerCallable.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/ScannerCallable.java	(revision 666283)
+++ src/java/org/apache/hadoop/hbase/client/ScannerCallable.java	(working copy)
@@ -23,6 +23,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.RowResult;
 
@@ -38,7 +39,7 @@
   private final long timestamp;
   private final RowFilterInterface filter;
 
-  ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns,
+  protected ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns,
       byte [] startRow, long timestamp, RowFilterInterface filter) {
     super(connection, tableName, startRow);
     this.columns = columns;
@@ -65,15 +66,31 @@
       scannerId = -1L;
     } else if (scannerId == -1L && !closed) {
       // open the scanner
-      scannerId = server.openScanner(
-          this.location.getRegionInfo().getRegionName(), columns, row,
-          timestamp, filter);
+      scannerId = openScanner();
     } else {
       return server.next(scannerId);
     }
     return null;
   }
 
+  protected long openScanner() throws IOException {
+    return server.openScanner(
+        this.location.getRegionInfo().getRegionName(), columns, row,
+        timestamp, filter);
+  }
+
+  protected byte [][] getColumns() {
+    return columns;
+  }
+
+  protected long getTimestamp() {
+    return timestamp;
+  }
+
+  protected RowFilterInterface getFilter() {
+    return filter;
+  }
+
   /**
    * Call this when the next invocation of call should close the scanner
   */
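The two client-side changes work together: HTable.ClientScanner no longer news up its callable inline, and ScannerCallable exposes protected hooks, so an extension can swap in its own callable without re-implementing the scan loop. A minimal sketch of what that enables, assuming a hypothetical LoggingScannerCallable (the class name and the timing output are illustrative, not part of the patch; the subclass sits in the same package because the patch widens the constructor but, as far as the diff shows, not the class itself):

// Hypothetical subclass enabled by the protected constructor and the new
// openScanner() hook; an HTable subclass would hand it out from an
// overridden ClientScanner.getScannerCallable(localStartKey).
package org.apache.hadoop.hbase.client;

import java.io.IOException;

import org.apache.hadoop.hbase.filter.RowFilterInterface;

class LoggingScannerCallable extends ScannerCallable {

  protected LoggingScannerCallable(HConnection connection, byte [] tableName,
      byte [][] columns, byte [] startRow, long timestamp,
      RowFilterInterface filter) {
    super(connection, tableName, columns, startRow, timestamp, filter);
  }

  @Override
  protected long openScanner() throws IOException {
    // Delegate to the default implementation, recording how long the
    // region server took to open the scanner.
    long start = System.currentTimeMillis();
    long scannerId = super.openScanner();
    System.err.println("openScanner took " +
        (System.currentTimeMillis() - start) + "ms");
    return scannerId;
  }
}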
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 666283)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -449,14 +450,16 @@
     // Load in all the HStores.
     long maxSeqId = -1;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
-      HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs,
-          oldLogFile, this.conf, reporter);
+      HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
       stores.put(Bytes.mapKey(c.getName()), store);
       long storeSeqId = store.getMaxSequenceId();
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
       }
     }
+
+    doReconstructionLog(oldLogFile, maxSeqId, reporter);
+
     if (fs.exists(oldLogFile)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Deleting old log file: " + oldLogFile);
@@ -495,11 +498,25 @@
     LOG.info("region " + this + "/" +
       this.regionInfo.getEncodedName() + " available");
   }
+
+  // Do any reconstruction needed from the log
+  @SuppressWarnings("unused")
+  protected void doReconstructionLog(Path oldLogFile, long maxSeqId,
+    Progressable reporter) throws UnsupportedEncodingException, IOException {
+    // Nothing to do (Replaying is done in HStores)
+  }
 
+  protected HStore instantiateHStore(Path baseDir,
+    HColumnDescriptor c, Path oldLogFile,
+    Progressable reporter) throws IOException {
+    return new HStore(baseDir, this.regionInfo, c, this.fs, oldLogFile,
+      this.conf, reporter);
+  }
+
   /**
    * @return Updates to this region need to have a sequence id that is >= to
    * the this number.
    */
-  private HStore getStore(final byte [] column) {
+  protected HStore getStore(final byte [] column) {
     return this.stores.get(HStoreKey.getFamilyMapKey(column));
   }
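HRegion now routes store creation through instantiateHStore and gives subclasses a doReconstructionLog hook that runs after the per-store sequence ids are known but before the old log file is deleted. A rough sketch of a derived region, where every name is hypothetical and the constructor mirrors the eight-argument HRegion constructor visible in instantiateRegion below (the FlushRequester parameter type is an assumption about this revision, not something the patch shows):

// Illustration only: a derived region that takes over log reconstruction.
// The FlushRequester type below is an assumption; all class names are
// hypothetical.
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.util.Progressable;

public class CustomLogRegion extends HRegion {

  public CustomLogRegion(Path basedir, HLog log, FileSystem fs,
      HBaseConfiguration conf, HRegionInfo regionInfo, Path initialFiles,
      FlushRequester flushListener, Progressable reporter) throws IOException {
    super(basedir, log, fs, conf, regionInfo, initialFiles, flushListener,
        reporter);
  }

  @Override
  protected void doReconstructionLog(Path oldLogFile, long maxSeqId,
      Progressable reporter) throws IOException {
    // Replay any edits with sequence ids above maxSeqId from oldLogFile,
    // calling reporter.progress() periodically; the default is a no-op
    // because plain HStores replay their own share of the log.
  }
}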
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 666283)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -70,6 +70,7 @@
 import org.apache.hadoop.hbase.RegionServerRunningException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
@@ -476,7 +480,7 @@
    * Run init. Sets up hlog and starts up all server threads.
    * @param c Extra configuration.
    */
-  private void init(final MapWritable c) throws IOException {
+  protected void init(final MapWritable c) throws IOException {
     try {
       for (Map.Entry<Writable, Writable> e: c.entrySet()) {
         String key = e.getKey().toString();
@@ -860,15 +877,7 @@
     HRegion region = this.onlineRegions.get(mapKey);
     if (region == null) {
       try {
-        region = new HRegion(HTableDescriptor.getTableDir(rootDir,
-            regionInfo.getTableDesc().getName()),
-          this.log, this.fs, conf, regionInfo, null, this.cacheFlusher,
-          new Progressable() {
-            public void progress() {
-              addProcessingMessage(regionInfo);
-            }
-          }
-        );
+        region = instantiateRegion(regionInfo);
         // Startup a compaction early if one is needed.
         this.compactSplitThread.compactionRequested(region);
       } catch (IOException e) {
@@ -891,6 +900,17 @@
     reportOpen(regionInfo);
   }
 
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
+        .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
+        this.cacheFlusher, new Progressable() {
+          public void progress() {
+            addProcessingMessage(regionInfo);
+          }
+        });
+  }
+
   /*
    * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue.
    * This method is called while region is in the queue of regions to process
@@ -1172,16 +1192,9 @@
     requestCount.incrementAndGet();
     try {
       HRegion r = getRegion(regionName);
-      long scannerId = -1L;
       InternalScanner s =
         r.getScanner(cols, firstRow, timestamp, filter);
-      scannerId = rand.nextLong();
-      String scannerName = String.valueOf(scannerId);
-      synchronized(scanners) {
-        scanners.put(scannerName, s);
-      }
-      this.leases.
-        createLease(scannerName, new ScannerListener(scannerName));
+      long scannerId = addScanner(s);
       return scannerId;
     } catch (IOException e) {
       LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
@@ -1191,6 +1204,18 @@
     }
   }
 
+  protected long addScanner(InternalScanner s) throws LeaseStillHeldException {
+    long scannerId = -1L;
+    scannerId = rand.nextLong();
+    String scannerName = String.valueOf(scannerId);
+    synchronized(scanners) {
+      scanners.put(scannerName, s);
+    }
+    this.leases.
+      createLease(scannerName, new ScannerListener(scannerName));
+    return scannerId;
+  }
+
   /** {@inheritDoc} */
   public void close(final long scannerId) throws IOException {
     checkOpen();
@@ -1409,7 +1434,7 @@
    *
    * @throws IOException
    */
-  private void checkOpen() throws IOException {
+  protected void checkOpen() throws IOException {
     if (this.stopRequested.get() || this.abortRequested) {
       throw new IOException("Server not running");
     }
@@ -1567,6 +1592,34 @@
    * @param args
    */
   public static void main(String [] args) {
-    doMain(args, HRegionServer.class);
+    Configuration conf = new HBaseConfiguration();
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+    doMain(args, regionServerClass);
   }
+
+  /**
+   * Get the leases.
+   * @return Return the leases.
+   */
+  protected Leases getLeases() {
+    return leases;
+  }
+
+  /**
+   * Get the rootDir.
+   * @return Return the rootDir.
+   */
+  protected Path getRootDir() {
+    return rootDir;
+  }
+
+  /**
+   * Get the fs.
+   * @return Return the fs.
+   */
+  protected FileSystem getFileSystem() {
+    return fs;
+  }
 }
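On the server side, init, checkOpen, addScanner, instantiateRegion and the new getters give a subclass the same footing the base class has, and both main() and LocalHBaseCluster (further down) load the concrete class reflectively from the new hbase.regionserver.impl key. The only hard requirement is a public constructor taking an HBaseConfiguration, since that is the signature the reflection resolves. A minimal sketch, class name hypothetical:

// Illustration only: the smallest viable HRegionServer variant.
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;

public class MyRegionServer extends HRegionServer {

  // LocalHBaseCluster resolves this exact signature via
  // getConstructor(HBaseConfiguration.class); doMain() presumably does
  // the same.
  public MyRegionServer(HBaseConfiguration conf) throws IOException {
    super(conf);
  }
}

Selecting it is then a matter of configuration, e.g. conf.set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()), or the equivalent hbase.regionserver.impl property in hbase-site.xml.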
Index: src/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 666283)
+++ src/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -167,7 +167,7 @@
    * failed. Can be null.
    * @throws IOException
    */
-  HStore(Path basedir, HRegionInfo info, HColumnDescriptor family,
+  protected HStore(Path basedir, HRegionInfo info, HColumnDescriptor family,
     FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
     final Progressable reporter)
   throws IOException {
@@ -621,7 +621,7 @@
    * @param key
    * @param value
    */
-  void add(HStoreKey key, byte[] value) {
+  protected void add(HStoreKey key, byte[] value) {
     lock.readLock().lock();
     try {
       this.memcache.add(key, value);
@@ -1845,7 +1845,7 @@
   /**
    * Return a scanner for both the memcache and the HStore files
    */
-  InternalScanner getScanner(long timestamp, byte [][] targetCols,
+  protected InternalScanner getScanner(long timestamp, byte [][] targetCols,
     byte [] firstRow, RowFilterInterface filter)
   throws IOException {
     lock.readLock().lock();
Index: src/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HConstants.java	(revision 666283)
+++ src/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -84,6 +84,9 @@
   /** Parameter name for what region server interface to use. */
   static final String REGION_SERVER_CLASS = "hbase.regionserver.class";
 
+  /** Parameter name for what region server implementation to use. */
+  static final String REGION_SERVER_IMPL = "hbase.regionserver.impl";
+
   /** Default region server interface class name. */
   static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName();
 
Index: src/java/org/apache/hadoop/hbase/HTableDescriptor.java
===================================================================
--- src/java/org/apache/hadoop/hbase/HTableDescriptor.java	(revision 666283)
+++ src/java/org/apache/hadoop/hbase/HTableDescriptor.java	(working copy)
@@ -28,8 +28,10 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -220,9 +222,10 @@
     out.writeBoolean(metaregion);
     Bytes.writeByteArray(out, name);
     out.writeInt(families.size());
+    Configuration conf = new HBaseConfiguration();
     for(Iterator<HColumnDescriptor> it = families.values().iterator(); it.hasNext(); ) {
-      it.next().write(out);
+      ObjectWritable.writeObject(out, it.next(), HColumnDescriptor.class, conf);
     }
   }
 
@@ -234,9 +237,9 @@
     this.nameAsString = Bytes.toString(this.name);
     int numCols = in.readInt();
     this.families.clear();
+    Configuration conf = new HBaseConfiguration();
     for (int i = 0; i < numCols; i++) {
-      HColumnDescriptor c = new HColumnDescriptor();
-      c.readFields(in);
+      HColumnDescriptor c = (HColumnDescriptor) ObjectWritable.readObject(in, conf);
       this.families.put(Bytes.mapKey(c.getName()), c);
     }
   }
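The HTableDescriptor change is the serialization side of the same story: ObjectWritable records the concrete class next to each serialized column family, so an HColumnDescriptor subclass survives the write/readFields round trip that previously always reconstructed a plain HColumnDescriptor, at the cost of a few extra bytes per family. A sketch of the effect (the demo class is hypothetical, and the no-argument HTableDescriptor constructor is assumed to be the usual Writable one):

// Illustration only: descriptors now round-trip with their concrete class.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;

public class DescriptorRoundTrip {
  public static void main(String[] args) throws IOException {
    HTableDescriptor desc = new HTableDescriptor("demo");
    // With ObjectWritable this could equally be a subclass of
    // HColumnDescriptor; readFields() would hand the subclass back, where
    // the old code always built a plain HColumnDescriptor.
    desc.addFamily(new HColumnDescriptor("info:"));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    desc.write(new DataOutputStream(bytes));

    HTableDescriptor copy = new HTableDescriptor();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy);
  }
}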
Index: src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
===================================================================
--- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java	(revision 666283)
+++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java	(working copy)
@@ -65,6 +65,7 @@
   /** 'local:' */
   public static final String LOCAL_COLON = LOCAL + ":";
   private final HBaseConfiguration conf;
+  private final Class<? extends HRegionServer> regionServerClass;
 
   /**
    * Constructor.
@@ -98,6 +99,7 @@
     // start/stop ports at different times during the life of the test.
     conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
     this.regionThreads = new ArrayList<RegionServerThread>();
+    regionServerClass = (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
     for (int i = 0; i < noRegionServers; i++) {
       addRegionServer();
     }
@@ -112,7 +114,13 @@
    */
   public RegionServerThread addRegionServer() throws IOException {
     synchronized (regionThreads) {
-      RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
+      HRegionServer server;
+      try {
+        server = regionServerClass.getConstructor(HBaseConfiguration.class).newInstance(conf);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      RegionServerThread t = new RegionServerThread(server,
           this.regionThreads.size());
       this.regionThreads.add(t);
       return t;
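For tests, the reflective instantiation in LocalHBaseCluster means a cluster of custom region servers is one configuration entry away. A usage sketch, reusing the hypothetical MyRegionServer from above:

// Illustration only: a local cluster running a custom region server class.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;

public class StartCustomCluster {
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    // Point the new hbase.regionserver.impl key at the subclass before the
    // cluster constructor reads it.
    conf.set(HConstants.REGION_SERVER_IMPL,
        "org.apache.hadoop.hbase.regionserver.MyRegionServer");
    LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
    cluster.startup();
    // ... exercise the cluster, then:
    cluster.shutdown();
  }
}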