Index: src/contrib/hbase/conf/hbase-default.xml
===================================================================
--- src/contrib/hbase/conf/hbase-default.xml	(.../vendor/current)	(revision 0)
+++ src/contrib/hbase/conf/hbase-default.xml	(.../dev)	(revision 7443)
@@ -0,0 +1,48 @@
+<?xml version="1.0"?>
+<configuration>
+  <property>
+    <name>hbase.master</name>
+    <value>localhost:60000</value>
+    <description>The host and port that the HBase master runs at.
+    TODO: Support 'local' (All running in single context).
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver</name>
+    <value>localhost:60010</value>
+    <description>The host and port an HBase region server runs at.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regiondir</name>
+    <value>${hadoop.tmp.dir}/hbase</value>
+    <description>The directory shared by region servers.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.timeout.length</name>
+    <value>10000</value>
+    <description>Client timeout in milliseconds.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.timeout.number</name>
+    <value>5</value>
+    <description>Try this many timeouts before giving up.
+    </description>
+  </property>
+  <property>
+    <name>hbase.client.retries.number</name>
+    <value>2</value>
+    <description>Maximum number of retries when fetching the root region
+    from the root region server.
+    </description>
+  </property>
+  <property>
+    <name>hbase.master.meta.thread.rescanfrequency</name>
+    <value>60000</value>
+    <description>How long the HMaster sleeps (in milliseconds) between scans
+    of the META table.
+    </description>
+  </property>
+</configuration>
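These settings are plain Hadoop configuration properties, so they can be read through the usual Configuration accessors. A minimal sketch, assuming HBaseConfiguration (introduced elsewhere in this patch) layers hbase-default.xml on top of the stock Hadoop resources:

import org.apache.hadoop.conf.Configuration;

public class ShowHBaseConf {
  public static void main(String[] args) {
    Configuration conf = new HBaseConfiguration();
    // Values fall back to the defaults above when no site file overrides them.
    String master = conf.get("hbase.master", "localhost:60000");
    long timeout = conf.getLong("hbase.client.timeout.length", 10 * 1000);
    int retries = conf.getInt("hbase.client.retries.number", 2);
    System.out.println(master + " timeout=" + timeout + " retries=" + retries);
  }
}
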
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java	(.../vendor/current)	(revision 0)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java	(.../dev)	(revision 7443)
@@ -0,0 +1,298 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class creates a single process HBase cluster for junit testing.
+ * One thread is created for each server.
+ */
+public class MiniHBaseCluster implements HConstants {
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private Path parentdir;
+  private HMasterRunner master;
+  private Thread masterThread;
+  private HRegionServerRunner[] regionServers;
+  private Thread[] regionThreads;
+
+  public MiniHBaseCluster(Configuration conf, int nRegionNodes) {
+    this.conf = conf;
+
+    try {
+      try {
+        if(System.getProperty("test.build.data") == null) {
+          File testDir = new File(new File("").getAbsolutePath(),
+              "build/contrib/hbase/test");
+          String dir = testDir.getAbsolutePath();
+          System.out.println(dir);
+          System.setProperty("test.build.data", dir);
+        }
+
+        // To run using the configured filesystem, comment out the line
+        // below that starts up the MiniDFSCluster.
+        this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null);
+        this.fs = FileSystem.get(conf);
+        this.parentdir =
+          new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR));
+        fs.mkdirs(parentdir);
+
+      } catch(Throwable e) {
+        System.err.println("Mini DFS cluster failed to start");
+        e.printStackTrace();
+        throw e;
+      }
+
+      if(this.conf.get(MASTER_ADDRESS) == null) {
+        this.conf.set(MASTER_ADDRESS, "localhost:0");
+      }
+
+      // Create the master
+
+      this.master = new HMasterRunner();
+      this.masterThread = new Thread(master, "HMaster");
+
+      // Start up the master
+
+      masterThread.start();
+      while(!master.isCrashed() && !master.isInitialized()) {
+        try {
+          System.err.println("Waiting for HMaster to initialize...");
+          Thread.sleep(1000);
+
+        } catch(InterruptedException e) {
+        }
+        if(master.isCrashed()) {
+          throw new RuntimeException("HMaster crashed");
+        }
+      }
+
+      // Set the master's port for the HRegionServers
+
+      this.conf.set(MASTER_ADDRESS, master.getHMasterAddress().toString());
+
+      // Start the HRegionServers
+
+      if(this.conf.get(REGIONSERVER_ADDRESS) == null) {
+        this.conf.set(REGIONSERVER_ADDRESS, "localhost:0");
+      }
+
+      startRegionServers(this.conf, nRegionNodes);
+
+      // Wait for things to get started
+
+      while(!master.isCrashed() && !master.isUp()) {
+        try {
+          System.err.println("Waiting for Mini HBase cluster to start...");
+          Thread.sleep(1000);
+
+        } catch(InterruptedException e) {
+        }
+        if(master.isCrashed()) {
+          throw new RuntimeException("HMaster crashed");
+        }
+      }
+
+    } catch(Throwable e) {
+
+      // Delete all DFS files
+
+      deleteFile(new File(System.getProperty("test.build.data"), "dfs"));
+
+      throw new RuntimeException("Mini HBase cluster did not start");
+    }
+  }
+
+  private void startRegionServers(Configuration conf, int nRegionNodes) {
+    this.regionServers = new HRegionServerRunner[nRegionNodes];
+    this.regionThreads = new Thread[nRegionNodes];
+
+    for(int i = 0; i < nRegionNodes; i++) {
+      regionServers[i] = new HRegionServerRunner(conf);
+      regionThreads[i] = new Thread(regionServers[i], "HRegionServer-" + i);
+      regionThreads[i].start();
+    }
+  }
+
+  /**
+   * Returns the rpc address actually used by the master server, because
+   * the supplied port is not necessarily the actual port used.
+   */
+  public HServerAddress getHMasterAddress() {
+    return master.getHMasterAddress();
+  }
+
+  /** Shut down the HBase cluster */
+  public void shutdown() {
+    System.out.println("Shutting down the HBase Cluster");
+    for(int i = 0; i < regionServers.length; i++) {
+      regionServers[i].shutdown();
+    }
+    master.shutdown();
+
+    for(int i = 0; i < regionServers.length; i++) {
+      try {
+        regionThreads[i].join();
+
+      } catch(InterruptedException e) {
+      }
+    }
+    try {
+      masterThread.join();
+
+    } catch(InterruptedException e) {
+    }
+
+    System.out.println("Shutting down Mini DFS cluster");
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+
+    // Delete all DFS files
+
+    deleteFile(new File(System.getProperty("test.build.data"), "dfs"));
+  }
+
+  private void deleteFile(File f) {
+    if(f.isDirectory()) {
+      File[] children = f.listFiles();
+      for(int i = 0; i < children.length; i++) {
+        deleteFile(children[i]);
+      }
+    }
+    f.delete();
+  }
+
+  private class HMasterRunner implements Runnable {
+    private HMaster master = null;
+    private volatile boolean isInitialized = false;
+    private boolean isCrashed = false;
+    private boolean isRunning = true;
+
+    public HServerAddress getHMasterAddress() {
+      return master.getMasterAddress();
+    }
+
+    public synchronized boolean isInitialized() {
+      return isInitialized;
+    }
+
+    public synchronized boolean isCrashed() {
+      return isCrashed;
+    }
+
+    public boolean isUp() {
+      if(master == null) {
+        return false;
+      }
+      synchronized(this) {
+        return isInitialized;
+      }
+    }
+
+    /** Create the HMaster and run it */
+    public void run() {
+      try {
+        synchronized(this) {
+          if(isRunning) {
+            master = new HMaster(conf);
+          }
+          isInitialized = true;
+        }
+      } catch(Throwable e) {
+        shutdown();
+        System.err.println("HMaster crashed:");
+        e.printStackTrace();
+        synchronized(this) {
+          isCrashed = true;
+        }
+      }
+    }
+
+    /** Shut down the HMaster and wait for it to finish */
+    public synchronized void shutdown() {
+      isRunning = false;
+      if(master != null) {
+        try {
+          master.stop();
+
+        } catch(IOException e) {
+          System.err.println("Master crashed during stop");
+          e.printStackTrace();
+
+        } finally {
+          master.join();
+          master = null;
+        }
+      }
+    }
+  }
+
+  private class HRegionServerRunner implements Runnable {
+    private HRegionServer server = null;
+    private boolean isRunning = true;
+    private Configuration conf;
+
+    public HRegionServerRunner(Configuration conf) {
+      this.conf = conf;
+    }
+
+    /** Start up the HRegionServer */
+    public void run() {
+      try {
+        synchronized(this) {
+          if(isRunning) {
+            server = new HRegionServer(conf);
+          }
+        }
+        server.run();
+
+      } catch(Throwable e) {
+        shutdown();
+        System.err.println("HRegionServer crashed:");
+        e.printStackTrace();
+      }
+    }
+
+    /** Shut down the HRegionServer */
+    public synchronized void shutdown() {
+      isRunning = false;
+      if(server != null) {
+        try {
+          server.stop();
+
+        } catch(IOException e) {
+          System.err.println("HRegionServer crashed during stop");
+          e.printStackTrace();
+
+        } finally {
+          server.join();
+          server = null;
+        }
+      }
+    }
+  }
+}
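A typical junit use of this class, sketched under the assumption that HBaseConfiguration (which appears elsewhere in this patch) is on the classpath; the constructor blocks until the master is up, and shutdown() stops the region servers, the master, and the mini DFS in that order:

public class TestMiniCluster extends junit.framework.TestCase {
  private MiniHBaseCluster cluster;

  protected void setUp() throws Exception {
    super.setUp();
    // One region server is enough for most unit tests.
    cluster = new MiniHBaseCluster(new HBaseConfiguration(), 1);
  }

  protected void tearDown() throws Exception {
    cluster.shutdown();
    super.tearDown();
  }

  public void testClusterIsUp() {
    // The master binds an ephemeral port, so ask the cluster for the real address.
    assertNotNull(cluster.getHMasterAddress());
  }
}
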
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java	(.../vendor/current)	(revision 0)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java	(.../dev)	(revision 7443)
@@ -0,0 +1,191 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HMemcache.Snapshot;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class TestHMemcache extends TestCase {
+  private final Logger LOG =
+    Logger.getLogger(this.getClass().getName());
+
+  private HMemcache hmemcache;
+
+  private Configuration conf;
+
+  private static final int ROW_COUNT = 3;
+
+  private static final int COLUMNS_COUNT = 3;
+
+  private static final String COLUMN_FAMILY = "column";
+
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    this.hmemcache = new HMemcache();
+
+    // Set up a configuration that supplies a local file
+    // filesystem implementation.
+    this.conf = new HBaseConfiguration();
+    // The test hadoop-site.xml doesn't have a default file fs
+    // implementation. Remove the line below when it gets one.
+    this.conf.set("fs.file.impl",
+        "org.apache.hadoop.fs.LocalFileSystem");
+  }
+
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  private Text getRowName(final int index) {
+    return new Text("row" + Integer.toString(index));
+  }
+
+  private Text getColumnName(final int rowIndex,
+      final int colIndex) {
+    return new Text(COLUMN_FAMILY + ":" +
+        Integer.toString(rowIndex) + ";" +
+        Integer.toString(colIndex));
+  }
+
+  /**
+   * Adds {@link #ROW_COUNT} rows of {@link #COLUMNS_COUNT} columns each.
+   * @param hmc Instance to add rows to.
+   */
+  private void addRows(final HMemcache hmc) {
+    for (int i = 0; i < ROW_COUNT; i++) {
+      TreeMap<Text, byte[]> columns = new TreeMap<Text, byte[]>();
+      for (int ii = 0; ii < COLUMNS_COUNT; ii++) {
+        Text k = getColumnName(i, ii);
+        columns.put(k, k.toString().getBytes());
+      }
+      hmc.add(getRowName(i), columns, System.currentTimeMillis());
+    }
+  }
+
+  private HLog getLogfile() throws IOException {
+    // Create a log file.
+    Path testDir = new Path(conf.get("hadoop.tmp.dir",
+        System.getProperty("java.io.tmpdir")), "hbase");
+    Path logFile = new Path(testDir, this.getName());
+    FileSystem fs = testDir.getFileSystem(conf);
+    // Cleanup any old log file.
+    if (fs.exists(logFile)) {
+      fs.delete(logFile);
+    }
+    return new HLog(fs, logFile, this.conf);
+  }
+
+  private Snapshot runSnapshot(final HMemcache hmc, final HLog log)
+      throws IOException {
+    // Save off old state.
+    int oldHistorySize = hmc.history.size();
+    TreeMap<HStoreKey, BytesWritable> oldMemcache = hmc.memcache;
+    // Run snapshot.
+    Snapshot s = hmc.snapshotMemcacheForLog(log);
+    // Make some assertions about what just happened.
+    assertEquals("Snapshot equals old memcache", hmc.snapshot,
+        oldMemcache);
+    assertEquals("Returned snapshot holds old memcache",
+        s.memcacheSnapshot, oldMemcache);
+    assertEquals("History has been incremented",
+        oldHistorySize + 1, hmc.history.size());
+    assertEquals("History holds old snapshot",
+        hmc.history.get(oldHistorySize), oldMemcache);
+    return s;
+  }
+
+  public void testSnapshotting() throws IOException {
+    final int snapshotCount = 5;
+    final Text tableName = new Text(getName());
+    HLog log = getLogfile();
+    try {
+      // Add some rows, run a snapshot. Do it a few times.
+      for (int i = 0; i < snapshotCount; i++) {
+        addRows(this.hmemcache);
+        Snapshot s = runSnapshot(this.hmemcache, log);
+        log.completeCacheFlush(new Text(Integer.toString(i)),
+            tableName, s.sequenceId);
+        // Clean up the snapshot now that we are done with it.
+        this.hmemcache.deleteSnapshot();
+      }
+      log.close();
+    } finally {
+      log.dir.getFileSystem(this.conf).delete(log.dir);
+    }
+  }
+
+  private void isExpectedRow(final int rowIndex,
+      TreeMap<Text, byte[]> row) {
+    int i = 0;
+    for (Text colname: row.keySet()) {
+      String expectedColname =
+        getColumnName(rowIndex, i++).toString();
+      String colnameStr = colname.toString();
+      assertEquals("Column name", colnameStr, expectedColname);
+      // Value is the column name as bytes. The result buffer is usually at
+      // least 100 bytes, the default size for BytesWritable. For comparison,
+      // convert the bytes to a String and trim to remove trailing null bytes.
+      String colvalueStr =
+        new String(row.get(colname)).trim();
+      assertEquals("Content", colnameStr, colvalueStr);
+    }
+  }
+
+  public void testGetFull() throws IOException {
+    addRows(this.hmemcache);
+    for (int i = 0; i < ROW_COUNT; i++) {
+      HStoreKey hsk = new HStoreKey(getRowName(i));
+      TreeMap<Text, byte[]> all = this.hmemcache.getFull(hsk);
+      isExpectedRow(i, all);
+    }
+  }
+
+  public void testScanner() throws IOException {
+    addRows(this.hmemcache);
+    long timestamp = System.currentTimeMillis();
+    Text [] cols = new Text[COLUMNS_COUNT * ROW_COUNT];
+    for (int i = 0; i < ROW_COUNT; i++) {
+      for (int ii = 0; ii < COLUMNS_COUNT; ii++) {
+        cols[(ii + (i * COLUMNS_COUNT))] = getColumnName(i, ii);
+      }
+    }
+    HScannerInterface scanner =
+      this.hmemcache.getScanner(timestamp, cols, new Text());
+    HStoreKey key = new HStoreKey();
+    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+    for (int i = 0; scanner.next(key, results); i++) {
+      assertTrue("Row name",
+          key.toString().startsWith(getRowName(i).toString()));
+      assertEquals("Count of columns", COLUMNS_COUNT,
+          results.size());
+      isExpectedRow(i, results);
+      // Clear out the set. Otherwise row results accumulate.
+      results.clear();
+    }
+  }
+}
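The trim() in isExpectedRow above deals with buffer padding: a value can come back in a buffer sized to the writable's capacity rather than to the logical value, leaving trailing NUL bytes. A standalone illustration of the technique, in plain Java with no HBase types:

public class TrimPaddingExample {
  public static void main(String[] args) {
    byte[] buffer = new byte[100];                        // capacity-sized buffer
    byte[] value = "contents:basic".getBytes();
    System.arraycopy(value, 0, buffer, 0, value.length);  // logical value at the front
    // new String(buffer) carries trailing '\0' padding; trim() drops it.
    assert new String(buffer).trim().equals("contents:basic");
  }
}
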
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
===================================================================
--- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java	(.../vendor/current)	(revision 7443)
+++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java	(.../dev)	(revision 7443)
@@ -21,7 +21,10 @@
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.List;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,6 +33,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
+import org.apache.log4j.Appender;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
 import org.apache.log4j.PatternLayout;
@@ -41,6 +47,7 @@
  * HRegions or in the HBaseMaster, so only basic testing is possible.
  */
 public class TestHRegion extends TestCase {
+  private Logger LOG = Logger.getLogger(this.getClass().getName());
 
   /** Constructor */
   public TestHRegion(String name) {
@@ -51,6 +58,8 @@
   public static Test suite() {
     TestSuite suite = new TestSuite();
     suite.addTest(new TestHRegion("testSetup"));
+    suite.addTest(new TestHRegion("testLocks"));
+    suite.addTest(new TestHRegion("testBadPuts"));
     suite.addTest(new TestHRegion("testBasic"));
     suite.addTest(new TestHRegion("testScan"));
     suite.addTest(new TestHRegion("testBatchWrite"));
@@ -61,7 +70,7 @@
   }
 
-  private static final int FIRST_ROW = 0;
+  private static final int FIRST_ROW = 1;
   private static final int N_ROWS = 1000000;
   private static final int NUM_VALS = 1000;
   private static final Text CONTENTS_BASIC = new Text("contents:basic");
@@ -88,6 +97,7 @@
   // Set up environment, start mini cluster, etc.
 
+  @SuppressWarnings("unchecked")
   public void testSetup() throws IOException {
     try {
       if(System.getProperty("test.build.data") == null) {
@@ -95,21 +105,34 @@
         System.out.println(dir);
         System.setProperty("test.build.data", dir);
       }
-      conf = new Configuration();
+      conf = new HBaseConfiguration();
 
       Environment.getenv();
       if(Environment.debugging) {
         Logger rootLogger = Logger.getRootLogger();
         rootLogger.setLevel(Level.WARN);
+
+        ConsoleAppender consoleAppender = null;
+        for(Enumeration<Appender> e = (Enumeration<Appender>)rootLogger.getAllAppenders();
+            e.hasMoreElements();) {
-        PatternLayout consoleLayout
-          = (PatternLayout)rootLogger.getAppender("console").getLayout();
-        consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
-
+          Appender a = e.nextElement();
+          if(a instanceof ConsoleAppender) {
+            consoleAppender = (ConsoleAppender)a;
+            break;
+          }
+        }
+        if(consoleAppender != null) {
+          Layout layout = consoleAppender.getLayout();
+          if(layout instanceof PatternLayout) {
+            PatternLayout consoleLayout = (PatternLayout)layout;
+            consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
+          }
+        }
         Logger.getLogger("org.apache.hadoop.hbase").setLevel(Environment.logLevel);
       }
 
-      cluster = new MiniDFSCluster(65312, conf, 2, false);
+      cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
       fs = cluster.getFileSystem();
       parentdir = new Path("/hbase");
       fs.mkdirs(parentdir);
@@ -118,8 +141,8 @@
       log = new HLog(fs, newlogdir, conf);
       desc = new HTableDescriptor("test", 3);
-      desc.addFamily(new Text("contents"));
-      desc.addFamily(new Text("anchor"));
+      desc.addFamily(new Text("contents:"));
+      desc.addFamily(new Text("anchor:"));
       region = new HRegion(parentdir, log, fs, conf,
           new HRegionInfo(1, desc, null, null), null, oldlogfile);
 
@@ -138,21 +161,34 @@
     }
 
     try {
+      long startTime = System.currentTimeMillis();
+
       // Write out a bunch of values
 
-      for (int k = 0; k < NUM_VALS; k++) {
+      for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
        long writeid = region.startUpdate(new Text("row_" + k));
        region.put(writeid, CONTENTS_BASIC, (CONTENTSTR + k).getBytes());
        region.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes());
        region.commit(writeid);
       }
+      System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
+          + ((System.currentTimeMillis() - startTime) / 1000.0));
+
+      // Flush cache
+
+      startTime = System.currentTimeMillis();
+
+      region.flushcache(false);
+
+      System.out.println("Cache flush elapsed time: "
+          + ((System.currentTimeMillis() - startTime) / 1000.0));
 
       // Read them back in
 
+      startTime = System.currentTimeMillis();
+
       Text collabel = null;
-      for (int k = 0; k < NUM_VALS; k++) {
+      for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
         Text rowlabel = new Text("row_" + k);
 
         byte bodydata[] = region.get(rowlabel, CONTENTS_BASIC);
@@ -169,25 +205,108 @@
         assertEquals("Incorrect value for key: (" + rowlabel + "," + collabel
             + "), expected: '" + teststr + "' got: '" + bodystr + "'",
             bodystr, teststr);
-/*
-        // Check to make sure that null values are actually null
-        for (int j = 0; j < Math.min(15, NUM_VALS); j++) {
-          if (k != j) {
-            collabel = new Text(ANCHORNUM + j);
-            byte results[] = region.get(rowlabel, collabel);
-            if (results != null) {
-              throw new IOException("Found incorrect value at [" + rowlabel + ", " + collabel + "] == " + new String(results).toString().trim());
-            }
-          }
-        }
-*/
       }
+
+      System.out.println("Read " + NUM_VALS + " rows. Elapsed time: "
+          + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     } catch(IOException e) {
       failures = true;
       throw e;
     }
   }
+
+  public void testBadPuts() throws IOException {
+    if(!initialized) {
+      throw new IllegalStateException();
+    }
+
+    // Try put with bad lockid.
+    boolean exceptionThrown = false;
+    try {
+      region.put(-1, CONTENTS_BASIC, "bad input".getBytes());
+    } catch (LockException e) {
+      exceptionThrown = true;
+    }
+    assertTrue("Bad lock id", exceptionThrown);
+
+    // Try column name not registered in the table.
+    exceptionThrown = false;
+    long lockid = -1;
+    try {
+      lockid = region.startUpdate(new Text("Some old key"));
+      String unregisteredColName = "FamilyGroup:FamilyLabel";
+      region.put(lockid, new Text(unregisteredColName),
+          unregisteredColName.getBytes());
+    } catch (IOException e) {
+      exceptionThrown = true;
+    } finally {
+      if (lockid != -1) {
+        region.abort(lockid);
+      }
+    }
+    assertTrue("Bad family", exceptionThrown);
+  }
+
+  /**
+   * Test getting and releasing locks.
+   */
+  public void testLocks() {
+    final int threadCount = 10;
+    final int lockCount = 10;
+
+    List<Thread> threads = new ArrayList<Thread>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      threads.add(new Thread(Integer.toString(i)) {
+        public void run() {
+          long [] lockids = new long[lockCount];
+          // Get locks.
+          for (int i = 0; i < lockCount; i++) {
+            try {
+              Text rowid = new Text(Integer.toString(i));
+              lockids[i] = region.obtainLock(rowid);
+              assertEquals(rowid, region.getRowFromLock(lockids[i]));
+              LOG.debug(getName() + " locked " + rowid.toString());
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+          LOG.debug(getName() + " set " +
+              Integer.toString(lockCount) + " locks");
+
+          // Abort outstanding locks.
+          for (int i = lockCount - 1; i >= 0; i--) {
+            try {
+              region.abort(lockids[i]);
+              LOG.debug(getName() + " unlocked " +
+                  Integer.toString(i));
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          }
+          LOG.debug(getName() + " released " +
+              Integer.toString(lockCount) + " locks");
+        }
+      });
+    }
+
+    // Startup all our threads.
+    for (Thread t : threads) {
+      t.start();
+    }
+
+    // Now wait around till all are done.
+    for (Thread t: threads) {
+      while (t.isAlive()) {
+        try {
+          Thread.sleep(1);
+        } catch (InterruptedException e) {
+          // Go around again.
+        }
+      }
+    }
+  }
 
   // Test scanners. Writes contents:firstcol and anchor:secondcol
 
   public void testScan() throws IOException {
@@ -207,6 +326,9 @@
     }
 
     // 1. Insert a bunch of values
+
+    long startTime = System.currentTimeMillis();
+
     for(int k = 0; k < vals1.length / 2; k++) {
       String kLabel = String.format("%1$03d", k);
 
@@ -217,7 +339,13 @@
       numInserted += 2;
     }
 
-    // 2. Scan
+    System.out.println("Write " + (vals1.length / 2) + " rows. Elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
+    // 2. Scan from cache
+
+    startTime = System.currentTimeMillis();
+
     HScannerInterface s = region.getScanner(cols, new Text());
     int numFetched = 0;
     try {
@@ -247,10 +375,23 @@
     }
     assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
 
+    System.out.println("Scanned " + (vals1.length / 2)
+        + " rows from cache. Elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     // 3. Flush to disk
+
+    startTime = System.currentTimeMillis();
+
     region.flushcache(false);
 
-    // 4. Scan
+    System.out.println("Cache flush elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
+    // 4. Scan from disk
+
+    startTime = System.currentTimeMillis();
+
     s = region.getScanner(cols, new Text());
     numFetched = 0;
     try {
@@ -280,7 +421,14 @@
     }
     assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
 
+    System.out.println("Scanned " + (vals1.length / 2)
+        + " rows from disk. Elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     // 5. Insert more values
+
+    startTime = System.currentTimeMillis();
+
     for(int k = vals1.length/2; k < vals1.length; k++) {
       String kLabel = String.format("%1$03d", k);
 
@@ -291,7 +439,13 @@
       numInserted += 2;
     }
 
-    // 6. Scan
+    System.out.println("Write " + (vals1.length / 2) + " rows. Elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
+    // 6. Scan from cache and disk
+
+    startTime = System.currentTimeMillis();
+
     s = region.getScanner(cols, new Text());
     numFetched = 0;
     try {
@@ -321,10 +475,23 @@
     }
     assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
 
+    System.out.println("Scanned " + vals1.length
+        + " rows from cache and disk. Elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     // 7. Flush to disk
+
+    startTime = System.currentTimeMillis();
+
     region.flushcache(false);
 
-    // 8. Scan
+    System.out.println("Cache flush elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
+    // 8. Scan from disk
+
+    startTime = System.currentTimeMillis();
+
     s = region.getScanner(cols, new Text());
     numFetched = 0;
     try {
@@ -353,8 +520,14 @@
     }
     assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
 
+    System.out.println("Scanned " + vals1.length
+        + " rows from disk. Elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     // 9. Scan with a starting point
 
+    startTime = System.currentTimeMillis();
+
     s = region.getScanner(cols, new Text("row_vals1_500"));
     numFetched = 0;
     try {
@@ -383,6 +556,9 @@
     }
     assertEquals("Should have fetched " + (numInserted / 2) + " values, but fetched " + numFetched, (numInserted / 2), numFetched);
 
+    System.out.println("Scanned " + (numFetched / 2)
+        + " rows from disk with specified start point. Elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
   }
 
   // Do a large number of writes. Disabled if not debugging because it takes a
@@ -406,7 +582,7 @@
       // 1M writes
 
       int valsize = 1000;
-      for (int k = FIRST_ROW; k < N_ROWS; k++) {
+      for (int k = FIRST_ROW; k <= N_ROWS; k++) {
         // Come up with a random 1000-byte string
         String randstr1 = "" + System.currentTimeMillis();
         StringBuffer buf1 = new StringBuffer("val_" + k + "__");
@@ -457,7 +633,8 @@
       System.out.println("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0)));
       System.out.println("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0));
       System.out.println("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)));
-
+      System.out.println();
+
     } catch(IOException e) {
       failures = true;
       throw e;
@@ -482,15 +659,28 @@
     Text midkey = new Text("row_"
        + (Environment.debugging ? (N_ROWS / 2) : (NUM_VALS/2)));
     Path oldRegionPath = region.getRegionDir();
+
+    long startTime = System.currentTimeMillis();
+
     HRegion subregions[] = region.closeAndSplit(midkey);
+
+    System.out.println("Split region elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     assertEquals("Number of subregions", subregions.length, 2);
 
     // Now merge it back together
 
     Path oldRegion1 = subregions[0].getRegionDir();
     Path oldRegion2 = subregions[1].getRegionDir();
+
+    startTime = System.currentTimeMillis();
+
     region = HRegion.closeAndMerge(subregions[0], subregions[1]);
 
+    System.out.println("Merge regions elapsed time: "
+        + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     fs.delete(oldRegionPath);
     fs.delete(oldRegion1);
     fs.delete(oldRegion2);
@@ -515,6 +705,8 @@
         new Text(CONTENTS_BASIC)
     };
 
+    long startTime = System.currentTimeMillis();
+
     HScannerInterface s = region.getScanner(cols, new Text());
     try {
 
@@ -554,6 +746,10 @@
       assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched);
       assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched);
 
+      System.out.println("Scanned " + NUM_VALS
+          + " rows from disk. Elapsed time: "
+          + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     } finally {
       s.close();
     }
@@ -564,6 +760,8 @@
         CONTENTS_FIRSTCOL,
         ANCHOR_SECONDCOL
     };
+
+    startTime = System.currentTimeMillis();
 
     s = region.getScanner(cols, new Text());
     try {
@@ -590,6 +788,10 @@
       }
       assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
 
+      System.out.println("Scanned " + (numFetched / 2)
+          + " rows from disk. Elapsed time: "
+          + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     } finally {
       s.close();
     }
@@ -597,6 +799,8 @@
     // Verify testBatchWrite data
 
     if(Environment.debugging) {
+      startTime = System.currentTimeMillis();
+
       s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text());
       try {
         int numFetched = 0;
@@ -617,6 +821,10 @@
         }
         assertEquals("Inserted " + N_ROWS + " values, but fetched " + numFetched, N_ROWS, numFetched);
 
+        System.out.println("Scanned " + N_ROWS
+            + " rows from disk. Elapsed time: "
+            + ((System.currentTimeMillis() - startTime) / 1000.0));
+
       } finally {
         s.close();
       }
@@ -628,6 +836,8 @@
         new Text("anchor:")
     };
 
+    startTime = System.currentTimeMillis();
+
     s = region.getScanner(cols, new Text());
     try {
@@ -643,6 +853,10 @@
       }
       assertEquals("Inserted " + (NUM_VALS + numInserted/2) + " values, but fetched " + fetched, (NUM_VALS + numInserted/2), fetched);
 
+      System.out.println("Scanned " + fetched
+          + " rows from disk. Elapsed time: "
+          + ((System.currentTimeMillis() - startTime) / 1000.0));
+
     } finally {
       s.close();
     }
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java	(.../vendor/current)	(revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java	(.../dev)	(revision 7443)
@@ -1,90 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-/*******************************************************************************
- * HLocking is a set of lock primitives that are pretty helpful in a few places
- * around the HBase code. For each independent entity that needs locking, create
- * a new HLocking instance.
- ******************************************************************************/
-public class HLocking {
-  Integer readerLock = new Integer(0);
-  Integer writerLock = new Integer(0);
-  int numReaders = 0;
-  int numWriters = 0;
-
-  public HLocking() {
-  }
-
-  /** Caller needs the nonexclusive read-lock */
-  public void obtainReadLock() {
-    synchronized(readerLock) {
-      synchronized(writerLock) {
-        while(numWriters > 0) {
-          try {
-            writerLock.wait();
-          } catch (InterruptedException ie) {
-          }
-        }
-        numReaders++;
-        readerLock.notifyAll();
-      }
-    }
-  }
-
-  /** Caller is finished with the nonexclusive read-lock */
-  public void releaseReadLock() {
-    synchronized(readerLock) {
-      synchronized(writerLock) {
-        numReaders--;
-        readerLock.notifyAll();
-      }
-    }
-  }
-
-  /** Caller needs the exclusive write-lock */
-  public void obtainWriteLock() {
-    synchronized(readerLock) {
-      synchronized(writerLock) {
-        while(numReaders > 0) {
-          try {
-            readerLock.wait();
-          } catch (InterruptedException ie) {
-          }
-        }
-        while(numWriters > 0) {
-          try {
-            writerLock.wait();
-          } catch (InterruptedException ie) {
-          }
-        }
-        numWriters++;
-        writerLock.notifyAll();
-      }
-    }
-  }
-
-  /** Caller is finished with the write lock */
-  public void releaseWriteLock() {
-    synchronized(readerLock) {
-      synchronized(writerLock) {
-        numWriters--;
-        writerLock.notifyAll();
-      }
-    }
-  }
-}
-
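HLocking is removed outright here. If an equivalent primitive is needed again, java.util.concurrent (available since Java 5) gives the same read/write semantics without hand-rolled wait/notify. A minimal sketch of a drop-in replacement, not part of this patch:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class HLocking {
  // One instance per independently lockable entity, as the old javadoc advised.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  public void obtainReadLock()   { lock.readLock().lock(); }
  public void releaseReadLock()  { lock.readLock().unlock(); }
  public void obtainWriteLock()  { lock.writeLock().lock(); }
  public void releaseWriteLock() { lock.writeLock().unlock(); }
}
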
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java	(.../vendor/current)	(revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java	(.../dev)	(revision 7443)
@@ -67,5 +67,4 @@
     this.val.readFields(in);
     this.timestamp = in.readLong();
   }
-}
-
+}
\ No newline at end of file
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java	(.../vendor/current)	(revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java	(.../dev)	(revision 7443)
@@ -113,5 +113,4 @@
     this.row.readFields(in);
     this.logSeqNum = in.readLong();
   }
-}
-
+}
\ No newline at end of file
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java	(.../vendor/current)	(revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java	(.../dev)	(revision 7443)
@@ -15,6 +15,9 @@
  */
 package org.apache.hadoop.hbase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
@@ -28,9 +31,21 @@
  * HMaster is the "master server" for a HBase.
  * There is only one HMaster for a single HBase deployment.
  ******************************************************************************/
-public class HMaster extends HGlobals
-    implements HConstants, HMasterInterface, HMasterRegionInterface {
+public class HMaster implements HConstants, HMasterInterface, HMasterRegionInterface {
 
+  public long getProtocolVersion(String protocol,
+      long clientVersion) throws IOException {
+    if (protocol.equals(HMasterInterface.class.getName())) {
+      return HMasterInterface.versionID;
+    } else if (protocol.equals(HMasterRegionInterface.class.getName())){
+      return HMasterRegionInterface.versionID;
+    } else {
+      throw new IOException("Unknown protocol: " + protocol);
+    }
+  }
+
+  private final Log LOG = LogFactory.getLog(this.getClass().getName());
+
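getProtocolVersion is the hook Hadoop's IPC layer uses to check that client and server agree on an interface version; HMaster now speaks two protocols, one for clients and one for region servers. A sketch of how a caller might obtain a typed proxy against the Hadoop 0.x RPC API (the address and cast here are illustrative assumptions, not code from this patch):

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MasterProxyExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical lookup of the master's region-server-facing interface.
    InetSocketAddress addr = new InetSocketAddress("localhost", 60000);
    HMasterRegionInterface master = (HMasterRegionInterface) RPC.getProxy(
        HMasterRegionInterface.class, HMasterRegionInterface.versionID,
        addr, new Configuration());
  }
}
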
+  private boolean closed;
   private Path dir;
   private Configuration conf;
@@ -44,6 +59,7 @@
   private Leases serverLeases;
   private Server server;
+  private HServerAddress address;
 
   private HClient client;
@@ -99,92 +115,114 @@
    *
    */
   private class RootScanner implements Runnable {
+    private final Text cols[] = {
+      ROOT_COLUMN_FAMILY
+    };
+    private final Text firstRow = new Text();
+    private HRegionInterface rootServer;
+
     public RootScanner() {
+      rootServer = null;
     }
 
     public void run() {
-      Text cols[] = {
-        ROOT_COLUMN_FAMILY
-      };
-      Text firstRow = new Text();
-
-      while((! closed)) {
-        int metaRegions = 0;
-        while(rootRegionLocation == null) {
-          try {
-            rootRegionLocation.wait();
-
-          } catch(InterruptedException e) {
-          }
-        }
+      while((!closed)) {
+        rootScanned = false;
+        waitForRootRegion();
+
+        rootServer = null;
+        long scannerId = -1L;
 
-        HRegionInterface server = null;
-        HScannerInterface scanner = null;
         try {
-          server = client.getHRegionConnection(rootRegionLocation);
-          scanner = server.openScanner(rootRegionInfo.regionName, cols, firstRow);
+          rootServer = client.getHRegionConnection(rootRegionLocation);
+          scannerId = rootServer.openScanner(HGlobals.rootRegionInfo.regionName, cols, firstRow);
 
         } catch(IOException iex) {
           try {
-            close();
+            iex.printStackTrace();
+            if(scannerId != -1L) {
+              rootServer.close(scannerId);
+            }
 
           } catch(IOException iex2) {
           }
+          closed = true;
           break;
         }
 
         try {
-          HStoreKey key = new HStoreKey();
-          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          LOG.debug("starting root region scan");
+
           DataInputBuffer inbuf = new DataInputBuffer();
-
-          while(scanner.next(key, results)) {
-            byte hRegionInfoBytes[] = results.get(ROOT_COL_REGIONINFO);
-            inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+          while(true) {
+            TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+            HStoreKey key = new HStoreKey();
+            LabelledData[] values = rootServer.next(scannerId, key);
+            if(values.length == 0) {
+              break;
+            }
+            for(int i = 0; i < values.length; i++) {
+              results.put(values[i].getLabel(), values[i].getData().get());
+            }
+            byte[] bytes = results.get(ROOT_COL_REGIONINFO);
+            if(bytes == null || bytes.length == 0) {
+              LOG.fatal("no value for " + ROOT_COL_REGIONINFO);
+              stop();
+            }
+            inbuf.reset(bytes, bytes.length);
             HRegionInfo info = new HRegionInfo();
             info.readFields(inbuf);
-
-            byte serverBytes[] = results.get(ROOT_COL_SERVER);
-            String serverName = new String(serverBytes, UTF8_ENCODING);
-
-            byte startCodeBytes[] = results.get(ROOT_COL_STARTCODE);
-            long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
-
+
+            String serverName = null;
+            bytes = results.get(ROOT_COL_SERVER);
+            if(bytes != null && bytes.length != 0) {
+              serverName = new String(bytes, UTF8_ENCODING);
+            }
+
+            long startCode = -1L;
+            bytes = results.get(ROOT_COL_STARTCODE);
+            if(bytes != null && bytes.length != 0) {
+              startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+            }
+
             // Note META region to load.
 
             HServerInfo storedInfo = null;
-            synchronized(serversToServerInfo) {
-              storedInfo = serversToServerInfo.get(serverName);
-              if(storedInfo == null
-                  || storedInfo.getStartCode() != startCode) {
-
-                // The current assignment is no good; load the region.
-
-                synchronized(unassignedRegions) {
-                  unassignedRegions.put(info.regionName, info);
-                  assignAttempts.put(info.regionName, 0L);
-                }
-              }
+            if(serverName != null) {
+              storedInfo = serversToServerInfo.get(serverName);
             }
-            results.clear();
-            metaRegions += 1;
+            if(storedInfo == null
+                || storedInfo.getStartCode() != startCode) {
+
+              // The current assignment is no good; load the region.
+
+              unassignedRegions.put(info.regionName, info);
+              assignAttempts.put(info.regionName, 0L);
+
+              LOG.debug("region unassigned: " + info.regionName);
+            }
+
+            numMetaRegions += 1;
           }
 
+        } catch(Exception iex) {
+          iex.printStackTrace();
 
-        } catch(Exception iex) {
         } finally {
           try {
-            scanner.close();
+            if(scannerId != -1L) {
+              rootServer.close(scannerId);
+            }
 
           } catch(IOException iex2) {
           }
+          scannerId = -1L;
         }
-        rootScanned = true;
-        numMetaRegions = metaRegions;
-        try {
-          Thread.sleep(metaRescanInterval);
 
-        } catch(InterruptedException e) {
-        }
+        rootScanned = true;
+        try {
+          Thread.sleep(metaRescanInterval);
+
+        } catch(InterruptedException e) {
+        }
       }
     }
   }
@@ -202,9 +240,10 @@
   /** Work for the meta scanner is queued up here */
   private Vector<MetaRegion> metaRegionsToScan;
 
-  private TreeMap<Text, MetaRegion> knownMetaRegions;
-  private Boolean allMetaRegionsScanned;
+  private SortedMap<Text, MetaRegion> knownMetaRegions;
+  private boolean allMetaRegionsScanned;
 
+  /**
   * MetaScanner scans a region in the META table.
   *
@@ -225,67 +264,99 @@
     private void scanRegion(MetaRegion region) {
       HRegionInterface server = null;
-      HScannerInterface scanner = null;
+      long scannerId = -1L;
 
+      LOG.debug("scanning meta region: " + region.regionName);
+
       try {
         server = client.getHRegionConnection(region.server);
-        scanner = server.openScanner(region.regionName, cols, firstRow);
+        scannerId = server.openScanner(region.regionName, cols, firstRow);
 
       } catch(IOException iex) {
         try {
-          close();
+          if(scannerId != -1L) {
+            server.close(scannerId);
+            scannerId = -1L;
+          }
+          stop();
 
         } catch(IOException iex2) {
         }
         return;
       }
 
+      DataInputBuffer inbuf = new DataInputBuffer();
       try {
-        HStoreKey key = new HStoreKey();
-        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-        DataInputBuffer inbuf = new DataInputBuffer();
+        while(true) {
+          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          HStoreKey key = new HStoreKey();
 
-        while(scanner.next(key, results)) {
-          byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
-          inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+          LabelledData[] values = server.next(scannerId, key);
+
+          if(values.length == 0) {
+            break;
+          }
+
+          for(int i = 0; i < values.length; i++) {
+            results.put(values[i].getLabel(), values[i].getData().get());
+          }
+
+          byte bytes[] = results.get(META_COL_REGIONINFO);
+          if(bytes == null || bytes.length == 0) {
+            LOG.fatal("no value for " + META_COL_REGIONINFO);
+            stop();
+          }
+          inbuf.reset(bytes, bytes.length);
           HRegionInfo info = new HRegionInfo();
           info.readFields(inbuf);
-
-          byte serverBytes[] = results.get(META_COL_SERVER);
-          String serverName = new String(serverBytes, UTF8_ENCODING);
 
-          byte startCodeBytes[] = results.get(META_COL_STARTCODE);
-          long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+          String serverName = null;
+          bytes = results.get(META_COL_SERVER);
+          if(bytes != null && bytes.length != 0) {
+            serverName = new String(bytes, UTF8_ENCODING);
+          }
+
+          long startCode = -1L;
+          bytes = results.get(META_COL_STARTCODE);
+          if(bytes != null && bytes.length != 0) {
+            startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+          }
 
           // Note HRegion to load.
 
           HServerInfo storedInfo = null;
-          synchronized(serversToServerInfo) {
-            storedInfo = serversToServerInfo.get(serverName);
-            if(storedInfo == null
-                || storedInfo.getStartCode() != startCode) {
-
-              // The current assignment is no good; load the region.
+          if(serverName != null) {
+            storedInfo = serversToServerInfo.get(serverName);
+          }
+          if(storedInfo == null
+              || storedInfo.getStartCode() != startCode) {
 
-              synchronized(unassignedRegions) {
-                unassignedRegions.put(info.regionName, info);
-                assignAttempts.put(info.regionName, 0L);
-              }
-            }
+            // The current assignment is no good; load the region.
+
+            unassignedRegions.put(info.regionName, info);
+            assignAttempts.put(info.regionName, 0L);
+            LOG.debug("region unassigned: " + info.regionName);
           }
-          results.clear();
         }
 
+      } catch(Exception iex) {
+        iex.printStackTrace();
+
       } finally {
         try {
-          scanner.close();
+          if(scannerId != -1L) {
+            server.close(scannerId);
+          }
 
         } catch(IOException iex2) {
+          iex2.printStackTrace();
         }
+        scannerId = -1L;
       }
     }
 
     public void run() {
-      while((! closed)) {
+      while((!closed)) {
         MetaRegion region = null;
 
         while(region == null) {
@@ -293,25 +364,26 @@
             if(metaRegionsToScan.size() != 0) {
               region = metaRegionsToScan.remove(0);
             }
-          }
-          if(region == null) {
-            try {
-              metaRegionsToScan.wait();
-
-            } catch(InterruptedException e) {
+            if(region == null) {
+              try {
+                metaRegionsToScan.wait();
+
+              } catch(InterruptedException e) {
+              }
             }
           }
         }
         scanRegion(region);
-
-        synchronized(knownMetaRegions) {
-          knownMetaRegions.put(region.startKey, region);
-          if(rootScanned && knownMetaRegions.size() == numMetaRegions) {
-            allMetaRegionsScanned = true;
-            allMetaRegionsScanned.notifyAll();
-          }
+        if(closed) {
+          break;
         }
+        knownMetaRegions.put(region.startKey, region);
+        if(rootScanned && knownMetaRegions.size() == numMetaRegions) {
+          LOG.debug("all meta regions scanned");
+          allMetaRegionsScanned = true;
+          metaRegionsScanned();
+        }
 
         do {
           try {
@@ -319,8 +391,8 @@
 
           } catch(InterruptedException ex) {
           }
-          if(! allMetaRegionsScanned) {
-            break;                      // A region must have split
+          if(!allMetaRegionsScanned) {
+            break;                      // A meta region must have split
           }
 
           // Rescan the known meta regions every so often
@@ -334,6 +406,20 @@
         } while(true);
       }
     }
+
+    private synchronized void metaRegionsScanned() {
+      notifyAll();
+    }
+
+    public synchronized void waitForMetaScan() {
+      while(!allMetaRegionsScanned) {
+        try {
+          wait();
+
+        } catch(InterruptedException e) {
+        }
+      }
+    }
   }
 
   private MetaScanner metaScanner;
@@ -350,27 +436,27 @@
 
   // We fill 'unassignedRegions' by scanning ROOT and META tables, learning the
   // set of all known valid regions.
 
-  private TreeMap<Text, HRegionInfo> unassignedRegions;
+  private SortedMap<Text, HRegionInfo> unassignedRegions;
 
   // The 'assignAttempts' table maps from regions to a timestamp that indicates
   // the last time we *tried* to assign the region to a RegionServer. If the
   // timestamp is out of date, then we can try to reassign it.
 
-  private TreeMap<Text, Long> assignAttempts;
+  private SortedMap<Text, Long> assignAttempts;
 
   // 'killList' indicates regions that we hope to close and then never reopen
   // (because we're merging them, say).
 
-  private TreeMap<String, TreeMap<Text, HRegionInfo>> killList;
+  private SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
 
   // 'serversToServerInfo' maps from a server name String to its HServerInfo
 
-  private TreeMap<String, HServerInfo> serversToServerInfo;
+  private SortedMap<String, HServerInfo> serversToServerInfo;
 
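The comments above describe the master's assignment bookkeeping: a region is offered to a server only when its assignAttempts entry is older than the configured open timeout. A minimal sketch of that check in isolation, borrowing the names declared above (the helper itself is hypothetical):

  // Returns true when the region should be (re)offered to a region server.
  private static boolean shouldAssign(org.apache.hadoop.io.Text regionName,
      long maxRegionOpenTime, java.util.SortedMap<org.apache.hadoop.io.Text, Long> assignAttempts) {
    long assignedTime = assignAttempts.get(regionName);  // 0L means "never assigned"
    return System.currentTimeMillis() - assignedTime > maxRegionOpenTime;
  }
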
  /** Build the HMaster out of a raw configuration item. */
  public HMaster(Configuration conf) throws IOException {
    this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
-        new HServerAddress(conf.get(MASTER_DEFAULT_NAME)),
+        new HServerAddress(conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)),
         conf);
  }
@@ -395,24 +481,39 @@
      fs.mkdirs(dir);
    }

-    Path rootRegionDir = HStoreFile.getHRegionDir(dir, rootRegionInfo.regionName);
+    Path rootRegionDir = HStoreFile.getHRegionDir(dir, HGlobals.rootRegionInfo.regionName);
    if(! fs.exists(rootRegionDir)) {
+      LOG.debug("bootstrap: creating root and meta regions");

      // Bootstrap! Need to create the root region and the first meta region.
      //TODO is the root region self referential?

-      HRegion root = createNewHRegion(rootTableDesc, 0L);
-      HRegion meta = createNewHRegion(metaTableDesc, 1L);
+      try {
+        HRegion root = createNewHRegion(HGlobals.rootTableDesc, 0L);
+        HRegion meta = createNewHRegion(HGlobals.metaTableDesc, 1L);

-      addTableToMeta(root, meta);
+        addTableToMeta(root, meta);
+
+        root.close();
+        meta.close();
+
+      } catch(IOException e) {
+        e.printStackTrace();
+      }
    }

    this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
    this.msgQueue = new Vector<PendingOperation>();

-    this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 15 * 1000),
+    this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 30 * 1000),
        conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));

+    this.server = RPC.getServer(this, address.getBindAddress(), address.getPort(),
+        conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+
+    // The rpc-server port can be ephemeral... ensure we have the correct info
+
+    this.address = new HServerAddress(server.getListenerAddress());
+    conf.set(MASTER_ADDRESS, address.toString());
+
    this.client = new HClient(conf);

    this.metaRescanInterval
@@ -429,8 +530,12 @@
    this.numMetaRegions = 0;
    this.metaRegionsToScan = new Vector<MetaRegion>();

-    this.knownMetaRegions = new TreeMap<Text, MetaRegion>();
-    this.allMetaRegionsScanned = new Boolean(false);
+    this.knownMetaRegions =
+      Collections.synchronizedSortedMap(new TreeMap<Text, MetaRegion>());
+
+    this.allMetaRegionsScanned = false;
+
    this.metaScanner = new MetaScanner();
    this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
@@ -439,15 +544,23 @@
    this.clientProcessor = new ClientProcessor();
    this.clientProcessorThread = new Thread(clientProcessor, "HMaster.clientProcessor");

-    this.unassignedRegions = new TreeMap<Text, HRegionInfo>();
-    this.unassignedRegions.put(rootRegionInfo.regionName, rootRegionInfo);
+    this.unassignedRegions =
+      Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());

-    this.assignAttempts = new TreeMap<Text, Long>();
-    this.assignAttempts.put(rootRegionInfo.regionName, 0L);
+    this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo);
+
+    this.assignAttempts =
+      Collections.synchronizedSortedMap(new TreeMap<Text, Long>());
+
+    this.assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);

-    this.killList = new TreeMap<String, TreeMap<Text, HRegionInfo>>();
-    this.serversToServerInfo = new TreeMap<String, HServerInfo>();
+    this.killList =
+      Collections.synchronizedSortedMap(
+          new TreeMap<String, TreeMap<Text, HRegionInfo>>());

+    this.serversToServerInfo =
+      Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
+
    // We're almost open for business

    this.closed = false;
@@ -470,10 +583,11 @@
      this.closed = true;
      throw e;
    }
+    LOG.info("HMaster started");
  }

  /** Turn off the HMaster. Turn off all the threads, close files, etc. */
-  public void close() throws IOException {
+  public void stop() throws IOException {
    closed = true;

    try {
@@ -481,9 +595,18 @@

    } catch(IOException iex) {
    }
-
+    server.stop();
+    LOG.info("shutting down HMaster");
+  }
+
+  /** returns the HMaster server address */
+  public HServerAddress getMasterAddress() {
+    return address;
+  }
+
+  /** Call this to wait for everything to finish */
+  public void join() {
    try {
-      server.stop();
      server.join();

    } catch(InterruptedException iex) {
@@ -503,6 +626,7 @@

    } catch(Exception iex) {
    }
+    LOG.info("HMaster stopped");
  }

  //////////////////////////////////////////////////////////////////////////////
@@ -514,79 +638,79 @@
    String server = serverInfo.getServerAddress().toString();
    HServerInfo storedInfo = null;

+    LOG.debug("received start message from: " + server);
+
    // If we get the startup message but there's an old server by that
    // name, then we can timeout the old one right away and register
    // the new one.

-    synchronized(serversToServerInfo) {
-      storedInfo = serversToServerInfo.get(server);
-
-      if(storedInfo != null) {
-        serversToServerInfo.remove(server);
+    storedInfo = serversToServerInfo.get(server);

-        synchronized(msgQueue) {
-          msgQueue.add(new PendingServerShutdown(storedInfo));
-          msgQueue.notifyAll();
-        }
+    if(storedInfo != null) {
+      serversToServerInfo.remove(server);
+      synchronized(msgQueue) {
+        msgQueue.add(new PendingServerShutdown(storedInfo));
+        msgQueue.notifyAll();
      }
+    }

-      // Either way, record the new server
+    // Either way, record the new server

-      serversToServerInfo.put(server, serverInfo);
+    serversToServerInfo.put(server, serverInfo);

-
-      Text serverLabel = new Text(server);
-      serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
-    }
+    Text serverLabel = new Text(server);
+    serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
  }

  /** HRegionServers call this method repeatedly. */
  public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException {
    String server = serverInfo.getServerAddress().toString();

-    synchronized(serversToServerInfo) {
-      HServerInfo storedInfo = serversToServerInfo.get(server);
-
-      if(storedInfo == null) {
-
-        // The HBaseMaster may have been restarted.
-        // Tell the RegionServer to start over and call regionServerStartup()
-
-        HMsg returnMsgs[] = new HMsg[1];
-        returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP);
-        return returnMsgs;
-
-      } else if(storedInfo.getStartCode() != serverInfo.getStartCode()) {
-
-        // This state is reachable if:
-        //
-        // 1) RegionServer A started
-        // 2) RegionServer B started on the same machine, then
-        //    clobbered A in regionServerStartup.
-        // 3) RegionServer A returns, expecting to work as usual.
-        //
-        // The answer is to ask A to shut down for good.
-
-        HMsg returnMsgs[] = new HMsg[1];
-        returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING);
-        return returnMsgs;
-
-      } else {
-
-        // All's well. Renew the server's lease.
-        // This will always succeed; otherwise, the fetch of serversToServerInfo
-        // would have failed above.
-
-        Text serverLabel = new Text(server);
-        serverLeases.renewLease(serverLabel, serverLabel);
+    HServerInfo storedInfo = serversToServerInfo.get(server);

-        // Refresh the info object
-        serversToServerInfo.put(server, serverInfo);
+    if(storedInfo == null) {

-        // Next, process messages for this server
-        return processMsgs(serverInfo, msgs);
-      }
+      LOG.debug("received server report from unknown server: " + server);
+
+      // The HBaseMaster may have been restarted.
+      // Tell the RegionServer to start over and call regionServerStartup()
+
+      HMsg returnMsgs[] = new HMsg[1];
+      returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP);
+      return returnMsgs;
+
+    } else if(storedInfo.getStartCode() != serverInfo.getStartCode()) {
+
+      // This state is reachable if:
+      //
+      // 1) RegionServer A started
+      // 2) RegionServer B started on the same machine, then
+      //    clobbered A in regionServerStartup.
+      // 3) RegionServer A returns, expecting to work as usual.
+      //
+      // The answer is to ask A to shut down for good.
+
+      LOG.debug("region server race condition detected: " + server);
+
+      HMsg returnMsgs[] = new HMsg[1];
+      returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING);
+      return returnMsgs;
+
+    } else {
+
+      // All's well. Renew the server's lease.
+      // This will always succeed; otherwise, the fetch of serversToServerInfo
+      // would have failed above.
+
+      Text serverLabel = new Text(server);
+      serverLeases.renewLease(serverLabel, serverLabel);
+
+      // Refresh the info object
+      serversToServerInfo.put(server, serverInfo);
+
+      // Next, process messages for this server
+      return processMsgs(serverInfo, msgs);
    }
  }
@@ -607,151 +731,177 @@

    // Get reports on what the RegionServer did.

-    synchronized(unassignedRegions) {
-      for(int i = 0; i < incomingMsgs.length; i++) {
-        HRegionInfo region = incomingMsgs[i].getRegionInfo();
-
-        switch(incomingMsgs[i].getMsg()) {
+    for(int i = 0; i < incomingMsgs.length; i++) {
+      HRegionInfo region = incomingMsgs[i].getRegionInfo();

-        case HMsg.MSG_REPORT_OPEN:
-          HRegionInfo regionInfo = unassignedRegions.get(region.regionName);
+      switch(incomingMsgs[i].getMsg()) {

-          if(regionInfo == null) {
+      case HMsg.MSG_REPORT_OPEN:
+        HRegionInfo regionInfo = unassignedRegions.get(region.regionName);

-            // This Region should not have been opened.
-            // Ask the server to shut it down, but don't report it as closed.
-            // Otherwise the HMaster will think the Region was closed on purpose,
-            // and then try to reopen it elsewhere; that's not what we want.
+        if(regionInfo == null) {

-            returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));
+          LOG.debug("region server " + info.getServerAddress().toString()
+              + " should not have opened region " + region.regionName);

-          } else {
+          // This Region should not have been opened.
+          // Ask the server to shut it down, but don't report it as closed.
+          // Otherwise the HMaster will think the Region was closed on purpose,
+          // and then try to reopen it elsewhere; that's not what we want.

-            // Remove from unassigned list so we don't assign it to someone else
+          returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));

-            unassignedRegions.remove(region.regionName);
-            assignAttempts.remove(region.regionName);
+        } else {

-            if(region.regionName.compareTo(rootRegionInfo.regionName) == 0) {
+          LOG.debug(info.getServerAddress().toString() + " serving "
+              + region.regionName);

-              // Store the Root Region location (in memory)
+          // Remove from unassigned list so we don't assign it to someone else

-              rootRegionLocation = new HServerAddress(info.getServerAddress());
-
-              // Wake up the root scanner
-
-              rootRegionLocation.notifyAll();
-              break;
-
-            } else if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
+          unassignedRegions.remove(region.regionName);
+          assignAttempts.remove(region.regionName);

-              // It's a meta region. Put it on the queue to be scanned.
-
-              MetaRegion r = new MetaRegion();
-              r.server = info.getServerAddress();
-              r.regionName = region.regionName;
-              r.startKey = region.startKey;
-
-              synchronized(metaRegionsToScan) {
-                metaRegionsToScan.add(r);
-                metaRegionsToScan.notifyAll();
-              }
-            }
-
-            // Queue up an update to note the region location.
+          if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) {

-            synchronized(msgQueue) {
-              msgQueue.add(new PendingOpenReport(info, region.regionName));
-              msgQueue.notifyAll();
+            // Store the Root Region location (in memory)
+
+            rootRegionLocation = new HServerAddress(info.getServerAddress());
+
+            // Wake up threads waiting for the root server
+
+            rootRegionIsAvailable();
+            break;
+
+          } else if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
+
+            // It's a meta region. Put it on the queue to be scanned.
+
+            MetaRegion r = new MetaRegion();
+            r.server = info.getServerAddress();
+            r.regionName = region.regionName;
+            r.startKey = region.startKey;
+
+            synchronized(metaRegionsToScan) {
+              metaRegionsToScan.add(r);
+              metaRegionsToScan.notifyAll();
            }
          }
-          break;

-        case HMsg.MSG_REPORT_CLOSE:
-          if(region.regionName.compareTo(rootRegionInfo.regionName) == 0) { // Root region
-            rootRegionLocation = null;
-            unassignedRegions.put(region.regionName, region);
-            assignAttempts.put(region.regionName, 0L);
+          // Queue up an update to note the region location.

-          } else {
-            boolean reassignRegion = true;
-
+          synchronized(msgQueue) {
+            msgQueue.add(new PendingOpenReport(info, region.regionName));
+            msgQueue.notifyAll();
+          }
+        }
+        break;
+
+      case HMsg.MSG_REPORT_CLOSE:
+        LOG.debug(info.getServerAddress().toString() + " no longer serving "
+            + region.regionName);
+
+        if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region
+          rootRegionLocation = null;
+          unassignedRegions.put(region.regionName, region);
+          assignAttempts.put(region.regionName, 0L);
+
+        } else {
+          boolean reassignRegion = true;
+
+          synchronized(regionsToKill) {
            if(regionsToKill.containsKey(region.regionName)) {
              regionsToKill.remove(region.regionName);
-
+
              if(regionsToKill.size() > 0) {
                killList.put(info.toString(), regionsToKill);
-
+
              } else {
                killList.remove(info.toString());
              }
              reassignRegion = false;
            }
-
-            synchronized(msgQueue) {
-              msgQueue.add(new PendingCloseReport(region, reassignRegion));
-              msgQueue.notifyAll();
-            }
+          }

-            // NOTE: we cannot put the region into unassignedRegions as that
-            //       could create a race with the pending close if it gets
-            //       reassigned before the close is processed.
-
+          synchronized(msgQueue) {
+            msgQueue.add(new PendingCloseReport(region, reassignRegion));
+            msgQueue.notifyAll();
          }
-          break;

-        case HMsg.MSG_NEW_REGION:
-          if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
-            // A meta region has split.
-
-            allMetaRegionsScanned = false;
-          }
-          synchronized(unassignedRegions) {
-            unassignedRegions.put(region.regionName, region);
-            assignAttempts.put(region.regionName, 0L);
-          }
-          break;
-
-        default:
-          throw new IOException("Impossible state during msg processing.  Instruction: "
-              + incomingMsgs[i].getMsg());
+          // NOTE: we cannot put the region into unassignedRegions as that
+          //       could create a race with the pending close if it gets
+          //       reassigned before the close is processed.
        }
+        break;
+
+      case HMsg.MSG_NEW_REGION:
+        LOG.debug("new region " + region.regionName);
+
+        if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
+          // A meta region has split.
+
+          allMetaRegionsScanned = false;
+        }
+        unassignedRegions.put(region.regionName, region);
+        assignAttempts.put(region.regionName, 0L);
+        break;
+
+      default:
+        throw new IOException("Impossible state during msg processing.  Instruction: "
+            + incomingMsgs[i].getMsg());
      }
+    }

-      // Figure out what the RegionServer ought to do, and write back.
+    // Figure out what the RegionServer ought to do, and write back.

-      if(unassignedRegions.size() > 0) {
+    if(unassignedRegions.size() > 0) {

-        // Open new regions as necessary
+      // Open new regions as necessary

-        int targetForServer = (int) Math.ceil(unassignedRegions.size()
-            / (1.0 * serversToServerInfo.size()));
+      int targetForServer = (int) Math.ceil(unassignedRegions.size()
+          / (1.0 * serversToServerInfo.size()));

-        int counter = 0;
-        long now = System.currentTimeMillis();
+      int counter = 0;
+      long now = System.currentTimeMillis();

-        for(Iterator<Text> it = unassignedRegions.keySet().iterator();
-            it.hasNext(); ) {
+      for(Iterator<Text> it = unassignedRegions.keySet().iterator();
+          it.hasNext(); ) {

-          Text curRegionName = it.next();
-          HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
-          long assignedTime = assignAttempts.get(curRegionName);
+        Text curRegionName = it.next();
+        HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
+        long assignedTime = assignAttempts.get(curRegionName);

-          if(now - assignedTime > maxRegionOpenTime) {
-            returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
+        if(now - assignedTime > maxRegionOpenTime) {
+          LOG.debug("assigning region " + regionInfo.regionName + " to server "
+              + info.getServerAddress().toString());

-            assignAttempts.put(curRegionName, now);
-            counter++;
-          }
+          returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));

-          if(counter >= targetForServer) {
-            break;
-          }
+          assignAttempts.put(curRegionName, now);
+          counter++;
        }
+
+        if(counter >= targetForServer) {
+          break;
+        }
      }
    }
    return (HMsg[]) returnMsgs.toArray(new HMsg[returnMsgs.size()]);
  }
closed) { + while(!closed) { PendingOperation op = null; synchronized(msgQueue) { @@ -773,22 +923,19 @@ } catch(InterruptedException iex) { } } - op = msgQueue.elementAt(msgQueue.size()-1); - msgQueue.removeElementAt(msgQueue.size()-1); + op = msgQueue.remove(msgQueue.size()-1); } try { op.process(); } catch(Exception ex) { - synchronized(msgQueue) { - msgQueue.insertElementAt(op, 0); - } + msgQueue.insertElementAt(op, 0); } } } } - abstract class PendingOperation { + private abstract class PendingOperation { protected final Text[] columns = { META_COLUMN_FAMILY }; @@ -802,9 +949,9 @@ public abstract void process() throws IOException; } - class PendingServerShutdown extends PendingOperation { - String deadServer; - long oldStartCode; + private class PendingServerShutdown extends PendingOperation { + private String deadServer; + private long oldStartCode; public PendingServerShutdown(HServerInfo serverInfo) { super(); @@ -812,7 +959,7 @@ this.oldStartCode = serverInfo.getStartCode(); } - private void scanMetaRegion(HRegionInterface server, HScannerInterface scanner, + private void scanMetaRegion(HRegionInterface server, long scannerId, Text regionName) throws IOException { Vector toDoList = new Vector(); @@ -820,20 +967,30 @@ DataInputBuffer inbuf = new DataInputBuffer(); try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); + LabelledData[] values = null; - while(scanner.next(key, results)) { - byte serverBytes[] = results.get(META_COL_SERVER); - String serverName = new String(serverBytes, UTF8_ENCODING); + while(true) { + HStoreKey key = new HStoreKey(); + values = server.next(scannerId, key); + if(values.length == 0) { + break; + } + TreeMap results = new TreeMap(); + for(int i = 0; i < values.length; i++) { + results.put(values[i].getLabel(), values[i].getData().get()); + } + + String serverName = + new String(results.get(META_COL_SERVER), UTF8_ENCODING); + if(deadServer.compareTo(serverName) != 0) { // This isn't the server you're looking for - move along continue; } - byte startCodeBytes[] = results.get(META_COL_STARTCODE); - long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING)); + long startCode = + Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING)); if(oldStartCode != startCode) { // Close but no cigar @@ -847,6 +1004,8 @@ HRegionInfo info = new HRegionInfo(); info.readFields(inbuf); + LOG.debug(serverName + " was serving " + info.regionName); + // Add to our to do lists toDoList.add(key); @@ -854,7 +1013,16 @@ } } finally { - scanner.close(); + if(scannerId != -1L) { + try { + server.close(scannerId); + + } catch(IOException e) { + e.printStackTrace(); + + } + } + scannerId = -1L; } // Remove server from root/meta entries @@ -875,50 +1043,45 @@ Text region = e.getKey(); HRegionInfo regionInfo = e.getValue(); - synchronized(unassignedRegions) { - unassignedRegions.put(region, regionInfo); - assignAttempts.put(region, 0L); - } + unassignedRegions.put(region, regionInfo); + assignAttempts.put(region, 0L); } } public void process() throws IOException { + LOG.debug("server shutdown: " + deadServer); + // Scan the ROOT region + + waitForRootRegion(); // Wait until the root region is available + HRegionInterface server = client.getHRegionConnection(rootRegionLocation); + long scannerId = + server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow); + + scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName); + // We can not scan every meta region if they have not already been assigned // and 
scanned. - - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } - // First scan the ROOT region + metaScanner.waitForMetaScan(); - HRegionInterface server = client.getHRegionConnection(rootRegionLocation); - HScannerInterface scanner = server.openScanner(rootRegionInfo.regionName, - columns, startRow); - - scanMetaRegion(server, scanner, rootRegionInfo.regionName); for(Iterator i = knownMetaRegions.values().iterator(); i.hasNext(); ) { MetaRegion r = i.next(); server = client.getHRegionConnection(r.server); - scanner = server.openScanner(r.regionName, columns, startRow); - scanMetaRegion(server, scanner, r.regionName); + scannerId = server.openScanner(r.regionName, columns, startRow); + scanMetaRegion(server, scannerId, r.regionName); } } } /** PendingCloseReport is a close message that is saved in a different thread. */ - class PendingCloseReport extends PendingOperation { - HRegionInfo regionInfo; - boolean reassignRegion; - boolean rootRegion; + private class PendingCloseReport extends PendingOperation { + private HRegionInfo regionInfo; + private boolean reassignRegion; + private boolean rootRegion; public PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion) { super(); @@ -929,7 +1092,7 @@ // If the region closing down is a meta region then we need to update // the ROOT table - if(this.regionInfo.regionName.find(metaTableDesc.getName().toString()) == 0) { + if(this.regionInfo.regionName.find(HGlobals.metaTableDesc.getName().toString()) == 0) { this.rootRegion = true; } else { @@ -938,24 +1101,21 @@ } public void process() throws IOException { - + // We can not access any meta region if they have not already been assigned // and scanned. + + metaScanner.waitForMetaScan(); - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } + LOG.debug("region closed: " + regionInfo.regionName); // Mark the Region as unavailable in the appropriate meta table Text metaRegionName; HRegionInterface server; if(rootRegion) { - metaRegionName = rootRegionInfo.regionName; + metaRegionName = HGlobals.rootRegionInfo.regionName; + waitForRootRegion(); // Make sure root region available server = client.getHRegionConnection(rootRegionLocation); } else { @@ -970,23 +1130,23 @@ server.commit(metaRegionName, clientId, lockid); if(reassignRegion) { - synchronized(unassignedRegions) { - unassignedRegions.put(regionInfo.regionName, regionInfo); - assignAttempts.put(regionInfo.regionName, 0L); - } + LOG.debug("reassign region: " + regionInfo.regionName); + + unassignedRegions.put(regionInfo.regionName, regionInfo); + assignAttempts.put(regionInfo.regionName, 0L); } } } /** PendingOpenReport is an open message that is saved in a different thread. */ - class PendingOpenReport extends PendingOperation { - boolean rootRegion; - Text regionName; - BytesWritable serverAddress; - BytesWritable startCode; + private class PendingOpenReport extends PendingOperation { + private boolean rootRegion; + private Text regionName; + private BytesWritable serverAddress; + private BytesWritable startCode; public PendingOpenReport(HServerInfo info, Text regionName) { - if(regionName.find(metaTableDesc.getName().toString()) == 0) { + if(regionName.find(HGlobals.metaTableDesc.getName().toString()) == 0) { // The region which just came on-line is a META region. // We need to look in the ROOT region for its information. 
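PendingServerShutdown above is the first caller of the new id-based scanner protocol: openScanner() returns a long, next() returns an empty LabelledData[] at end-of-scan, and close() must run in a finally block. A self-contained sketch of that calling convention, where Scanner is a simplified stand-in for the three HRegionInterface methods and String[] takes the place of LabelledData[]:

import java.io.IOException;

// Sketch of the scanner-id calling convention: open, pull batches until an
// empty array, and always release the id in a finally block.
public class ScanLoop {
  interface Scanner {
    long openScanner() throws IOException;
    String[] next(long scannerId) throws IOException;  // stand-in for LabelledData[]
    void close(long scannerId) throws IOException;
  }

  static void scanAll(Scanner server) throws IOException {
    long scannerId = -1L;
    try {
      scannerId = server.openScanner();
      while (true) {
        String[] values = server.next(scannerId);
        if (values.length == 0) {
          break;                       // empty batch signals end-of-scan
        }
        for (String v : values) {
          System.out.println(v);
        }
      }
    } finally {
      if (scannerId != -1L) {
        try {
          server.close(scannerId);     // frees the server-side scanner and lease
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

The empty-array sentinel avoids a separate hasNext-style RPC per row.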
@@ -1017,21 +1177,18 @@ // We can not access any meta region if they have not already been assigned // and scanned. + + metaScanner.waitForMetaScan(); - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } + LOG.debug(regionName + " open on " + serverAddress.toString()); // Register the newly-available Region's location. Text metaRegionName; HRegionInterface server; if(rootRegion) { - metaRegionName = rootRegionInfo.regionName; + metaRegionName = HGlobals.rootRegionInfo.regionName; + waitForRootRegion(); // Make sure root region available server = client.getHRegionConnection(rootRegionLocation); } else { @@ -1056,15 +1213,9 @@ // We can not access any meta region if they have not already been assigned // and scanned. + + metaScanner.waitForMetaScan(); - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } - // 1. Check to see if table already exists Text metaStartRow = knownMetaRegions.headMap(newRegion.regionName).lastKey(); @@ -1103,12 +1254,16 @@ new BytesWritable(byteValue.toByteArray())); server.commit(metaRegionName, clientId, lockid); - // 4. Get it assigned to a server + // 4. Close the new region to flush it to disk - synchronized(unassignedRegions) { - unassignedRegions.put(regionName, info); - assignAttempts.put(regionName, 0L); - } + r.close(); + + // 5. Get it assigned to a server + + unassignedRegions.put(regionName, info); + assignAttempts.put(regionName, 0L); + + LOG.debug("created table " + desc.getName()); } /** @@ -1152,17 +1307,8 @@ table.getRegionInfo().write(s); - s.writeLong(table.getRegionId()); meta.put(writeid, META_COL_REGIONINFO, bytes.toByteArray()); - bytes.reset(); - new HServerAddress().write(s); - meta.put(writeid, META_COL_SERVER, bytes.toByteArray()); - - bytes.reset(); - s.writeLong(0L); - meta.put(writeid, META_COL_STARTCODE, bytes.toByteArray()); - meta.commit(writeid); } @@ -1173,36 +1319,37 @@ // We can not access any meta region if they have not already been assigned // and scanned. 
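The meta updates above all rely on the same Writable round-trip: serialize into a byte stream, store the byte[] as a cell value, and later rehydrate it through a DataInputBuffer. A small sketch of that pattern, with Text standing in for HRegionInfo:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;

// Sketch of the Writable round-trip used for META cells.
public class WritableRoundTrip {
  public static void main(String[] args) throws IOException {
    Text original = new Text("--META--,,1");

    // Serialize (mirrors: info.write(s); meta.put(..., bytes.toByteArray()))
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    original.write(out);
    byte[] cell = bytes.toByteArray();

    // Deserialize (mirrors: inbuf.reset(bytes, bytes.length); info.readFields(inbuf))
    DataInputBuffer inbuf = new DataInputBuffer();
    inbuf.reset(cell, cell.length);
    Text copy = new Text();
    copy.readFields(inbuf);

    System.out.println(copy);  // --META--,,1
  }
}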
+ + metaScanner.waitForMetaScan(); - while(!allMetaRegionsScanned) { - try { - allMetaRegionsScanned.wait(); - - } catch(InterruptedException e) { - } - } + for(Iterator it = knownMetaRegions.tailMap(tableName).values().iterator(); + it.hasNext(); ) { - for(Iterator i = knownMetaRegions.tailMap(tableName).values().iterator(); - i.hasNext(); ) { - // Find all the regions that make up this table long clientId = rand.nextLong(); - MetaRegion m = i.next(); + MetaRegion m = it.next(); HRegionInterface server = client.getHRegionConnection(m.server); + long scannerId = -1L; try { - HScannerInterface scanner - = server.openScanner(m.regionName, columns, tableName); + scannerId = server.openScanner(m.regionName, columns, tableName); - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - DataInputBuffer inbuf = new DataInputBuffer(); - Vector rowsToDelete = new Vector(); - while(scanner.next(key, results)) { - byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO); - inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length); + DataInputBuffer inbuf = new DataInputBuffer(); + while(true) { + LabelledData[] values = null; + HStoreKey key = new HStoreKey(); + values = server.next(scannerId, key); + if(values.length == 0) { + break; + } + TreeMap results = new TreeMap(); + for(int i = 0; i < values.length; i++) { + results.put(values[i].getLabel(), values[i].getData().get()); + } + byte bytes[] = results.get(META_COL_REGIONINFO); + inbuf.reset(bytes, bytes.length); HRegionInfo info = new HRegionInfo(); info.readFields(inbuf); @@ -1212,25 +1359,23 @@ // Is it being served? - byte serverBytes[] = results.get(META_COL_SERVER); - String serverName = new String(serverBytes, UTF8_ENCODING); + String serverName = + new String(results.get(META_COL_SERVER), UTF8_ENCODING); - byte startCodeBytes[] = results.get(META_COL_STARTCODE); - long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING)); + long startCode = + Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING)); - synchronized(serversToServerInfo) { - HServerInfo s = serversToServerInfo.get(serverName); - if(s != null && s.getStartCode() == startCode) { - - // It is being served. Tell the server to stop it and not report back - - TreeMap regionsToKill = killList.get(serverName); - if(regionsToKill == null) { - regionsToKill = new TreeMap(); - } - regionsToKill.put(info.regionName, info); - killList.put(serverName, regionsToKill); + HServerInfo s = serversToServerInfo.get(serverName); + if(s != null && s.getStartCode() == startCode) { + + // It is being served. 
Tell the server to stop it and not report back + + TreeMap regionsToKill = killList.get(serverName); + if(regionsToKill == null) { + regionsToKill = new TreeMap(); } + regionsToKill.put(info.regionName, info); + killList.put(serverName, regionsToKill); } } for(Iterator row = rowsToDelete.iterator(); row.hasNext(); ) { @@ -1240,8 +1385,21 @@ } } catch(IOException e) { e.printStackTrace(); + + } finally { + if(scannerId != -1L) { + try { + server.close(scannerId); + + } catch(IOException e) { + e.printStackTrace(); + + } + } + scannerId = -1L; } } + LOG.debug("deleted table: " + tableName); } public HServerAddress findRootRegion() { @@ -1252,24 +1410,48 @@ // Managing leases ////////////////////////////////////////////////////////////////////////////// - class ServerExpirer extends LeaseListener { - String server = null; + private class ServerExpirer extends LeaseListener { + private String server; public ServerExpirer(String server) { this.server = new String(server); } public void leaseExpired() { - HServerInfo storedInfo = null; + LOG.debug(server + " lease expired"); - synchronized(serversToServerInfo) { - storedInfo = serversToServerInfo.remove(server); - } + HServerInfo storedInfo = serversToServerInfo.remove(server); synchronized(msgQueue) { msgQueue.add(new PendingServerShutdown(storedInfo)); msgQueue.notifyAll(); } } } + + private static void printUsage() { + System.err.println("Usage: java org.apache.hbase.HMaster " + + "[--bind=hostname:port]"); + } + + public static void main(String [] args) throws IOException { + Configuration conf = new HBaseConfiguration(); + + // Process command-line args. TODO: Better cmd-line processing + // (but hopefully something not as painful as cli options). + for (String cmd: args) { + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(); + return; + } + + final String addressArgKey = "--bind="; + if (cmd.startsWith(addressArgKey)) { + conf.set(MASTER_ADDRESS, + cmd.substring(addressArgKey.length())); + } + } + + new HMaster(conf); + } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (.../dev) (revision 7443) @@ -15,15 +15,27 @@ */ package org.apache.hadoop.hbase; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; -import java.io.*; -import java.util.*; - /******************************************************************************* * HStore maintains a bunch of data files. 
It is responsible for maintaining * the memory/file hierarchy and for periodic flushes to disk and compacting @@ -53,7 +65,7 @@ Integer compactLock = new Integer(0); Integer flushLock = new Integer(0); - HLocking locking = new HLocking(); + ReadWriteLock locker = new ReentrantReadWriteLock(); TreeMap maps = new TreeMap(); TreeMap mapFiles = new TreeMap(); @@ -218,7 +230,7 @@ /** Turn off all the MapFile readers */ public void close() throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily); try { @@ -232,7 +244,7 @@ LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -295,7 +307,7 @@ // C. Finally, make the new MapFile available. if(addToAvailableMaps) { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf)); @@ -303,7 +315,7 @@ LOG.debug("HStore available for " + this.regionName + "/" + this.colFamily); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } return getAllMapFiles(); @@ -355,12 +367,12 @@ // Grab a list of files to compact. Vector toCompactFiles = null; - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { toCompactFiles = new Vector(mapFiles.values()); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps @@ -583,7 +595,7 @@ // 1. Acquiring the write-lock - locking.obtainWriteLock(); + this.locker.writeLock().lock(); Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily); try { Path doneFile = new Path(curCompactStore, COMPACTION_DONE); @@ -689,7 +701,7 @@ // 7. Releasing the write-lock - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -705,7 +717,7 @@ * The returned object should map column names to byte arrays (byte[]). 
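The HLocking-to-ReentrantReadWriteLock conversion in HStore follows one discipline throughout: lock, work inside try, unlock in finally. Distilled into a sketch (LockedStore and its field are illustrative only):

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the lock discipline adopted in place of HLocking.
public class LockedStore {
  private final ReadWriteLock locker = new ReentrantReadWriteLock();
  private long size = 0L;

  public long read() {
    locker.readLock().lock();        // many readers may hold this at once
    try {
      return size;
    } finally {
      locker.readLock().unlock();    // finally guarantees release on any exit
    }
  }

  public void write(long newSize) {
    locker.writeLock().lock();       // exclusive: blocks readers and writers
    try {
      size = newSize;
    } finally {
      locker.writeLock().unlock();
    }
  }
}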
*/ public void getFull(HStoreKey key, TreeMap results) throws IOException { - locking.obtainReadLock(); + this.locker.readLock().lock(); try { MapFile.Reader[] maparray = maps.values().toArray(new MapFile.Reader[maps.size()]); @@ -734,7 +746,7 @@ } } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } @@ -750,7 +762,7 @@ } Vector results = new Vector(); - locking.obtainReadLock(); + this.locker.readLock().lock(); try { MapFile.Reader[] maparray = maps.values().toArray(new MapFile.Reader[maps.size()]); @@ -791,7 +803,7 @@ } } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } @@ -804,6 +816,11 @@ */ public long getLargestFileSize(Text midKey) throws IOException { long maxSize = 0L; + if (this.mapFiles.size() <= 0) { + return maxSize; + } + + long mapIndex = 0L; // Iterate through all the MapFiles @@ -867,7 +884,7 @@ public HStoreScanner(long timestamp, Text targetCols[], Text firstRow) throws IOException { super(timestamp, targetCols); - locking.obtainReadLock(); + locker.readLock().lock(); try { this.readers = new MapFile.Reader[mapFiles.size()]; int i = 0; @@ -967,7 +984,7 @@ } } finally { - locking.releaseReadLock(); + locker.readLock().unlock(); scannerClosed = true; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java (.../vendor/current) (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LockException.java (.../dev) (revision 7443) @@ -0,0 +1,28 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class LockException extends IOException { + public LockException() { + super(); + } + + public LockException(String s) { + super(s); + } +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (.../dev) (revision 7443) @@ -22,6 +22,8 @@ import java.io.*; import java.util.*; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /******************************************************************************* * The HMemcache holds in-memory modifications to the HRegion. This is really a @@ -38,7 +40,7 @@ TreeMap snapshot = null; - HLocking locking = new HLocking(); + ReadWriteLock locker = new ReentrantReadWriteLock(); public HMemcache() { } @@ -52,18 +54,22 @@ } /** - * We want to return a snapshot of the current HMemcache with a known HLog + * Returns a snapshot of the current HMemcache with a known HLog * sequence number at the same time. 
+ * + * We need to prevent any writing to the cache during this time, + * so we obtain a write lock for the duration of the operation. * - * Return both the frozen HMemcache TreeMap, as well as the HLog seq number. - * - * We need to prevent any writing to the cache during this time, so we obtain - * a write lock for the duration of the operation. + *
<p>
If this method returns non-null, client must call + * {@link #deleteSnapshot()} to clear 'snapshot-in-progress' + * state when finished with the returned {@link Snapshot}. + * + * @return frozen HMemcache TreeMap and HLog sequence number. */ public Snapshot snapshotMemcacheForLog(HLog log) throws IOException { Snapshot retval = new Snapshot(); - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { if(snapshot != null) { throw new IOException("Snapshot in progress!"); @@ -86,7 +92,7 @@ return retval; } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -96,7 +102,7 @@ * Modifying the structure means we need to obtain a writelock. */ public void deleteSnapshot() throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { if(snapshot == null) { @@ -118,7 +124,7 @@ LOG.debug("snapshot deleted"); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -128,7 +134,7 @@ * Operation uses a write lock. */ public void add(Text row, TreeMap columns, long timestamp) { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { for(Iterator it = columns.keySet().iterator(); it.hasNext(); ) { Text column = it.next(); @@ -139,7 +145,7 @@ } } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -150,7 +156,7 @@ */ public byte[][] get(HStoreKey key, int numVersions) { Vector results = new Vector(); - locking.obtainReadLock(); + this.locker.readLock().lock(); try { Vector result = get(memcache, key, numVersions-results.size()); results.addAll(0, result); @@ -172,7 +178,7 @@ } } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } @@ -184,7 +190,7 @@ */ public TreeMap getFull(HStoreKey key) throws IOException { TreeMap results = new TreeMap(); - locking.obtainReadLock(); + this.locker.readLock().lock(); try { internalGetFull(memcache, key, results); for(int i = history.size()-1; i >= 0; i--) { @@ -194,7 +200,7 @@ return results; } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } @@ -271,7 +277,7 @@ super(timestamp, targetCols); - locking.obtainReadLock(); + locker.readLock().lock(); try { this.backingMaps = new TreeMap[history.size() + 1]; int i = 0; @@ -359,7 +365,7 @@ } } finally { - locking.releaseReadLock(); + locker.readLock().unlock(); scannerClosed = true; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java (.../vendor/current) (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseConfiguration.java (.../dev) (revision 7443) @@ -0,0 +1,25 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.conf.Configuration; + +public class HBaseConfiguration extends Configuration { + public HBaseConfiguration() { + super(); + addDefaultResource("hbase-default.xml"); + } +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (.../dev) (revision 7443) @@ -15,7 +15,9 @@ */ package org.apache.hadoop.hbase; -import org.apache.hadoop.io.*; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.VersionedProtocol; import java.io.*; @@ -23,17 +25,13 @@ * Clients interact with HRegionServers using * a handle to the HRegionInterface. ******************************************************************************/ -public interface HRegionInterface { +public interface HRegionInterface extends VersionedProtocol { public static final long versionID = 1L; // initial version // Get metainfo about an HRegion public HRegionInfo getRegionInfo(Text regionName); - // Start a scanner for a given HRegion. - - public HScannerInterface openScanner(Text regionName, Text[] columns, Text startRow) throws IOException; - // GET methods for an HRegion. public BytesWritable get(Text regionName, Text row, Text column) throws IOException; @@ -58,4 +56,41 @@ public void abort(Text regionName, long clientid, long lockid) throws IOException; public void commit(Text regionName, long clientid, long lockid) throws IOException; public void renewLease(long lockid, long clientid) throws IOException; + + ////////////////////////////////////////////////////////////////////////////// + // remote scanner interface + ////////////////////////////////////////////////////////////////////////////// + + /** + * Opens a remote scanner. + * + * @param clientId - client identifier (so we can associate a scanner with a client) + * @param regionName - name of region to scan + * @param columns - columns to scan + * @param startRow - starting row to scan + * + * @param scannerId - scanner identifier used in other calls + * @throws IOException + */ + public long openScanner(Text regionName, Text[] columns, Text startRow) throws IOException; + + /** + * Get the next set of values + * + * @param scannerId - clientId passed to openScanner + * @param key - the next HStoreKey + * @param columns - an array of column names + * @param values - an array of byte[] values (corresponds 1-1 with columns) + * @return - true if a value was retrieved + * @throws IOException + */ + public LabelledData[] next(long scannerId, HStoreKey key) throws IOException; + + /** + * Close a scanner + * + * @param scannerId - the scanner id returned by openScanner + * @throws IOException + */ + public void close(long scannerId) throws IOException; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (.../dev) (revision 7443) @@ -93,6 +93,11 @@ return timestamp; } + /** + * @param other Key to compare against. Compares row and column. + * @return True if same row and column. 
+ * @see {@link #matchesWithoutColumn(HStoreKey)} + */ public boolean matchesRowCol(HStoreKey other) { if(this.row.compareTo(other.row) == 0 && this.column.compareTo(other.column) == 0) { @@ -103,6 +108,13 @@ } } + /** + * @param other Key to copmare against. Compares row and + * timestamp. + * @return True if same row and timestamp is greater than + * other + * @see {@link #matchesRowCol(HStoreKey)} + */ public boolean matchesWithoutColumn(HStoreKey other) { if((this.row.compareTo(other.row) == 0) && (this.timestamp >= other.getTimestamp())) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (.../dev) (revision 7443) @@ -15,6 +15,8 @@ */ package org.apache.hadoop.hbase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.ipc.*; @@ -22,19 +24,34 @@ import java.io.*; import java.util.*; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /******************************************************************************* * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. ******************************************************************************/ -public class HRegionServer implements HConstants, HRegionInterface, Runnable { +public class HRegionServer + implements HConstants, HRegionInterface, Runnable { + + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + if (protocol.equals(HRegionInterface.class.getName())) { + return HRegionInterface.versionID; + } else { + throw new IOException("Unknown protocol to name node: " + protocol); + } + } + + private static final Log LOG = LogFactory.getLog(HRegionServer.class); + private boolean stopRequested; private Path regionDir; private HServerAddress address; private Configuration conf; private Random rand; private TreeMap regions; // region name -> HRegion - private HLocking locking; + private ReadWriteLock locker; private Vector outboundMsgs; private long threadWakeFrequency; @@ -67,12 +84,12 @@ // Grab a list of regions to check Vector checkSplit = new Vector(); - locking.obtainReadLock(); + locker.readLock().lock(); try { checkSplit.addAll(regions.values()); } finally { - locking.releaseReadLock(); + locker.readLock().unlock(); } // Check to see if they need splitting @@ -95,14 +112,16 @@ for(Iterator it = toSplit.iterator(); it.hasNext(); ) { SplitRegion r = it.next(); - locking.obtainWriteLock(); + locker.writeLock().lock(); regions.remove(r.region.getRegionName()); - locking.releaseWriteLock(); + locker.writeLock().unlock(); HRegion[] newRegions = null; try { Text oldRegion = r.region.getRegionName(); + LOG.info("splitting region: " + oldRegion); + newRegions = r.region.closeAndSplit(r.midKey); // When a region is split, the META table needs to updated if we're @@ -113,6 +132,8 @@ = (oldRegion.find(META_TABLE_NAME.toString()) == 0) ? ROOT_TABLE_NAME : META_TABLE_NAME; + LOG.debug("region split complete. 
updating meta"); + client.openTable(tableToUpdate); long lockid = client.startUpdate(oldRegion); client.delete(lockid, META_COL_REGIONINFO); @@ -132,7 +153,14 @@ // Now tell the master about the new regions + LOG.debug("reporting region split to master"); + reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo()); + + LOG.info("region split successful. old region=" + oldRegion + + ", new regions: " + newRegions[0].getRegionName() + ", " + + newRegions[1].getRegionName()); + newRegions[0].close(); newRegions[1].close(); @@ -145,11 +173,15 @@ // Sleep - long endTime = System.currentTimeMillis(); - try { - Thread.sleep(splitCheckFrequency - (endTime - startTime)); - - } catch(InterruptedException iex) { + long waitTime = + splitCheckFrequency - (System.currentTimeMillis() - startTime); + + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + + } catch(InterruptedException iex) { + } } } } @@ -167,12 +199,12 @@ // Grab a list of items to flush Vector toFlush = new Vector(); - locking.obtainReadLock(); + locker.readLock().lock(); try { toFlush.addAll(regions.values()); } finally { - locking.releaseReadLock(); + locker.readLock().unlock(); } // Flush them, if necessary @@ -190,11 +222,15 @@ // Sleep - long endTime = System.currentTimeMillis(); - try { - Thread.sleep(threadWakeFrequency - (endTime - startTime)); - - } catch(InterruptedException iex) { + long waitTime = + threadWakeFrequency - (System.currentTimeMillis() - startTime); + + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + + } catch(InterruptedException iex) { + } } } } @@ -249,7 +285,7 @@ /** Start a HRegionServer at the default location */ public HRegionServer(Configuration conf) throws IOException { this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)), - new HServerAddress(conf.get("hbase.regionserver.default.name")), + new HServerAddress(conf.get(REGIONSERVER_ADDRESS, "localhost:0")), conf); } @@ -261,12 +297,12 @@ this.stopRequested = false; this.regionDir = regionDir; - this.address = address; this.conf = conf; this.rand = new Random(); this.regions = new TreeMap(); - this.locking = new HLocking(); + this.locker = new ReentrantReadWriteLock(); this.outboundMsgs = new Vector(); + this.scanners = Collections.synchronizedMap(new TreeMap()); // Config'ed params @@ -278,37 +314,53 @@ // Cache flushing this.cacheFlusher = new Flusher(); - this.cacheFlusherThread = new Thread(cacheFlusher); + this.cacheFlusherThread = new Thread(cacheFlusher, "HRegionServer.cacheFlusher"); // Check regions to see if they need to be split this.splitChecker = new SplitChecker(); - this.splitCheckerThread = new Thread(splitChecker); + this.splitCheckerThread = new Thread(splitChecker, "HRegionServer.splitChecker"); + + // Process requests from Master + + this.toDo = new Vector(); + this.worker = new Worker(); + this.workerThread = new Thread(worker, "HRegionServer.worker"); try { + + // Server to handle client requests + + this.server = RPC.getServer(this, address.getBindAddress().toString(), + address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf); + + this.address = new HServerAddress(server.getListenerAddress()); + // Local file paths - this.fs = FileSystem.get(conf); - Path newlogdir = new Path(regionDir, "log" + "_" + address.toString()); - this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + address.toString()); + String serverName = this.address.getBindAddress() + "_" + this.address.getPort(); + Path newlogdir = new Path(regionDir, "log" + "_" + serverName); + this.oldlogfile = new 
Path(regionDir, "oldlogfile" + "_" + serverName); // Logging + this.fs = FileSystem.get(conf); HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf); this.log = new HLog(fs, newlogdir, conf); this.logRoller = new LogRoller(); - this.logRollerThread = new Thread(logRoller); + this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller"); // Remote HMaster this.hbaseMaster = (HMasterRegionInterface) RPC.waitForProxy(HMasterRegionInterface.class, - HMasterRegionInterface.versionId, - new HServerAddress(conf.get(MASTER_DEFAULT_NAME)).getInetSocketAddress(), + HMasterRegionInterface.versionID, + new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(), conf); // Threads + this.workerThread.start(); this.cacheFlusherThread.start(); this.splitCheckerThread.start(); this.logRollerThread.start(); @@ -317,14 +369,14 @@ // Server - this.server = RPC.getServer(this, address.getBindAddress().toString(), - address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf); this.server.start(); } catch(IOException e) { this.stopRequested = true; throw e; } + + LOG.info("HRegionServer started at: " + address.toString()); } /** @@ -342,12 +394,18 @@ fs.close(); server.stop(); } - + LOG.info("stopping server at: " + address.toString()); } /** Call join to wait for all the threads to finish */ public void join() { try { + this.workerThread.join(); + + } catch(InterruptedException iex) { + } + + try { this.logRollerThread.join(); } catch(InterruptedException iex) { @@ -366,7 +424,7 @@ } catch(InterruptedException iex) { } - + LOG.info("server stopped at: " + address.toString()); } /** @@ -388,10 +446,12 @@ } catch(IOException e) { waitTime = msgInterval - (System.currentTimeMillis() - lastMsg); - try { - Thread.sleep(waitTime); - - } catch(InterruptedException iex) { + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + + } catch(InterruptedException iex) { + } } continue; } @@ -411,10 +471,33 @@ HMsg msgs[] = hbaseMaster.regionServerReport(info, outboundArray); lastMsg = System.currentTimeMillis(); - // Process the HMaster's instruction stream + // Queue up the HMaster's instruction stream for processing - if(! processMessages(msgs)) { - break; + synchronized(toDo) { + boolean restartOrStop = false; + for(int i = 0; i < msgs.length; i++) { + switch(msgs[i].getMsg()) { + + case HMsg.MSG_CALL_SERVER_STARTUP: + closeAllRegions(); + restartOrStop = true; + break; + + case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: + stop(); + restartOrStop = true; + break; + + default: + toDo.add(msgs[i]); + } + } + if(toDo.size() > 0) { + toDo.notifyAll(); + } + if(restartOrStop) { + break; + } } } catch(IOException e) { @@ -424,55 +507,16 @@ waitTime = msgInterval - (System.currentTimeMillis() - lastMsg); - try { - Thread.sleep(waitTime); - } catch(InterruptedException iex) { + if(waitTime > 0) { + try { + Thread.sleep(waitTime); + } catch(InterruptedException iex) { + } } - } } } - private boolean processMessages(HMsg[] msgs) throws IOException { - for(int i = 0; i < msgs.length; i++) { - switch(msgs[i].getMsg()) { - - case HMsg.MSG_REGION_OPEN: // Open a region - openRegion(msgs[i].getRegionInfo()); - break; - - case HMsg.MSG_REGION_CLOSE: // Close a region - closeRegion(msgs[i].getRegionInfo(), true); - break; - - case HMsg.MSG_REGION_MERGE: // Merge two regions - //TODO ??? 
- throw new IOException("TODO: need to figure out merge"); - //break; - - case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart - closeAllRegions(); - return false; - - case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: // Go away - stop(); - return false; - - case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply - closeRegion(msgs[i].getRegionInfo(), false); - break; - - case HMsg.MSG_REGION_CLOSE_AND_DELETE: - closeAndDeleteRegion(msgs[i].getRegionInfo()); - break; - - default: - throw new IOException("Impossible state during msg processing. Instruction: " + msgs[i]); - } - } - return true; - } - /** Add to the outbound message buffer */ private void reportOpen(HRegion region) { synchronized(outboundMsgs) { @@ -508,9 +552,68 @@ // HMaster-given operations ////////////////////////////////////////////////////////////////////////////// + private Vector toDo; + private Worker worker; + private Thread workerThread; + private class Worker implements Runnable { + public void run() { + while(!stopRequested) { + HMsg msg = null; + synchronized(toDo) { + while(toDo.size() == 0) { + try { + toDo.wait(); + + } catch(InterruptedException e) { + } + } + msg = toDo.remove(0); + } + try { + switch(msg.getMsg()) { + + case HMsg.MSG_REGION_OPEN: // Open a region + openRegion(msg.getRegionInfo()); + break; + + case HMsg.MSG_REGION_CLOSE: // Close a region + closeRegion(msg.getRegionInfo(), true); + break; + + case HMsg.MSG_REGION_MERGE: // Merge two regions + //TODO ??? + throw new IOException("TODO: need to figure out merge"); + //break; + + case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart + closeAllRegions(); + continue; + + case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: // Go away + stop(); + continue; + + case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply + closeRegion(msg.getRegionInfo(), false); + break; + + case HMsg.MSG_REGION_CLOSE_AND_DELETE: + closeAndDeleteRegion(msg.getRegionInfo()); + break; + + default: + throw new IOException("Impossible state during msg processing. 
Instruction: " + msg); + } + } catch(IOException e) { + e.printStackTrace(); + } + } + } + } + private void openRegion(HRegionInfo regionInfo) throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { HRegion region = new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile); @@ -518,14 +621,14 @@ reportOpen(region); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } private void closeRegion(HRegionInfo info, boolean reportWhenCompleted) throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { HRegion region = regions.remove(info.regionName); @@ -538,13 +641,13 @@ } } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } private void closeAndDeleteRegion(HRegionInfo info) throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { HRegion region = regions.remove(info.regionName); @@ -553,13 +656,13 @@ } } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } /** Called either when the master tells us to restart or from stop() */ private void closeAllRegions() throws IOException { - locking.obtainWriteLock(); + this.locker.writeLock().lock(); try { for(Iterator it = regions.values().iterator(); it.hasNext(); ) { HRegion region = it.next(); @@ -568,7 +671,7 @@ regions.clear(); } finally { - locking.releaseWriteLock(); + this.locker.writeLock().unlock(); } } @@ -612,17 +715,6 @@ return region.getRegionInfo(); } - /** Start a scanner for a given HRegion. */ - public HScannerInterface openScanner(Text regionName, Text[] cols, - Text firstRow) throws IOException { - - HRegion r = getRegion(regionName); - if(r == null) { - throw new IOException("Not serving region " + regionName); - } - return r.getScanner(cols, firstRow); - } - /** Get the indicated row/column */ public BytesWritable get(Text regionName, Text row, Text column) throws IOException { HRegion region = getRegion(regionName); @@ -806,13 +898,126 @@ /** Private utility method for safely obtaining an HRegion handle. */ private HRegion getRegion(Text regionName) { - locking.obtainReadLock(); + this.locker.readLock().lock(); try { return regions.get(regionName); } finally { - locking.releaseReadLock(); + this.locker.readLock().unlock(); } } + ////////////////////////////////////////////////////////////////////////////// + // remote scanner interface + ////////////////////////////////////////////////////////////////////////////// + + private Map scanners; + private class ScannerListener extends LeaseListener { + private Text scannerName; + + public ScannerListener(Text scannerName) { + this.scannerName = scannerName; + } + + public void leaseExpired() { + HScannerInterface s = scanners.remove(scannerName); + if(s != null) { + try { + s.close(); + + } catch(IOException e) { + e.printStackTrace(); + } + } + } + } + + /** Start a scanner for a given HRegion. 
*/ + public long openScanner(Text regionName, Text[] cols, Text firstRow) + throws IOException { + + HRegion r = getRegion(regionName); + if(r == null) { + throw new IOException("Not serving region " + regionName); + } + + long scannerId = -1L; + try { + HScannerInterface s = r.getScanner(cols, firstRow); + scannerId = rand.nextLong(); + Text scannerName = new Text(String.valueOf(scannerId)); + scanners.put(scannerName, s); + leases.createLease(scannerName, scannerName, new ScannerListener(scannerName)); + + } catch(IOException e) { + e.printStackTrace(); + throw e; + } + return scannerId; + } + + public LabelledData[] next(long scannerId, HStoreKey key) throws IOException { + + Text scannerName = new Text(String.valueOf(scannerId)); + HScannerInterface s = scanners.get(scannerName); + if(s == null) { + throw new IOException("unknown scanner"); + } + leases.renewLease(scannerName, scannerName); + TreeMap results = new TreeMap(); + ArrayList values = new ArrayList(); + if(s.next(key, results)) { + for(Iterator> it = results.entrySet().iterator(); + it.hasNext(); ) { + Map.Entry e = it.next(); + values.add(new LabelledData(e.getKey(), e.getValue())); + } + } + return values.toArray(new LabelledData[values.size()]); + } + + public void close(long scannerId) throws IOException { + Text scannerName = new Text(String.valueOf(scannerId)); + HScannerInterface s = scanners.remove(scannerName); + if(s == null) { + throw new IOException("unknown scanner"); + } + try { + s.close(); + + } catch(IOException ex) { + ex.printStackTrace(); + } + leases.cancelLease(scannerName, scannerName); + } + + ////////////////////////////////////////////////////////////////////////////// + // Main program + ////////////////////////////////////////////////////////////////////////////// + + private static void printUsage() { + System.err.println("Usage: java " + + "org.apache.hbase.HRegionServer [--bind=hostname:port]"); + } + + public static void main(String [] args) throws IOException { + Configuration conf = new HBaseConfiguration(); + + // Process command-line args. TODO: Better cmd-line processing + // (but hopefully something not as painful as cli options). + for (String cmd: args) { + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(); + return; + } + + final String addressArgKey = "--bind="; + if (cmd.startsWith(addressArgKey)) { + conf.set(REGIONSERVER_ADDRESS, + cmd.substring(addressArgKey.length())); + } + } + + new HRegionServer(conf); + } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (.../dev) (revision 7443) @@ -15,7 +15,8 @@ */ package org.apache.hadoop.hbase; -import org.apache.hadoop.io.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.VersionedProtocol; import java.io.IOException; @@ -23,7 +24,7 @@ * Clients interact with the HMasterInterface to gain access to meta-level HBase * functionality, like finding an HRegionServer and creating/destroying tables. 
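The scanner half of HRegionServer above ties each open scanner to a lease: openScanner registers it, every next() renews it, and close() or the expiring ScannerListener tears it down. A sketch of that bookkeeping with the Leases machinery reduced to a timestamp map (names and the id scheme here are illustrative):

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of per-scanner lease bookkeeping; the real code uses Leases.
public class ScannerRegistry {
  private final Map<String, Long> leases =
      Collections.synchronizedMap(new HashMap<String, Long>());

  long open() {
    long scannerId = System.nanoTime();           // the patch uses rand.nextLong()
    leases.put(String.valueOf(scannerId), System.currentTimeMillis());
    return scannerId;
  }

  void next(long scannerId) throws IOException {
    String name = String.valueOf(scannerId);
    if (!leases.containsKey(name)) {
      throw new IOException("unknown scanner");
    }
    leases.put(name, System.currentTimeMillis()); // renew the lease on each call
  }

  void close(long scannerId) throws IOException {
    if (leases.remove(String.valueOf(scannerId)) == null) {
      throw new IOException("unknown scanner");
    }
  }
}

Tying liveness to next() calls means an abandoned client scanner is reclaimed by lease expiry instead of leaking a region-side scanner forever.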
******************************************************************************/ -public interface HMasterInterface { +public interface HMasterInterface extends VersionedProtocol { public static final long versionID = 1L; // initial version ////////////////////////////////////////////////////////////////////////////// Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (.../dev) (revision 7443) @@ -24,7 +24,18 @@ // Configuration parameters - static final String MASTER_DEFAULT_NAME = "hbase.master.default.name"; + // TODO: URL for hbase master, like hdfs URLs with host and port. + // Or, like jdbc URLs: + // jdbc:mysql://[host][,failoverhost...][:port]/[database] + // jdbc:mysql://[host][,failoverhost...][:port]/[database][?propertyName1][=propertyValue1][&propertyName2][=propertyValue2]... + + static final String MASTER_ADDRESS = "hbase.master"; + // TODO: Support 'local': i.e. default of all running in single + // process. Same for regionserver. + static final String DEFAULT_MASTER_ADDRESS = "localhost:60000"; + static final String REGIONSERVER_ADDRESS = "hbase.regionserver"; + static final String DEFAULT_REGIONSERVER_ADDRESS = + "localhost:60010"; static final String HREGION_DIR = "hbase.regiondir"; static final String DEFAULT_HREGION_DIR = "/hbase"; static final String HREGIONDIR_PREFIX = "hregion_"; @@ -37,10 +48,10 @@ // Do we ever need to know all the information that we are storing? static final Text ROOT_TABLE_NAME = new Text("--ROOT--"); - static final Text ROOT_COLUMN_FAMILY = new Text("info"); - static final Text ROOT_COL_REGIONINFO = new Text(ROOT_COLUMN_FAMILY + ":" + "regioninfo"); - static final Text ROOT_COL_SERVER = new Text(ROOT_COLUMN_FAMILY + ":" + "server"); - static final Text ROOT_COL_STARTCODE = new Text(ROOT_COLUMN_FAMILY + ":" + "serverstartcode"); + static final Text ROOT_COLUMN_FAMILY = new Text("info:"); + static final Text ROOT_COL_REGIONINFO = new Text(ROOT_COLUMN_FAMILY + "regioninfo"); + static final Text ROOT_COL_SERVER = new Text(ROOT_COLUMN_FAMILY + "server"); + static final Text ROOT_COL_STARTCODE = new Text(ROOT_COLUMN_FAMILY + "serverstartcode"); static final Text META_TABLE_NAME = new Text("--META--"); static final Text META_COLUMN_FAMILY = new Text(ROOT_COLUMN_FAMILY); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (.../dev) (revision 7443) @@ -33,6 +33,12 @@ this.stringValue = null; } + public HServerAddress(InetSocketAddress address) { + this.address = address; + this.stringValue = new String(address.getAddress().getHostAddress() + + ":" + address.getPort()); + } + public HServerAddress(String hostAndPort) { int colonIndex = hostAndPort.indexOf(':'); if(colonIndex < 0) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (.../vendor/current) (revision 7443) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (.../dev) (revision 7443) 
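The HConstants change above moves the trailing ':' into the family name itself ("info:"), so fully qualified column names become plain concatenations. A quick check of that convention, using the same values as the constants:

import org.apache.hadoop.io.Text;

// Sketch of the family-prefix naming convention after this change.
public class ColumnNames {
  static final Text ROOT_COLUMN_FAMILY = new Text("info:");
  static final Text ROOT_COL_REGIONINFO = new Text(ROOT_COLUMN_FAMILY + "regioninfo");
  static final Text ROOT_COL_SERVER = new Text(ROOT_COLUMN_FAMILY + "server");

  public static void main(String[] args) {
    System.out.println(ROOT_COL_REGIONINFO);  // info:regioninfo
    System.out.println(ROOT_COL_SERVER);      // info:server
  }
}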
@@ -15,29 +15,39 @@ */ package org.apache.hadoop.hbase; -import org.apache.hadoop.io.*; -import org.apache.hadoop.ipc.*; -import org.apache.hadoop.conf.*; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; +import java.util.TreeMap; +import java.util.TreeSet; -import java.io.*; -import java.util.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.log4j.Logger; /******************************************************************************* * HClient manages a connection to a single HRegionServer. ******************************************************************************/ -public class HClient extends HGlobals implements HConstants { +public class HClient implements HConstants { + private final Logger LOG = + Logger.getLogger(this.getClass().getName()); + private static final Text[] metaColumns = { META_COLUMN_FAMILY }; private static final Text startRow = new Text(); private boolean closed; - private Configuration conf; - private HServerAddress masterLocation; private long clientTimeout; private int numTimeouts; private int numRetries; private HMasterInterface master; + private final Configuration conf; private class TableInfo { public HRegionInfo regionInfo; @@ -72,16 +82,11 @@ public HClient(Configuration conf) { this.closed = false; this.conf = conf; + + this.clientTimeout = conf.getLong("hbase.client.timeout.length", 10 * 1000); + this.numTimeouts = conf.getInt("hbase.client.timeout.number", 5); + this.numRetries = conf.getInt("hbase.client.retries.number", 2); - // Load config settings - - this.masterLocation = new HServerAddress(this.conf.get(MASTER_DEFAULT_NAME)); - this.clientTimeout = this.conf.getLong("hbase.client.timeout.length", 10 * 1000); - this.numTimeouts = this.conf.getInt("hbase.client.timeout.number", 5); - this.numRetries = this.conf.getInt("hbase.client.retries.number", 2); - - // Finish initialization - this.master = null; this.tablesToServers = new TreeMap>(); this.tableServers = null; @@ -94,6 +99,26 @@ this.rand = new Random(); } + public synchronized void createTable(HTableDescriptor desc) throws IOException { + if(closed) { + throw new IllegalStateException("client is not open"); + } + if(master == null) { + locateRootRegion(); + } + master.createTable(desc); + } + + public synchronized void deleteTable(Text tableName) throws IOException { + if(closed) { + throw new IllegalStateException("client is not open"); + } + if(master == null) { + locateRootRegion(); + } + master.deleteTable(tableName); + } + public synchronized void openTable(Text tableName) throws IOException { if(closed) { throw new IllegalStateException("client is not open"); @@ -145,9 +170,11 @@ */ private void locateRootRegion() throws IOException { if(master == null) { + HServerAddress masterLocation = + new HServerAddress(this.conf.get(MASTER_ADDRESS)); master = (HMasterInterface)RPC.getProxy(HMasterInterface.class, - HMasterInterface.versionID, - masterLocation.getInetSocketAddress(), conf); + HMasterInterface.versionID, + masterLocation.getInetSocketAddress(), conf); } int tries = 0; @@ -160,7 +187,6 @@ if(rootRegionLocation == null) { try { Thread.sleep(clientTimeout); - } catch(InterruptedException iex) { } localTimeouts++; @@ -174,9 +200,9 @@ HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation); - 
if(rootRegion.getRegionInfo(rootRegionInfo.regionName) != null) { + if(rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName) != null) { tableServers = new TreeMap(); - tableServers.put(startRow, new TableInfo(rootRegionInfo, rootRegionLocation)); + tableServers.put(startRow, new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation)); tablesToServers.put(ROOT_TABLE_NAME, tableServers); break; } @@ -202,43 +228,54 @@ /* * Scans a single meta region - * @param t - the table we're going to scan - * @param tableName - the name of the table we're looking for + * @param t the table we're going to scan + * @param tableName the name of the table we're looking for */ private void scanOneMetaRegion(TableInfo t, Text tableName) throws IOException { HRegionInterface server = getHRegionConnection(t.serverAddress); - HScannerInterface scanner = null; + long scannerId = -1L; try { - scanner = server.openScanner(t.regionInfo.regionName, metaColumns, tableName); - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); + scannerId = server.openScanner(t.regionInfo.regionName, metaColumns, tableName); + DataInputBuffer inbuf = new DataInputBuffer(); + while(true) { + HStoreKey key = new HStoreKey(); - while(scanner.next(key, results)) { - byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO); - inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length); + LabelledData[] values = server.next(scannerId, key); + if(values.length == 0) { + break; + } + + TreeMap results = new TreeMap(); + for(int i = 0; i < values.length; i++) { + results.put(values[i].getLabel(), values[i].getData().get()); + } HRegionInfo regionInfo = new HRegionInfo(); + byte[] bytes = results.get(META_COL_REGIONINFO); + inbuf.reset(bytes, bytes.length); regionInfo.readFields(inbuf); - - if(! regionInfo.tableDesc.getName().equals(tableName)) { + + if(!regionInfo.tableDesc.getName().equals(tableName)) { // We're done break; } - - byte serverBytes[] = results.get(META_COL_SERVER); - String serverName = new String(serverBytes, UTF8_ENCODING); + + bytes = results.get(META_COL_SERVER); + String serverName = new String(bytes, UTF8_ENCODING); tableServers.put(regionInfo.startKey, new TableInfo(regionInfo, new HServerAddress(serverName))); - results.clear(); } + } finally { - scanner.close(); + if(scannerId != -1L) { + server.close(scannerId); + } } } - public synchronized HRegionInterface getHRegionConnection(HServerAddress regionServer) + synchronized HRegionInterface getHRegionConnection(HServerAddress regionServer) throws IOException { // See if we already have a connection @@ -270,7 +307,7 @@ * catalog table that just contains table names and their descriptors. * Right now, it only exists as part of the META table's region info. 
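The retry loop in locateRootRegion above is driven by the three client settings that hbase-default.xml now ships. A sketch of how they are read, using the same keys and fallback defaults as HClient; the bare Configuration plus addDefaultResource mirrors what HBaseConfiguration's constructor does:

import org.apache.hadoop.conf.Configuration;

// Sketch of reading the client retry/timeout settings.
public class ClientSettings {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.addDefaultResource("hbase-default.xml");  // as in HBaseConfiguration

    long clientTimeout = conf.getLong("hbase.client.timeout.length", 10 * 1000);
    int numTimeouts = conf.getInt("hbase.client.timeout.number", 5);
    int numRetries = conf.getInt("hbase.client.retries.number", 2);

    System.out.println(clientTimeout + "ms x " + numTimeouts
        + " timeouts, " + numRetries + " retries");
  }
}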
@@ -270,7 +307,7 @@
    * catalog table that just contains table names and their descriptors.
    * Right now, it only exists as part of the META table's region info.
    */
-  public HTableDescriptor[] listTables() throws IOException {
+  public synchronized HTableDescriptor[] listTables() throws IOException {
     TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
 
     TreeMap<Text, TableInfo> metaTables = tablesToServers.get(META_TABLE_NAME);
@@ -280,37 +317,47 @@
       metaTables = tablesToServers.get(META_TABLE_NAME);
     }
 
-    for(Iterator<TableInfo> i = metaTables.values().iterator(); i.hasNext(); ) {
-      TableInfo t = i.next();
+    for(Iterator<TableInfo> it = metaTables.values().iterator(); it.hasNext(); ) {
+      TableInfo t = it.next();
       HRegionInterface server = getHRegionConnection(t.serverAddress);
-      HScannerInterface scanner = null;
+      long scannerId = -1L;
       try {
-        scanner = server.openScanner(t.regionInfo.regionName, metaColumns, startRow);
+        scannerId = server.openScanner(t.regionInfo.regionName, metaColumns, startRow);
+
         HStoreKey key = new HStoreKey();
-        TreeMap results = new TreeMap();
+        DataInputBuffer inbuf = new DataInputBuffer();
 
-        while(scanner.next(key, results)) {
-          byte infoBytes[] = (byte[]) results.get(ROOT_COL_REGIONINFO);
-          inbuf.reset(infoBytes, infoBytes.length);
-          HRegionInfo info = new HRegionInfo();
-          info.readFields(inbuf);
+        while(true) {
+          LabelledData[] values = server.next(scannerId, key);
+          if(values.length == 0) {
+            break;
+          }
 
-          // Only examine the rows where the startKey is zero length
-
-          if(info.startKey.getLength() == 0) {
-            uniqueTables.add(info.tableDesc);
+          for(int i = 0; i < values.length; i++) {
+            if(values[i].getLabel().equals(META_COL_REGIONINFO)) {
+              byte[] bytes = values[i].getData().get();
+              inbuf.reset(bytes, bytes.length);
+              HRegionInfo info = new HRegionInfo();
+              info.readFields(inbuf);
+
+              // Only examine the rows where the startKey is zero length
+
+              if(info.startKey.getLength() == 0) {
+                uniqueTables.add(info.tableDesc);
+              }
+            }
           }
-          results.clear();
         }
+
       } finally {
-        scanner.close();
+        if(scannerId != -1L) {
+          server.close(scannerId);
+        }
       }
     }
     return (HTableDescriptor[]) uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
   }
 
-  private TableInfo getTableInfo(Text row) {
+  private synchronized TableInfo getTableInfo(Text row) {
     if(tableServers == null) {
       throw new IllegalStateException("Must open table first");
     }
@@ -368,7 +415,7 @@
    * Get a scanner on the current table starting at the specified row.
    * Return the specified columns.
    */
-  public HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException {
+  public synchronized HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException {
     if(tableServers == null) {
       throw new IllegalStateException("Must open table first");
     }
@@ -462,7 +509,7 @@
     private TableInfo[] regions;
     private int currentRegion;
     private HRegionInterface server;
-    private HScannerInterface scanner;
+    private long scannerId;
 
     public ClientScanner(Text[] columns, Text startRow) throws IOException {
       this.columns = columns;
@@ -472,7 +519,7 @@
       this.regions = info.toArray(new TableInfo[info.size()]);
       this.currentRegion = -1;
       this.server = null;
-      this.scanner = null;
+      this.scannerId = -1L;
       nextScanner();
     }
 
@@ -481,8 +528,9 @@
      * Grab the next scanner.
      * Returns false if there are no more scanners.
      */
     private boolean nextScanner() throws IOException {
-      if(scanner != null) {
-        scanner.close();
+      if(scannerId != -1L) {
+        server.close(scannerId);
+        scannerId = -1L;
       }
       currentRegion += 1;
       if(currentRegion == regions.length) {
         close();
         return false;
       }
       try {
         server = getHRegionConnection(regions[currentRegion].serverAddress);
-        scanner = server.openScanner(regions[currentRegion].regionInfo.regionName,
+        scannerId = server.openScanner(regions[currentRegion].regionInfo.regionName,
             columns, startRow);
 
       } catch(IOException e) {
@@ -508,26 +556,63 @@
       if(closed) {
         return false;
       }
-      boolean status = scanner.next(key, results);
-      if(! status) {
-        status = nextScanner();
-        if(status) {
-          status = scanner.next(key, results);
-        }
+      LabelledData[] values = null;
+      do {
+        values = server.next(scannerId, key);
+      } while(values.length == 0 && nextScanner());
+
+      for(int i = 0; i < values.length; i++) {
+        results.put(values[i].getLabel(), values[i].getData().get());
       }
-      return status;
+      return values.length != 0;
     }
 
     /* (non-Javadoc)
      * @see org.apache.hadoop.hbase.HScannerInterface#close()
      */
     public void close() throws IOException {
-      if(scanner != null) {
-        scanner.close();
+      if(scannerId != -1L) {
+        server.close(scannerId);
       }
       server = null;
       closed = true;
     }
   }
-
-}
+
+  private void printUsage() {
+    System.err.println("Usage: java " + this.getClass().getName() +
+        " [--master=hostname:port]");
+  }
+
+  private int doCommandLine(final String args[]) {
+    // Process command-line args. TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    for (String cmd: args) {
+      if (cmd.equals("-h") || cmd.startsWith("--h")) {
+        printUsage();
+        return 0;
+      }
+
+      final String masterArgKey = "--master=";
+      if (cmd.startsWith(masterArgKey)) {
+        this.conf.set(MASTER_ADDRESS,
+          cmd.substring(masterArgKey.length()));
+      }
+    }
+
+    int errCode = -1;
+    try {
+      locateRootRegion();
+      errCode = 0;
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return errCode;
+  }
+
+  public static void main(final String args[]) {
+    Configuration c = new HBaseConfiguration();
+    int errCode = (new HClient(c)).doCommandLine(args);
+    System.exit(errCode);
+  }
+}
\ No newline at end of file
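Taken together, HClient now exposes createTable/deleteTable alongside
openTable and scanning. A hypothetical end-to-end driver (sketch only: the
HTableDescriptor constructor and the addFamily helper are assumed here, not
shown in this patch, and the table/family names are illustrative):

    Configuration conf = new HBaseConfiguration();
    HClient client = new HClient(conf);

    HTableDescriptor desc = new HTableDescriptor("test", 1); // assumed: (name, maxVersions)
    desc.addFamily(new Text("contents:"));                   // assumed helper
    client.createTable(desc);        // locates the root region, then RPCs the master

    client.openTable(new Text("test"));
    HScannerInterface scanner =
        client.obtainScanner(new Text[] { new Text("contents:") }, new Text());
    try {
      HStoreKey key = new HStoreKey();
      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
      while(scanner.next(key, results)) {
        // ... examine one row ...
        results.clear();
      }
    } finally {
      scanner.close();
    }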
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (.../dev) (revision 7443)
@@ -15,13 +15,15 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.*;
+import java.io.IOException;
 
+import org.apache.hadoop.ipc.VersionedProtocol;
+
 /*******************************************************************************
  * HRegionServers interact with the HMasterRegionInterface to report on local
  * goings-on and to obtain data-handling instructions from the HMaster.
  *********************************************/
-public interface HMasterRegionInterface {
-  public static final long versionId = 1L;
+public interface HMasterRegionInterface extends VersionedProtocol {
+  public static final long versionID = 1L;
 
   public void regionServerStartup(HServerInfo info) throws IOException;
   public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[]) throws IOException;
 }
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (.../dev) (revision 7443)
@@ -353,4 +353,4 @@
     insideCacheFlush = false;
     notifyAll();
   }
-}
+}
\ No newline at end of file
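With HMasterRegionInterface now extending VersionedProtocol, any server that
implements it must also answer getProtocolVersion(). A sketch of what the
implementing classes (outside this patch) are expected to provide, assuming
the standard Hadoop VersionedProtocol contract:

    // Satisfy VersionedProtocol by echoing the interface version so RPC
    // peers can detect mismatched builds at connect time.
    public long getProtocolVersion(String protocol, long clientVersion)
        throws IOException {
      if(!protocol.equals(HMasterRegionInterface.class.getName())) {
        throw new IOException("Unknown protocol: " + protocol);
      }
      return HMasterRegionInterface.versionID;
    }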
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (.../dev) (revision 7443)
@@ -312,7 +312,7 @@
     for(Iterator<Text> it = this.regionInfo.tableDesc.families().iterator();
         it.hasNext(); ) {
-      Text colFamily = it.next();
+      Text colFamily = HStoreKey.extractFamily(it.next());
       stores.put(colFamily, new HStore(dir, this.regionInfo.regionName, colFamily,
           this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf));
     }
@@ -345,6 +345,7 @@
 
   /** Closes and deletes this HRegion. Called when doing a table deletion, for example */
   public void closeAndDelete() throws IOException {
+    LOG.info("deleting region: " + regionInfo.regionName);
     close();
     fs.delete(regiondir);
   }
@@ -808,8 +809,7 @@
     // Make sure this is a valid row and valid column
 
     checkRow(row);
-    Text colFamily = HStoreKey.extractFamily(column);
-    checkFamily(colFamily);
+    checkColumn(column);
 
     // Obtain the row-lock
@@ -911,7 +911,7 @@
   /**
    * Put a cell value into the locked row. The user indicates the row-lock, the
    * target column, and the desired value. This stuff is set into a temporary
-   * memory area until the user commits the change, at which pointit's logged
+   * memory area until the user commits the change, at which point it's logged
   * and placed into the memcache.
    *
    * This method really just tests the input, then calls an internal localput()
@@ -950,9 +950,11 @@
    * (Or until the user's write-lock expires.)
    */
   void localput(long lockid, Text targetCol, byte[] val) throws IOException {
+    checkColumn(targetCol);
+
     Text row = getRowFromLock(lockid);
     if(row == null) {
-      throw new IOException("No write lock for lockid " + lockid);
+      throw new LockException("No write lock for lockid " + lockid);
     }
 
     // This sync block makes localput() thread-safe when multiple
@@ -965,7 +967,7 @@
       // hasn't aborted/committed the write-operation.
 
       if(row != getRowFromLock(lockid)) {
-        throw new IOException("Locking error: put operation on lock " + lockid
+        throw new LockException("Locking error: put operation on lock " + lockid
             + " unexpected aborted by another thread");
       }
@@ -986,7 +988,7 @@
   public void abort(long lockid) throws IOException {
     Text row = getRowFromLock(lockid);
     if(row == null) {
-      throw new IOException("No write lock for lockid " + lockid);
+      throw new LockException("No write lock for lockid " + lockid);
     }
 
     // This sync block makes abort() thread-safe when multiple
@@ -999,7 +1001,7 @@
       // hasn't aborted/committed the write-operation.
 
       if(row != getRowFromLock(lockid)) {
-        throw new IOException("Locking error: abort() operation on lock "
+        throw new LockException("Locking error: abort() operation on lock "
            + lockid + " unexpected aborted by another thread");
       }
@@ -1022,7 +1024,7 @@
     Text row = getRowFromLock(lockid);
     if(row == null) {
-      throw new IOException("No write lock for lockid " + lockid);
+      throw new LockException("No write lock for lockid " + lockid);
     }
 
     // This check makes sure that another thread from the client
@@ -1066,9 +1068,10 @@
           + "', endKey='" + regionInfo.endKey
           + "', row='" + row + "'");
     }
   }
-
+
   /** Make sure this is a valid column for the current table */
-  void checkFamily(Text family) throws IOException {
+  void checkColumn(Text columnName) throws IOException {
+    Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
     if(! regionInfo.tableDesc.hasFamily(family)) {
       throw new IOException("Requested column family " + family
           + " does not exist in HRegion " + regionInfo.regionName
@@ -1092,6 +1095,8 @@
    * which maybe we'll do in the future.
    */
   long obtainLock(Text row) throws IOException {
+    checkRow(row);
+
     synchronized(rowsToLocks) {
       while(rowsToLocks.get(row) != null) {
         try {
@@ -1109,6 +1114,8 @@
   }
 
   Text getRowFromLock(long lockid) throws IOException {
+    // Pattern is that all access to rowsToLocks and/or to
+    // locksToRows is via a lock on rowsToLocks.
     synchronized(rowsToLocks) {
       return locksToRows.get(lockid);
     }
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (.../vendor/current) (revision 7443)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LabelledData.java (.../dev) (revision 7443)
@@ -32,7 +32,7 @@
   }
 
   public LabelledData(Text label, byte[] data) {
-    this.label.set(label);
+    this.label = new Text(label);
     this.data = new BytesWritable(data);
   }
 
@@ -40,7 +40,7 @@
     return label;
   }
 
-  public BytesWritable getDat() {
+  public BytesWritable getData() {
     return data;
   }

Property changes on: .
___________________________________________________________________
Name: svk:merge
 + 8368d6a0-ff38-4065-b5b8-cf688d7fb560:/thirdparty/hadoop/vendor/current:6459
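The HRegion changes above tighten the write path: obtainLock() validates the
row, localput() validates the column, and lock misuse now surfaces as
LockException rather than a generic IOException. An illustrative caller
sequence (sketch only; the put/commit signatures and the value bytes are
assumed from the surrounding javadoc, not spelled out in this patch):

    long lockid = region.obtainLock(row);   // checkRow() runs before the lock is granted
    try {
      region.put(lockid, new Text("contents:a"), value); // put() -> localput() -> checkColumn()
      region.commit(lockid);                // change is logged, then applied to the memcache
    } catch(LockException e) {
      region.abort(lockid);                 // releases the row lock on failure
    }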