Index: src/test/org/apache/hadoop/hbase/TestGet2.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestGet2.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/TestGet2.java (working copy) @@ -80,7 +80,9 @@ assertColumnsPresent(region, actualStartRow); assertColumnsPresent(region, actualStopRow); // Force a flush so store files come into play. - region.flushcache(); + for (HStore s: region.getStores()) { + region.flushcache(s); + } // Assert I got all out. assertColumnsPresent(region, actualStartRow); assertColumnsPresent(region, actualStopRow); @@ -143,15 +145,17 @@ } } - /** For HADOOP-2443 */ - public void testGetClosestRowBefore() throws IOException{ + /** + * For HADOOP-2443 + * @throws IOException + */ + public void testGetClosestRowBefore() throws IOException { HRegion region = null; HRegionIncommon region_incommon = null; try { HTableDescriptor htd = createTableDescriptor(getName()); - HRegionInfo hri = new HRegionInfo(htd, null, null); region = createNewHRegion(htd, null, null); region_incommon = new HRegionIncommon(region); @@ -193,7 +197,9 @@ assertEquals(new String(results.get(COLUMNS[0])), "t40 bytes"); // force a flush - region.flushcache(); + for (HStore s: region.getStores()) { + region.flushcache(s); + } // try finding "015" results = region.getClosestRowBefore(t15, HConstants.LATEST_TIMESTAMP); Index: src/test/org/apache/hadoop/hbase/TestBloomFilters.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestBloomFilters.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/TestBloomFilters.java (working copy) @@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.io.BatchUpdate; + /** Tests per-column bloom filters */ public class TestBloomFilters extends HBaseClusterTestCase { static final Log LOG = LogFactory.getLog(TestBloomFilters.class); @@ -145,8 +147,6 @@ /** constructor */ public TestBloomFilters() { super(); - conf.set("hbase.hregion.memcache.flush.size", "100");// flush cache every 100 bytes - conf.set("hbase.regionserver.maxlogentries", "90"); // and roll log too } /** @@ -191,9 +191,9 @@ for(int i = 0; i < 100; i++) { Text row = rows[i]; String value = row.toString(); - long lockid = table.startUpdate(rows[i]); - table.put(lockid, CONTENTS, value.getBytes(HConstants.UTF8_ENCODING)); - table.commit(lockid); + BatchUpdate b = new BatchUpdate(row); + b.put(CONTENTS, value.getBytes(HConstants.UTF8_ENCODING)); + table.commit(b); } try { // Give cache flusher and log roller a chance to run @@ -257,9 +257,9 @@ for(int i = 0; i < 100; i++) { Text row = rows[i]; String value = row.toString(); - long lockid = table.startUpdate(rows[i]); - table.put(lockid, CONTENTS, value.getBytes(HConstants.UTF8_ENCODING)); - table.commit(lockid); + BatchUpdate b = new BatchUpdate(row); + b.put(CONTENTS, value.getBytes(HConstants.UTF8_ENCODING)); + table.commit(b); } try { // Give cache flusher and log roller a chance to run Index: src/test/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseTestCase.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -328,6 +328,7 @@ public static interface Incommon { /** * @param row + * @return lock id * @throws IOException */ public long startUpdate(Text row) throws IOException; @@ -497,7 
+498,9 @@ } /** {@inheritDoc} */ public void flushcache() throws IOException { - this.region.flushcache(); + for (HStore store: this.region.getStores()) { + this.region.flushcache(store); + } } } @@ -506,6 +509,14 @@ */ public static class HTableIncommon implements Incommon { final HTable table; + BatchUpdate batch = null; + + private void checkBatch() { + if (batch == null) { + throw new IllegalStateException("No update in progress"); + } + } + /** * @param table */ @@ -514,24 +525,34 @@ this.table = table; } /** {@inheritDoc} */ - public void abort(long lockid) { - this.table.abort(lockid); + public void abort(@SuppressWarnings("unused") long lockid) { + this.batch = null; } /** {@inheritDoc} */ public void commit(long lockid) throws IOException { - this.table.commit(lockid); + commit(lockid, HConstants.LATEST_TIMESTAMP); } /** {@inheritDoc} */ - public void commit(long lockid, final long ts) throws IOException { - this.table.commit(lockid, ts); + public void commit(@SuppressWarnings("unused") long lockid, final long ts) + throws IOException { + checkBatch(); + try { + this.batch.setTimestamp(ts); + this.table.commit(batch); + } finally { + this.batch = null; + } } /** {@inheritDoc} */ - public void put(long lockid, Text column, byte[] val) { - this.table.put(lockid, column, val); + public void put(@SuppressWarnings("unused") long lockid, Text column, + byte[] val) { + checkBatch(); + this.batch.put(column, val); } /** {@inheritDoc} */ - public void delete(long lockid, Text column) { - this.table.delete(lockid, column); + public void delete(@SuppressWarnings("unused") long lockid, Text column) { + checkBatch(); + this.batch.delete(column); } /** {@inheritDoc} */ public void deleteAll(Text row, Text column, long ts) throws IOException { @@ -539,7 +560,11 @@ } /** {@inheritDoc} */ public long startUpdate(Text row) { - return this.table.startUpdate(row); + if (this.batch != null) { + throw new IllegalStateException("Update already in progress"); + } + this.batch = new BatchUpdate(row); + return 1; } /** {@inheritDoc} */ public HScannerInterface getScanner(Text [] columns, Text firstRow, Index: src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java =================================================================== --- src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (working copy) @@ -146,7 +146,6 @@ r.flushcache(); } } - region.compactIfNeeded(); region.close(); region.getLog().closeAndDelete(); region.getRegionInfo().setOffline(true); Index: src/test/org/apache/hadoop/hbase/TestHLog.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestHLog.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/TestHLog.java (working copy) @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase; import java.io.IOException; -import java.util.TreeMap; import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.fs.Path; @@ -71,12 +70,11 @@ for (int ii = 0; ii < 3; ii++) { for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { - TreeMap edit = new TreeMap(); Text column = new Text(Integer.toString(j)); - edit.put( - new HStoreKey(rowName, column, System.currentTimeMillis()), - column.getBytes()); - log.append(new Text(Integer.toString(i)), tableName, edit); + HStoreKey k = + new HStoreKey(rowName, column, System.currentTimeMillis()); + log.append(new Text(Integer.toString(i) + "/" + column), tableName, + k, column.getBytes()); } } 
log.rollWriter(); @@ -104,12 +102,11 @@ // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... long timestamp = System.currentTimeMillis(); - TreeMap cols = new TreeMap(); for (int i = 0; i < COL_COUNT; i++) { - cols.put(new HStoreKey(row, new Text(Integer.toString(i)), timestamp), - new byte[] { (byte)(i + '0') }); + HStoreKey k = + new HStoreKey(row, new Text(Integer.toString(i)), timestamp); + log.append(regionName, tableName, k, new byte[] { (byte)(i + '0') }); } - log.append(regionName, tableName, cols); long logSeqId = log.startCacheFlush(); log.completeCacheFlush(regionName, tableName, logSeqId); log.close(); @@ -121,7 +118,7 @@ HLogEdit val = new HLogEdit(); for (int i = 0; i < COL_COUNT; i++) { reader.next(key, val); - assertEquals(regionName, key.getRegionName()); + assertEquals(regionName, key.getStoreName()); assertEquals(tableName, key.getTablename()); assertEquals(row, key.getRow()); assertEquals((byte)(i + '0'), val.getVal()[0]); @@ -129,7 +126,7 @@ } while (reader.next(key, val)) { // Assert only one more row... the meta flushed row. - assertEquals(regionName, key.getRegionName()); + assertEquals(regionName, key.getStoreName()); assertEquals(tableName, key.getTablename()); assertEquals(HLog.METAROW, key.getRow()); assertEquals(HLog.METACOLUMN, val.getColumn()); Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -265,8 +265,10 @@ void flushcache() throws IOException { for (LocalHBaseCluster.RegionServerThread t: this.hbaseCluster.getRegionServers()) { - for(HRegion r: t.getRegionServer().onlineRegions.values() ) { - r.flushcache(); + for(HRegion r: t.getRegionServer().getOnlineRegions().values() ) { + for (HStore s: r.getStores()) { + r.flushcache(s); + } } } } Index: src/test/org/apache/hadoop/hbase/MultiRegionTable.java =================================================================== --- src/test/org/apache/hadoop/hbase/MultiRegionTable.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/MultiRegionTable.java (working copy) @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ConcurrentModificationException; import java.util.Map; -import java.util.SortedMap; import java.util.TreeMap; import org.apache.commons.logging.Log; @@ -112,7 +111,9 @@ assertNotNull(r); // Flush the cache - server.getCacheFlushListener().flushRequested(r); + for (HStore s: r.getStores()) { + server.flushRequested(s, r); + } // Now, wait until split makes it into the meta table. int oldCount = count; @@ -335,7 +336,7 @@ LOG.info("Starting compaction"); for (LocalHBaseCluster.RegionServerThread thread: cluster.getRegionThreads()) { - SortedMap regions = thread.getRegionServer().onlineRegions; + Map regions = thread.getRegionServer().getOnlineRegions(); // Retry if ConcurrentModification... alternative of sync'ing is not // worth it for sake of unit test. 
@@ -343,7 +344,9 @@ try { for (HRegion online: regions.values()) { if (online.getRegionName().equals(r.getRegionName())) { - online.compactStores(); + for (HStore s: online.getStores()) { + online.compactStore(s); + } } } break; Index: src/test/org/apache/hadoop/hbase/TestLogRolling.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestLogRolling.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/TestLogRolling.java (working copy) @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.io.BatchUpdate; + /** * Test log deletion as logs are rolled. */ @@ -59,7 +61,7 @@ conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); // We roll the log after every 256 writes - conf.setInt("hbase.regionserver.maxlogentries", 256); + conf.setInt("hbase.regionserver.maxlogentries", 32); // For less frequently updated regions flush after every 2 flushes conf.setInt("hbase.hregion.memcache.optionalflushcount", 2); @@ -134,7 +136,7 @@ this.log = server.getLog(); // When the META table can be opened, the region servers are running - HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); + new HTable(conf, HConstants.META_TABLE_NAME); // Create the test table and open it HTableDescriptor desc = new HTableDescriptor(tableName); @@ -143,13 +145,13 @@ admin.createTable(desc); HTable table = new HTable(conf, new Text(tableName)); - for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls - long lockid = - table.startUpdate(new Text("row" + String.format("%1$04d", i))); - table.put(lockid, HConstants.COLUMN_FAMILY, value); - table.commit(lockid); + for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls + BatchUpdate b = + new BatchUpdate(new Text("row" + String.format("%1$04d", i))); + b.put(HConstants.COLUMN_FAMILY, value); + table.commit(b); - if (i % 256 == 0) { + if (i % 32 == 0) { // After every 256 writes sleep to let the log roller run try { @@ -177,7 +179,9 @@ List regions = new ArrayList(server.getOnlineRegions().values()); for (HRegion r: regions) { - r.flushcache(); + for (HStore s: r.getStores()) { + r.flushcache(s); + } } // Now roll the log Index: src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java =================================================================== --- src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java (working copy) @@ -114,9 +114,10 @@ LOG.setLevel(logLevel); if (!debugging) { - // Turn off all the filter logging unless debug is set. + // Turn off all the and connection logging unless debug is set. // It is way too noisy. Logger.getLogger("org.apache.hadoop.hbase.filter").setLevel(Level.INFO); + Logger.getLogger("org.apache.hadoop.hbase.client").setLevel(Level.INFO); } // Enable mapreduce loggging for the mapreduce jobs. 
Logger.getLogger("org.apache.hadoop.mapred").setLevel(Level.DEBUG); Index: src/test/org/apache/hadoop/hbase/TestScannerAPI.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestScannerAPI.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/TestScannerAPI.java (working copy) @@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.BatchUpdate; /** test the scanner API at all levels */ public class TestScannerAPI extends HBaseClusterTestCase { @@ -80,16 +81,16 @@ HTable table = new HTable(conf, new Text(getName())); for (Map.Entry> row: values.entrySet()) { - long lockid = table.startUpdate(row.getKey()); + BatchUpdate b = new BatchUpdate(row.getKey()); for (Map.Entry val: row.getValue().entrySet()) { - table.put(lockid, val.getKey(), val.getValue()); + b.put(val.getKey(), val.getValue()); } - table.commit(lockid); + table.commit(b); } HRegion region = null; try { - SortedMap regions = + Map regions = cluster.getRegionThreads().get(0).getRegionServer().getOnlineRegions(); for (Map.Entry e: regions.entrySet()) { if (!e.getValue().getRegionInfo().isMetaRegion()) { Index: src/test/org/apache/hadoop/hbase/TestHRegion.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestHRegion.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/TestHRegion.java (working copy) @@ -554,15 +554,17 @@ } } long startCompact = System.currentTimeMillis(); - if(r.compactIfNeeded()) { - totalCompact = System.currentTimeMillis() - startCompact; - System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); - - } else { - System.out.println("No compaction required."); + try { + for (HStore s: r.getStores()) { + r.compactStore(s); + } + } catch (IOException e) { + e.printStackTrace(); + throw e; } + totalCompact = System.currentTimeMillis() - startCompact; + System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); long endTime = System.currentTimeMillis(); - long totalElapsed = (endTime - startTime); System.out.println(); System.out.println("Batch-write complete."); @@ -582,7 +584,17 @@ private void splitAndMerge() throws IOException { Path oldRegionPath = r.getRegionDir(); long startTime = System.currentTimeMillis(); - HRegion subregions[] = r.splitRegion(this); + Text midKey = null; + for (HStore s: r.getStores()) { + Text k = r.compactStore(s); + if (midKey == null && k != null) { + midKey = k; + } + } + HRegion subregions[] = null; + if (midKey != null) { + subregions = r.splitRegion(this, midKey); + } if (subregions != null) { System.out.println("Split region elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); Index: src/test/org/apache/hadoop/hbase/TestSplit.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestSplit.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/TestSplit.java (working copy) @@ -88,10 +88,18 @@ private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); - region.flushcache(); - Text midkey = new Text(); - assertTrue(region.needsSplit(midkey)); - HRegion [] regions = split(region); + for (HStore s: region.getStores()) { + region.flushcache(s); + } + Text midkey = null; + for (HStore s: region.getStores()) { + Text k = region.compactStore(s); + if (midkey == null && 
k != null) { + midkey = k; + } + } + assertNotNull(midkey); + HRegion [] regions = split(region, midkey); try { // Need to open the regions. // TODO: Add an 'open' to HRegion... don't do open by constructing @@ -107,49 +115,42 @@ assertScan(regions[0], COLFAMILY_NAME3, new Text(START_KEY)); assertScan(regions[1], COLFAMILY_NAME3, midkey); // Now prove can't split regions that have references. - Text[] midkeys = new Text[regions.length]; for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); - // Even after above splits, still needs split but after splits its - // unsplitable because biggest store file is reference. References - // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); // Add so much data to this region, we create a store file that is > - // than - // one of our unsplitable references. - // it will. + // than one of our unsplitable references. it will. for (int j = 0; j < 2; j++) { addContent(regions[i], COLFAMILY_NAME3); } addContent(regions[i], COLFAMILY_NAME2); addContent(regions[i], COLFAMILY_NAME1); - regions[i].flushcache(); + for (HStore s: regions[i].getStores()) { + regions[i].flushcache(s); + } } - // Assert that even if one store file is larger than a reference, the - // region is still deemed unsplitable (Can't split region if references - // presen). - for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); - // Even after above splits, still needs split but after splits its - // unsplitable because biggest store file is reference. References - // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); - } - + Text[] midkeys = new Text[regions.length]; // To make regions splitable force compaction. for (int i = 0; i < regions.length; i++) { - regions[i].compactStores(); + midkeys[i] = null; + for (HStore s: regions[i].getStores()) { + Text k = regions[i].compactStore(s); + if (midkeys[i] == null && k != null) { + midkeys[i] = k; + } + } } TreeMap sortedMap = new TreeMap(); // Split these two daughter regions so then I'll have 4 regions. Will // split because added data above. for (int i = 0; i < regions.length; i++) { - HRegion[] rs = split(regions[i]); - for (int j = 0; j < rs.length; j++) { - sortedMap.put(rs[j].getRegionName().toString(), - openClosedRegion(rs[j])); + HRegion[] rs = null; + if (midkeys[i] != null) { + rs = split(regions[i], midkeys[i]); + for (int j = 0; j < rs.length; j++) { + sortedMap.put(rs[j].getRegionName().toString(), + openClosedRegion(rs[j])); + } } } LOG.info("Made 4 regions"); @@ -220,12 +221,11 @@ } } - private HRegion [] split(final HRegion r) throws IOException { - Text midKey = new Text(); - assertTrue(r.needsSplit(midKey)); + private HRegion [] split(final HRegion r, final Text midKey) + throws IOException { // Assert can get mid key from passed region. 
assertGet(r, COLFAMILY_NAME3, midKey); - HRegion [] regions = r.splitRegion(null); + HRegion [] regions = r.splitRegion(null, midKey); assertEquals(regions.length, 2); return regions; } Index: src/test/org/apache/hadoop/hbase/TestCompaction.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestCompaction.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/TestCompaction.java (working copy) @@ -86,7 +86,6 @@ */ public void testCompaction() throws Exception { createStoreFile(r); - assertFalse(r.compactIfNeeded()); for (int i = 0; i < COMPACTION_THRESHOLD; i++) { createStoreFile(r); } @@ -98,35 +97,22 @@ byte [][] bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); // Assert that I can get > 5 versions (Should be at least 5 in there). assertTrue(bytes.length >= 5); - // Try to run compaction concurrent with a thread flush just to see that - // we can. - final HRegion region = this.r; - Thread t1 = new Thread() { - @Override - public void run() { - try { - region.flushcache(); - } catch (IOException e) { - e.printStackTrace(); - } + try { + for (HStore s: r.getStores()) { + r.flushcache(s); } - }; - Thread t2 = new Thread() { - @Override - public void run() { - try { - assertTrue(region.compactIfNeeded()); - } catch (IOException e) { - e.printStackTrace(); - } + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + try { + for (HStore s: r.getStores()) { + r.compactStore(s); } - }; - t1.setDaemon(true); - t1.start(); - t2.setDaemon(true); - t2.start(); - t1.join(); - t2.join(); + } catch (IOException e) { + e.printStackTrace(); + throw e; + } // Now assert that there are 4 versions of a record only: thats the // 3 versions that should be in the compacted store and then the one more // we added when we flushed. But could be 3 only if the flush happened @@ -154,7 +140,12 @@ // verify that it is removed as we compact. // Assert all delted. assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); - this.r.flushcache(); + for (HStore s: r.getStores()) { + this.r.flushcache(s); + } + for (HStore s: r.getStores()) { + this.r.compactStore(s); + } assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); // Add a bit of data and flush it so we for sure have the compaction limit // for store files. Usually by this time we will have but if compaction @@ -162,7 +153,22 @@ // compacted store and the flush above when we added deletes. Add more // content to be certain. createSmallerStoreFile(this.r); - assertTrue(this.r.compactIfNeeded()); + try { + for (HStore s: r.getStores()) { + r.flushcache(s); + } + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + try { + for (HStore s: r.getStores()) { + r.compactStore(s); + } + } catch (IOException e) { + e.printStackTrace(); + throw e; + } // Assert that the first row is still deleted. 
bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); assertNull(bytes); Index: src/test/org/apache/hadoop/hbase/util/TestMigrate.java =================================================================== --- src/test/org/apache/hadoop/hbase/util/TestMigrate.java (revision 630252) +++ src/test/org/apache/hadoop/hbase/util/TestMigrate.java (working copy) @@ -86,7 +86,7 @@ * First load files from an old style HBase file structure */ - // Current directory is .../workspace/project/build/contrib/hbase/test/data + // Current directory is .../project/build/test/data FileSystem localfs = FileSystem.getLocal(conf); @@ -96,11 +96,11 @@ // this path is for running test with ant - "../../../../../src/contrib/hbase/src/testdata/HADOOP-2478-testdata.zip") + "../../../src/testdata/HADOOP-2478-testdata.zip") // and this path is for when you want to run inside eclipse - /*"src/contrib/hbase/src/testdata/HADOOP-2478-testdata.zip")*/ + /*"src/testdata/HADOOP-2478-testdata.zip")*/ ); ZipInputStream zip = new ZipInputStream(hs); Index: src/java/org/apache/hadoop/hbase/HLogKey.java =================================================================== --- src/java/org/apache/hadoop/hbase/HLogKey.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HLogKey.java (working copy) @@ -31,7 +31,7 @@ * also sorted. */ public class HLogKey implements WritableComparable { - Text regionName = new Text(); + Text storeName = new Text(); Text tablename = new Text(); Text row = new Text(); long logSeqNum = 0L; @@ -46,14 +46,14 @@ * We maintain the tablename mainly for debugging purposes. * A regionName is always a sub-table object. * - * @param regionName - name of region + * @param storeName - name of HStore * @param tablename - name of table * @param row - row key * @param logSeqNum - log sequence number */ - public HLogKey(Text regionName, Text tablename, Text row, long logSeqNum) { + public HLogKey(Text storeName, Text tablename, Text row, long logSeqNum) { // TODO: Is this copy of the instances necessary? They are expensive. 
- this.regionName.set(regionName); + this.storeName.set(storeName); this.tablename.set(tablename); this.row.set(row); this.logSeqNum = logSeqNum; @@ -63,8 +63,8 @@ // A bunch of accessors ////////////////////////////////////////////////////////////////////////////// - Text getRegionName() { - return regionName; + Text getStoreName() { + return storeName; } Text getTablename() { @@ -84,7 +84,7 @@ */ @Override public String toString() { - return tablename + "/" + regionName + "/" + row + "/" + logSeqNum; + return tablename + "/" + storeName + "/" + row + "/" + logSeqNum; } /** @@ -100,7 +100,7 @@ */ @Override public int hashCode() { - int result = this.regionName.hashCode(); + int result = this.storeName.hashCode(); result ^= this.row.hashCode(); result ^= this.logSeqNum; return result; @@ -115,7 +115,7 @@ */ public int compareTo(Object o) { HLogKey other = (HLogKey) o; - int result = this.regionName.compareTo(other.regionName); + int result = this.storeName.compareTo(other.getStoreName()); if(result == 0) { result = this.row.compareTo(other.row); @@ -141,7 +141,7 @@ * {@inheritDoc} */ public void write(DataOutput out) throws IOException { - this.regionName.write(out); + this.storeName.write(out); this.tablename.write(out); this.row.write(out); out.writeLong(logSeqNum); @@ -151,7 +151,7 @@ * {@inheritDoc} */ public void readFields(DataInput in) throws IOException { - this.regionName.readFields(in); + this.storeName.readFields(in); this.tablename.readFields(in); this.row.readFields(in); this.logSeqNum = in.readLong(); Index: src/java/org/apache/hadoop/hbase/HStore.java =================================================================== --- src/java/org/apache/hadoop/hbase/HStore.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -31,6 +31,7 @@ import java.util.TreeMap; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -39,11 +40,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.TextSequence; import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -78,9 +79,9 @@ @SuppressWarnings("hiding") private final SortedMap memcache = Collections.synchronizedSortedMap(new TreeMap()); - + volatile SortedMap snapshot; - + @SuppressWarnings("hiding") private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -108,7 +109,7 @@ this.lock.writeLock().unlock(); } } - + /** * @return memcache snapshot */ @@ -118,14 +119,14 @@ SortedMap currentSnapshot = snapshot; snapshot = Collections.synchronizedSortedMap(new TreeMap()); - + return currentSnapshot; } finally { this.lock.writeLock().unlock(); } } - + /** * Store a value. 
* @param key @@ -135,7 +136,7 @@ this.lock.readLock().lock(); try { memcache.put(key, value); - + } finally { this.lock.readLock().unlock(); } @@ -159,7 +160,7 @@ internalGet(snapshot, key, numVersions - results.size())); } return results; - + } finally { this.lock.readLock().unlock(); } @@ -215,14 +216,17 @@ /** * Find the key that matches row exactly, or the one that immediately * preceeds it. + * + * @param row + * @param timestamp + * @return row key */ - public Text getRowKeyAtOrBefore(final Text row, long timestamp) - throws IOException{ + public Text getRowKeyAtOrBefore(final Text row, long timestamp) { this.lock.readLock().lock(); - + Text key_memcache = null; Text key_snapshot = null; - + try { synchronized (memcache) { key_memcache = internalGetRowKeyAtOrBefore(memcache, row, timestamp); @@ -238,24 +242,24 @@ return key_snapshot; } else if (key_memcache != null && key_snapshot == null) { return key_memcache; - } else { + } else if ( (key_memcache != null && key_memcache.equals(row)) + || (key_snapshot != null && key_snapshot.equals(row)) ) { // if either is a precise match, return the original row. - if ( (key_memcache != null && key_memcache.equals(row)) - || (key_snapshot != null && key_snapshot.equals(row)) ) { - return row; - } + return row; + } else if (key_memcache != null) { // no precise matches, so return the one that is closer to the search // key (greatest) return key_memcache.compareTo(key_snapshot) > 0 ? - key_memcache : key_snapshot; + key_memcache : key_snapshot; } + return null; } finally { this.lock.readLock().unlock(); } } private Text internalGetRowKeyAtOrBefore(SortedMap map, - Text key, long timestamp) { + Text key, long timestamp) { // TODO: account for deleted cells HStoreKey search_key = new HStoreKey(key, timestamp); @@ -270,31 +274,31 @@ // argument Iterator key_iterator = tailMap.keySet().iterator(); HStoreKey found_key = key_iterator.next(); - + // keep seeking so long as we're in the same row, and the timstamp // isn't as small as we'd like, and there are more cells to check while (found_key.getRow().equals(key) - && found_key.getTimestamp() > timestamp && key_iterator.hasNext()) { + && found_key.getTimestamp() > timestamp && key_iterator.hasNext()) { found_key = key_iterator.next(); } - + // if this check fails, then we've iterated through all the keys that // match by row, but none match by timestamp, so we fall through to // the headMap case. if (found_key.getTimestamp() <= timestamp) { // we didn't find a key that matched by timestamp, so we have to // return null; -/* LOG.debug("Went searching for " + key + ", found " + found_key.getRow());*/ + /* LOG.debug("Went searching for " + key + ", found " + found_key.getRow());*/ return found_key.getRow(); } } - + // the tail didn't contain the key we're searching for, so we should // use the last key in the headmap as the closest before SortedMap headMap = map.headMap(search_key); return headMap.isEmpty()? null: headMap.lastKey().getRow(); } - + /** * Examine a single map for the desired key. 
* @@ -353,7 +357,7 @@ (versions - results.size()))); } return results; - + } finally { this.lock.readLock().unlock(); } @@ -374,7 +378,7 @@ SortedMap tailMap = map.tailMap(origin); for (Map.Entry es: tailMap.entrySet()) { HStoreKey key = es.getKey(); - + // if there's no column name, then compare rows and timestamps if (origin.getColumn().toString().equals("")) { // if the current and origin row don't match, then we can jump @@ -428,7 +432,7 @@ // locks by the same thread and to be able to downgrade a write lock to // a read lock. We need to hold a lock throughout this method, but only // need the write lock while creating the memcache snapshot - + this.lock.writeLock().lock(); // hold write lock during memcache snapshot snapshot(); // snapshot memcache this.lock.readLock().lock(); // acquire read lock @@ -437,7 +441,7 @@ // Prevent a cache flush while we are constructing the scanner return new MemcacheScanner(timestamp, targetCols, firstRow); - + } finally { this.lock.readLock().unlock(); } @@ -466,22 +470,22 @@ // Generate list of iterators HStoreKey firstKey = new HStoreKey(firstRow); - if (firstRow != null && firstRow.getLength() != 0) { - keyIterator = - backingMap.tailMap(firstKey).keySet().iterator(); + if (firstRow != null && firstRow.getLength() != 0) { + keyIterator = + backingMap.tailMap(firstKey).keySet().iterator(); - } else { - keyIterator = backingMap.keySet().iterator(); - } + } else { + keyIterator = backingMap.keySet().iterator(); + } - while (getNext(0)) { - if (!findFirstRow(0, firstRow)) { - continue; - } - if (columnMatch(0)) { - break; - } + while (getNext(0)) { + if (!findFirstRow(0, firstRow)) { + continue; } + if (columnMatch(0)) { + break; + } + } } catch (RuntimeException ex) { LOG.error("error initializing Memcache scanner: ", ex); close(); @@ -571,7 +575,13 @@ private static final String BLOOMFILTER_FILE_NAME = "filter"; final Memcache memcache = new Memcache(); + private final AtomicLong memcacheSize = new AtomicLong(0L); + private volatile long lastFlushTime; + final long memcacheFlushSize; + final long desiredMaxFileSize; + private volatile long storeSize; private final Path basedir; + final HRegion region; private final HRegionInfo info; private final HColumnDescriptor family; private final SequenceFile.CompressionType compression; @@ -585,9 +595,9 @@ private final Integer flushLock = new Integer(0); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - final AtomicInteger activeScanners = new AtomicInteger(0); + volatile AtomicInteger activeScanners = new AtomicInteger(0); - final String storeName; + final Text storeName; /* * Sorted Map of readers keyed by sequence id (Most recent should be last in @@ -603,11 +613,17 @@ private final SortedMap readers = new TreeMap(); + // The most-recent log-seq-ID that's present. The most-recent such ID means + // we can ignore all log messages up to and including that ID (because they're + // already reflected in the TreeMaps). private volatile long maxSeqId; + private final int compactionThreshold; private final ReentrantReadWriteLock newScannerLock = new ReentrantReadWriteLock(); + private volatile boolean compactionRequested = false; + /** * An HStore is a set of zero or more MapFiles, which stretch backwards over * time. A given HStore is responsible for a certain set of columns for a @@ -633,27 +649,42 @@ * file will be deleted (by whoever has instantiated the HStore). 
* * @param basedir qualified path under which the region directory lives - * @param info HRegionInfo for this region + * @param region region that this store belongs to * @param family HColumnDescriptor for this column * @param fs file system object * @param reconstructionLog existing log file to apply if any * @param conf configuration object * @throws IOException */ - HStore(Path basedir, HRegionInfo info, HColumnDescriptor family, + HStore(Path basedir, HRegion region, HColumnDescriptor family, FileSystem fs, Path reconstructionLog, HBaseConfiguration conf) throws IOException { - + this.basedir = basedir; - this.info = info; + this.region = region; + this.info = this.region.getRegionInfo(); this.family = family; this.fs = fs; this.conf = conf; - + this.lastFlushTime = System.currentTimeMillis(); + + // By default, we compact if an HStore has more than + // MIN_COMMITS_FOR_COMPACTION map files + this.compactionThreshold = + conf.getInt("hbase.hstore.compactionThreshold", 3); + + // By default, we flush the cache when 64M. + this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", + 1024*1024*64); + + // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. + this.desiredMaxFileSize = + conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); + this.compactionDir = HRegion.getCompactionDir(basedir); this.storeName = - this.info.getEncodedName() + "/" + this.family.getFamilyName(); - + new Text(this.info.getEncodedName() + "/" + this.family.getFamilyName()); + if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) { this.compression = SequenceFile.CompressionType.BLOCK; } else if (family.getCompression() == @@ -662,7 +693,7 @@ } else { this.compression = SequenceFile.CompressionType.NONE; } - + Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(), family.getFamilyName()); if (!fs.exists(mapdir)) { @@ -673,7 +704,7 @@ if (!fs.exists(infodir)) { fs.mkdirs(infodir); } - + if(family.getBloomFilter() == null) { this.filterDir = null; this.bloomFilter = null; @@ -689,49 +720,28 @@ if(LOG.isDebugEnabled()) { LOG.debug("starting " + storeName + ((reconstructionLog == null || !fs.exists(reconstructionLog)) ? - " (no reconstruction log)" : - " with reconstruction log: " + reconstructionLog.toString())); + " (no reconstruction log)" : + " with reconstruction log: " + reconstructionLog.toString())); } // Go through the 'mapdir' and 'infodir' together, make sure that all // MapFiles are in a reliable state. Every entry in 'mapdir' must have a // corresponding one in 'loginfodir'. Without a corresponding log info // file, the entry in 'mapdir' must be deleted. - List hstoreFiles = loadHStoreFiles(infodir, mapdir); - for(HStoreFile hsf: hstoreFiles) { - this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf); - } + // loadHStoreFiles also computes the max sequence id + this.maxSeqId = -1L; + this.storefiles.putAll(loadHStoreFiles(infodir, mapdir)); - // Now go through all the HSTORE_LOGINFOFILEs and figure out the - // most-recent log-seq-ID that's present. The most-recent such ID means we - // can ignore all log messages up to and including that ID (because they're - // already reflected in the TreeMaps). - // - // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That - // means it was built prior to the previous run of HStore, and so it cannot - // contain any updates also contained in the log. 
- - this.maxSeqId = getMaxSequenceId(hstoreFiles); if (LOG.isDebugEnabled()) { LOG.debug("maximum sequence id for hstore " + storeName + " is " + this.maxSeqId); } - + doReconstructionLog(reconstructionLog, maxSeqId); - // By default, we compact if an HStore has more than - // MIN_COMMITS_FOR_COMPACTION map files - this.compactionThreshold = - conf.getInt("hbase.hstore.compactionThreshold", 3); - - // We used to compact in here before bringing the store online. Instead - // get it online quick even if it needs compactions so we can start - // taking updates as soon as possible (Once online, can take updates even - // during a compaction). + // Move maxSeqId on by one. Why here? And not in HRegion? + this.maxSeqId += 1L; - // Move maxSeqId on by one. Why here? And not in HRegion? - this.maxSeqId += 1; - // Finally, start up all the map readers! (There could be more than one // since we haven't compacted yet.) boolean first = true; @@ -745,30 +755,11 @@ first = false; } else { this.readers.put(e.getKey(), - e.getValue().getReader(this.fs, this.bloomFilter)); + e.getValue().getReader(this.fs, this.bloomFilter)); } } } - /* - * @param hstoreFiles - * @return Maximum sequence number found or -1. - * @throws IOException - */ - private long getMaxSequenceId(final List hstoreFiles) - throws IOException { - long maxSeqID = -1; - for (HStoreFile hsf : hstoreFiles) { - long seqid = hsf.loadInfo(fs); - if (seqid > 0) { - if (seqid > maxSeqID) { - maxSeqID = seqid; - } - } - } - return maxSeqID; - } - long getMaxSequenceId() { return this.maxSeqId; } @@ -783,7 +774,7 @@ */ private void doReconstructionLog(final Path reconstructionLog, final long maxSeqID) throws UnsupportedEncodingException, IOException { - + if (reconstructionLog == null || !fs.exists(reconstructionLog)) { // Nothing to do. return; @@ -791,10 +782,10 @@ long maxSeqIdInLog = -1; TreeMap reconstructedCache = new TreeMap(); - + SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs, reconstructionLog, this.conf); - + try { HLogKey key = new HLogKey(); HLogEdit val = new HLogEdit(); @@ -807,34 +798,32 @@ } if (skippedEdits > 0 && LOG.isDebugEnabled()) { LOG.debug("Skipped " + skippedEdits + - " edits because sequence id <= " + maxSeqID); + " edits because sequence id <= " + maxSeqID); } // Check this edit is for me. Also, guard against writing // METACOLUMN info such as HBASE::CACHEFLUSH entries Text column = val.getColumn(); if (column.equals(HLog.METACOLUMN) - || !key.getRegionName().equals(info.getRegionName()) + || (key.getStoreName().compareTo(storeName) != 0) || !HStoreKey.extractFamily(column).equals(family.getFamilyName())) { if (LOG.isDebugEnabled()) { - LOG.debug("Passing on edit " + key.getRegionName() + ", " + - column.toString() + ": " + - new String(val.getVal(), UTF8_ENCODING) + - ", my region: " + info.getRegionName() + ", my column: " + - family.getFamilyName()); + LOG.debug("Passing on edit " + key.getStoreName() + ": " + + new String(val.getVal(), UTF8_ENCODING) + ", my name: " + + storeName); } continue; } HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp()); if (LOG.isDebugEnabled()) { LOG.debug("Applying edit <" + k.toString() + "=" + val.toString() + - ">"); + ">"); } reconstructedCache.put(k, val.getVal()); } } finally { logReader.close(); } - + if (reconstructedCache.size() > 0) { // We create a "virtual flush" at maxSeqIdInLog+1. 
if (LOG.isDebugEnabled()) { @@ -853,7 +842,7 @@ * @param mapdir qualified path for map file directory * @throws IOException */ - private List loadHStoreFiles(Path infodir, Path mapdir) + private SortedMap loadHStoreFiles(Path infodir, Path mapdir) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("infodir: " + infodir.toString() + " mapdir: " + @@ -861,10 +850,11 @@ } // Look first at info files. If a reference, these contain info we need // to create the HStoreFile. - Path infofiles[] = fs.listPaths(new Path[] {infodir}); - ArrayList results = new ArrayList(infofiles.length); + FileStatus infofiles[] = fs.listStatus(infodir); + SortedMap results = new TreeMap(); ArrayList mapfiles = new ArrayList(infofiles.length); - for (Path p: infofiles) { + for (int i = 0; i < infofiles.length; i++) { + Path p = infofiles[i].getPath(); Matcher m = REF_NAME_PARSER.matcher(p.getName()); /* * * * * * N O T E * * * * * @@ -885,6 +875,19 @@ } curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), family.getFamilyName(), fid, reference); + storeSize += curfile.length(); + long storeSeqId = -1; + try { + storeSeqId = curfile.loadInfo(fs); + if (storeSeqId > this.maxSeqId) { + this.maxSeqId = storeSeqId; + } + } catch (IOException e) { + // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. + // That means it was built prior to the previous run of HStore, and so + // it cannot contain any updates also contained in the log. + } + Path mapfile = curfile.getMapFilePath(); if (!fs.exists(mapfile)) { fs.delete(curfile.getInfoFilePath()); @@ -892,23 +895,23 @@ "Cleaned up info file. Continuing..."); continue; } - + // TODO: Confirm referent exists. - + // Found map and sympathetic info file. Add this hstorefile to result. - results.add(curfile); + results.put(storeSeqId, curfile); // Keep list of sympathetic data mapfiles for cleaning info dir in next // section. Make sure path is fully qualified for compare. mapfiles.add(mapfile); } - + // List paths by experience returns fully qualified names -- at least when // running on a mini hdfs cluster. - Path datfiles[] = fs.listPaths(new Path[] {mapdir}); + FileStatus datfiles[] = fs.listStatus(mapdir); for (int i = 0; i < datfiles.length; i++) { // If does not have sympathetic info file, delete. 
- if (!mapfiles.contains(fs.makeQualified(datfiles[i]))) { - fs.delete(datfiles[i]); + if (!mapfiles.contains(fs.makeQualified(datfiles[i].getPath()))) { + fs.delete(datfiles[i].getPath()); } } return results; @@ -930,24 +933,24 @@ if (LOG.isDebugEnabled()) { LOG.debug("loading bloom filter for " + this.storeName); } - + BloomFilterDescriptor.BloomFilterType type = family.getBloomFilter().filterType; switch(type) { - + case BLOOMFILTER: bloomFilter = new BloomFilter(); break; - + case COUNTING_BLOOMFILTER: bloomFilter = new CountingBloomFilter(); break; - + case RETOUCHED_BLOOMFILTER: bloomFilter = new RetouchedBloomFilter(); break; - + default: throw new IllegalArgumentException("unknown bloom filter type: " + type); @@ -967,22 +970,21 @@ family.getBloomFilter().filterType; switch(type) { - + case BLOOMFILTER: bloomFilter = new BloomFilter(family.getBloomFilter().vectorSize, family.getBloomFilter().nbHash); break; - + case COUNTING_BLOOMFILTER: bloomFilter = new CountingBloomFilter(family.getBloomFilter().vectorSize, - family.getBloomFilter().nbHash); + family.getBloomFilter().nbHash); break; - + case RETOUCHED_BLOOMFILTER: - bloomFilter = - new RetouchedBloomFilter(family.getBloomFilter().vectorSize, - family.getBloomFilter().nbHash); + bloomFilter = new RetouchedBloomFilter( + family.getBloomFilter().vectorSize, family.getBloomFilter().nbHash); } } return bloomFilter; @@ -1018,15 +1020,28 @@ * * @param key * @param value + * @return size of update */ - void add(HStoreKey key, byte[] value) { + long add(HStoreKey key, byte[] value) { + long updateSize = 0; lock.readLock().lock(); try { this.memcache.add(key, value); - + updateSize = key.getSize() + (value == null ? 0 : value.length); + synchronized (memcacheSize) { + long cacheSize = memcacheSize.addAndGet(updateSize); + if (cacheSize > memcacheFlushSize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Requesting cache flush for store " + storeName); + } + region.flushRequested(this); + memcacheSize.set(0L); + } + } } finally { lock.readLock().unlock(); } + return updateSize; } /** @@ -1078,23 +1093,33 @@ * Return the entire list of HStoreFiles currently used by the HStore. * * @param logCacheFlushId flush sequence number + * @return number of bytes flushed * @throws IOException */ - void flushCache(final long logCacheFlushId) throws IOException { - internalFlushCache(memcache.getSnapshot(), logCacheFlushId); + long flushCache(final long logCacheFlushId) throws IOException { + return internalFlushCache(memcache.getSnapshot(), logCacheFlushId); } - private void internalFlushCache(SortedMap cache, + private long internalFlushCache(SortedMap cache, long logCacheFlushId) throws IOException { - + + long bytesFlushed = 0; + if (cache.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.info("Not flushing cache for " + storeName + + " because it has 0 entries"); + } + return bytesFlushed; + } + synchronized(flushLock) { // A. Write the Maps out to the disk HStoreFile flushedFile = new HStoreFile(conf, fs, basedir, - info.getEncodedName(), family.getFamilyName(), -1L, null); + info.getEncodedName(), family.getFamilyName(), -1L, null); String name = flushedFile.toString(); MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, - this.bloomFilter); - + this.bloomFilter); + // Here we tried picking up an existing HStoreFile from disk and // interlacing the memcache flush compacting as we go. 
The notion was // that interlacing would take as long as a pure flush with the added @@ -1107,23 +1132,27 @@ // Related, looks like 'merging compactions' in BigTable paper interlaces // a memcache flush. We don't. int entries = 0; + long cacheSize = 0; try { for (Map.Entry es: cache.entrySet()) { HStoreKey curkey = es.getKey(); - TextSequence f = HStoreKey.extractFamily(curkey.getColumn()); - if (f.equals(this.family.getFamilyName())) { + byte[] bytes = es.getValue(); + if (curkey.getFamily().equals(this.family.getFamilyName())) { entries++; - out.append(curkey, new ImmutableBytesWritable(es.getValue())); + out.append(curkey, new ImmutableBytesWritable(bytes)); + cacheSize += curkey.getSize() + (bytes != null ? bytes.length : 0); } } } finally { out.close(); } + long newStoreSize = flushedFile.length(); + storeSize += newStoreSize; // B. Write out the log sequence number that corresponds to this output // MapFile. The MapFile is current up to and including the log seq num. flushedFile.writeInfo(fs, logCacheFlushId); - + // C. Flush the bloom filter if any if (bloomFilter != null) { flushBloomFilter(); @@ -1139,15 +1168,27 @@ this.storefiles.put(flushid, flushedFile); if(LOG.isDebugEnabled()) { LOG.debug("Added " + name + " with " + entries + - " entries, sequence id " + logCacheFlushId + ", and size " + - StringUtils.humanReadableInt(flushedFile.length()) + " for " + - this.storeName); + " entries, sequence id " + logCacheFlushId + ", data size " + + StringUtils.humanReadableInt(cacheSize) + ", file size " + + newStoreSize + " for " + this.storeName); } + bytesFlushed = cacheSize; } finally { this.lock.writeLock().unlock(); } - return; + lastFlushTime = System.currentTimeMillis(); } + if (!compactionRequested && + (storefiles.size() > compactionThreshold || + storeSize > this.desiredMaxFileSize)) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Requesting compaction for store " + storeName); + } + compactionRequested = true; + region.compactionRequested(this); + } + return bytesFlushed; } ////////////////////////////////////////////////////////////////////////////// @@ -1155,28 +1196,6 @@ ////////////////////////////////////////////////////////////////////////////// /** - * @return True if this store needs compaction. - */ - boolean needsCompaction() { - return this.storefiles != null && - (this.storefiles.size() >= this.compactionThreshold || hasReferences()); - } - - /* - * @return True if this store has references. - */ - private boolean hasReferences() { - if (this.storefiles != null) { - for (HStoreFile hsf: this.storefiles.values()) { - if (hsf.isReference()) { - return true; - } - } - } - return false; - } - - /** * Compact the back-HStores. This method may take some time, so the calling * thread must be able to block for long periods. * @@ -1193,55 +1212,71 @@ * can be lengthy and we want to allow cache-flushes during this period. 
* @throws IOException * - * @return true if compaction completed successfully + * @return mid key if a split is needed null otherwise */ - boolean compact() throws IOException { - synchronized (compactLock) { - if (LOG.isDebugEnabled()) { - LOG.debug("started compaction of " + storefiles.size() + - " files using " + compactionDir.toString() + " for " + - this.storeName); - } + Text compact() throws IOException { + try { + synchronized (compactLock) { + List filesToCompact = null; + long maxId = -1; + synchronized (storefiles) { + if (storefiles.size() < 1 || + (storefiles.size() == 1 && + !storefiles.get(storefiles.firstKey()).isReference())) { + if (LOG.isDebugEnabled()) { + LOG.debug("nothing to compact for " + this.storeName + " because" + + (storefiles.size() < 1 ? " no store files to compact" : + " only one store file and it is not a refeerence")); + } + return null; + } + if (LOG.isDebugEnabled()) { + LOG.debug("started compaction of " + storefiles.size() + + " files using " + compactionDir.toString() + " for " + + this.storeName); + } - // Storefiles are keyed by sequence id. The oldest file comes first. - // We need to return out of here a List that has the newest file first. - List filesToCompact = - new ArrayList(this.storefiles.values()); - Collections.reverse(filesToCompact); - if (filesToCompact.size() < 1 || - (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) { - if (LOG.isDebugEnabled()) { - LOG.debug("nothing to compact for " + this.storeName); + // Storefiles are keyed by sequence id. The oldest file comes first. + // We need to return out of here a List that has the newest file first. + filesToCompact = new ArrayList(this.storefiles.values()); + Collections.reverse(filesToCompact); + + // The max-sequenceID in any of the to-be-compacted TreeMaps is the + // last key of storefiles. + + maxId = this.storefiles.lastKey(); } - return false; - } - if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { - LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); - return false; - } + if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { + LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); + return null; + } - // Step through them, writing to the brand-new MapFile - HStoreFile compactedOutputFile = new HStoreFile(conf, fs, - this.compactionDir, info.getEncodedName(), family.getFamilyName(), - -1L, null); - MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, - this.compression, this.bloomFilter); - try { - compactHStoreFiles(compactedOut, filesToCompact); - } finally { - compactedOut.close(); - } + // Step through them, writing to the brand-new MapFile + HStoreFile compactedOutputFile = new HStoreFile(conf, fs, + this.compactionDir, info.getEncodedName(), family.getFamilyName(), + -1L, null); + MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, + this.compression, this.bloomFilter); + try { + compactHStoreFiles(compactedOut, filesToCompact); + } finally { + compactedOut.close(); + } - // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. - // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps. - long maxId = getMaxSequenceId(filesToCompact); - compactedOutputFile.writeInfo(fs, maxId); + // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. + compactedOutputFile.writeInfo(fs, maxId); - // Move the compaction into place. - completeCompaction(filesToCompact, compactedOutputFile); - return true; + // Move the compaction into place. 
+ completeCompaction(filesToCompact, compactedOutputFile); + } + } finally { + compactionRequested = false; + if (storeSize > this.desiredMaxFileSize) { + return checkSplit(); + } } + return null; } /* @@ -1258,7 +1293,7 @@ */ private void compactHStoreFiles(final MapFile.Writer compactedOut, final List toCompactFiles) throws IOException { - + int size = toCompactFiles.size(); CompactionReader[] rdrs = new CompactionReader[size]; int index = 0; @@ -1271,8 +1306,8 @@ // exception message so output a message here where we know the // culprit. LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() + - (hsf.isReference() ? " " + hsf.getReference().toString() : "") + - " for " + this.storeName); + (hsf.isReference() ? " " + hsf.getReference().toString() : "") + + " for " + this.storeName); closeCompactionReaders(rdrs); throw e; } @@ -1335,7 +1370,7 @@ } byte [] value = (vals[smallestKey] == null)? - null: vals[smallestKey].get(); + null: vals[smallestKey].get(); if (!isDeleted(sk, value, false, deletes) && timesSeen <= family.getMaxVersions()) { // Keep old versions until we have maxVersions worth. @@ -1380,13 +1415,13 @@ /** Interface for generic reader for compactions */ interface CompactionReader { - + /** * Closes the reader * @throws IOException */ public void close() throws IOException; - + /** * Get the next key/value pair * @@ -1397,7 +1432,7 @@ */ public boolean next(WritableComparable key, Writable val) throws IOException; - + /** * Resets the reader * @throws IOException @@ -1408,11 +1443,11 @@ /** A compaction reader for MapFile */ static class MapFileCompactionReader implements CompactionReader { final MapFile.Reader reader; - + MapFileCompactionReader(final MapFile.Reader r) { this.reader = r; } - + /** {@inheritDoc} */ public void close() throws IOException { this.reader.close(); @@ -1489,11 +1524,11 @@ *
    * 1) Wait for active scanners to exit
    * 2) Acquiring the write-lock
-   * 3) Figuring out what MapFiles are going to be replaced
-   * 4) Moving the new compacted MapFile into place
-   * 5) Unloading all the replaced MapFiles.
-   * 6) Deleting all the old MapFile files.
-   * 7) Loading the new TreeMap.
+   * 3) Moving the new MapFile into place.
+   * 4) Unloading and closing all the replaced MapFiles.
+   * 5) Deleting the replaced MapFiles.
+   * 6) Loading the new TreeMap.
+   * 7) Computing the new store size.
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    * 
@@ -1504,9 +1539,9 @@ */ private void completeCompaction(List compactedFiles, HStoreFile compactedFile) throws IOException { - + // 1. Wait for active scanners to exit - + newScannerLock.writeLock().lock(); // prevent new scanners try { synchronized (activeScanners) { @@ -1524,7 +1559,7 @@ try { // 3. Moving the new MapFile into place. - + HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), family.getFamilyName(), -1, null); if(LOG.isDebugEnabled()) { @@ -1540,47 +1575,54 @@ } // 4. and 5. Unload all the replaced MapFiles, close and delete. - - List toDelete = new ArrayList(); - for (Map.Entry e: this.storefiles.entrySet()) { - if (!compactedFiles.contains(e.getValue())) { - continue; + + synchronized (storefiles) { + List toDelete = new ArrayList(); + for (Map.Entry e: this.storefiles.entrySet()) { + if (!compactedFiles.contains(e.getValue())) { + continue; + } + Long key = e.getKey(); + MapFile.Reader reader = this.readers.remove(key); + if (reader != null) { + reader.close(); + } + toDelete.add(key); } - Long key = e.getKey(); - MapFile.Reader reader = this.readers.remove(key); - if (reader != null) { - reader.close(); - } - toDelete.add(key); - } - try { - for (Long key: toDelete) { - HStoreFile hsf = this.storefiles.remove(key); - hsf.delete(); + try { + for (Long key: toDelete) { + HStoreFile hsf = this.storefiles.remove(key); + hsf.delete(); + } + + // 6. Loading the new TreeMap. + Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); + this.readers.put(orderVal, + // Use a block cache (if configured) for this reader since + // it is the only one. + finalCompactedFile.getReader(this.fs, this.bloomFilter, + family.isBlockCacheEnabled())); + this.storefiles.put(orderVal, finalCompactedFile); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.error("Failed replacing compacted files for " + this.storeName + + ". Compacted file is " + finalCompactedFile.toString() + + ". Files replaced are " + compactedFiles.toString() + + " some of which may have been already removed", e); } - - // 6. Loading the new TreeMap. - Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); - this.readers.put(orderVal, - // Use a block cache (if configured) for this reader since - // it is the only one. - finalCompactedFile.getReader(this.fs, this.bloomFilter, - family.isBlockCacheEnabled())); - this.storefiles.put(orderVal, finalCompactedFile); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files for " + this.storeName + - ". Compacted file is " + finalCompactedFile.toString() + - ". Files replaced are " + compactedFiles.toString() + - " some of which may have been already removed", e); + // 7. Compute new store size + storeSize = 0L; + for (HStoreFile hsf: storefiles.values()) { + storeSize += hsf.length(); + } } } finally { - // 7. Releasing the write-lock + // 8. Releasing the write-lock this.lock.writeLock().unlock(); } } finally { - // 8. Allow new scanners to proceed. + // 9. Allow new scanners to proceed. newScannerLock.writeLock().unlock(); } } @@ -1597,13 +1639,13 @@ * The returned object should map column names to byte arrays (byte[]). 
*/ void getFull(HStoreKey key, TreeMap results) - throws IOException { + throws IOException { Map> deletes = new HashMap>(); - + if (key == null) { return; } - + this.lock.readLock().lock(); memcache.getFull(key, results); try { @@ -1629,11 +1671,11 @@ } else if(key.getRow().compareTo(readkey.getRow()) < 0) { break; } - + } while(map.next(readkey, readval)); } } - + } finally { this.lock.readLock().unlock(); } @@ -1658,7 +1700,7 @@ if (numVersions <= 0) { throw new IllegalArgumentException("Number of versions must be > 0"); } - + this.lock.readLock().lock(); try { // Check the memcache @@ -1705,10 +1747,10 @@ } } for (readval = new ImmutableBytesWritable(); - map.next(readkey, readval) && - readkey.matchesRowCol(key) && - !hasEnoughVersions(numVersions, results); - readval = new ImmutableBytesWritable()) { + map.next(readkey, readval) && + readkey.matchesRowCol(key) && + !hasEnoughVersions(numVersions, results); + readval = new ImmutableBytesWritable()) { if (!isDeleted(readkey, readval.get(), true, deletes)) { results.add(readval.get()); } @@ -1719,7 +1761,7 @@ } } return results.size() == 0 ? - null : ImmutableBytesWritable.toArray(results); + null : ImmutableBytesWritable.toArray(results); } finally { this.lock.readLock().unlock(); } @@ -1746,13 +1788,13 @@ * @throws IOException */ List getKeys(final HStoreKey origin, final int versions) - throws IOException { - + throws IOException { + List keys = this.memcache.getKeys(origin, versions); if (versions != ALL_VERSIONS && keys.size() >= versions) { return keys; } - + // This code below is very close to the body of the get method. this.lock.readLock().lock(); try { @@ -1761,7 +1803,7 @@ MapFile.Reader map = maparray[i]; synchronized(map) { map.reset(); - + // do the priming read ImmutableBytesWritable readval = new ImmutableBytesWritable(); HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval); @@ -1772,7 +1814,7 @@ // BEFORE. continue; } - + do{ // if the row matches, we might want this one. if(rowMatches(origin, readkey)){ @@ -1801,7 +1843,7 @@ }while(map.next(readkey, readval)); // advance to the next key } } - + return keys; } finally { this.lock.readLock().unlock(); @@ -1811,9 +1853,14 @@ /** * Find the key that matches row exactly, or the one that immediately * preceeds it. + * + * @param row + * @param timestamp + * @return row key + * @throws IOException */ - public Text getRowKeyAtOrBefore(final Text row, final long timestamp) - throws IOException{ + Text getRowKeyAtOrBefore(final Text row, final long timestamp) + throws IOException { // if the exact key is found, return that key // if we find a key that is greater than our search key, then use the // last key we processed, and if that was null, return null. @@ -1822,36 +1869,34 @@ if (foundKey != null) { return foundKey; } - + // obtain read lock this.lock.readLock().lock(); try { MapFile.Reader[] maparray = getReaders(); - Text bestSoFar = null; - // process each store file for(int i = maparray.length - 1; i >= 0; i--) { Text row_from_mapfile = rowAtOrBeforeFromMapFile(maparray[i], row, timestamp); - + // if the result from the mapfile is null, then we know that // the mapfile was empty and can move on to the next one. 
if (row_from_mapfile == null) { continue; } - + // short circuit on an exact match if (row.equals(row_from_mapfile)) { return row; } - + // check to see if we've found a new closest row key as a result if (bestSoFar == null || bestSoFar.compareTo(row_from_mapfile) < 0) { bestSoFar = row_from_mapfile; } } - + return bestSoFar; } finally { this.lock.readLock().unlock(); @@ -1863,34 +1908,33 @@ * and timestamp */ private Text rowAtOrBeforeFromMapFile(MapFile.Reader map, Text row, - long timestamp) - throws IOException { + long timestamp) throws IOException { HStoreKey searchKey = new HStoreKey(row, timestamp); Text previousRow = null; ImmutableBytesWritable readval = new ImmutableBytesWritable(); HStoreKey readkey = new HStoreKey(); - + synchronized(map) { // don't bother with the rest of this if the file is empty map.reset(); if (!map.next(readkey, readval)) { return null; } - + HStoreKey finalKey = new HStoreKey(); map.finalKey(finalKey); if (finalKey.getRow().compareTo(row) < 0) { return finalKey.getRow(); } - + // seek to the exact row, or the one that would be immediately before it readkey = (HStoreKey)map.getClosest(searchKey, readval, true); - + if (readkey == null) { // didn't find anything that would match, so returns return null; } - + do { if (readkey.getRow().compareTo(row) == 0) { // exact match on row @@ -1962,104 +2006,81 @@ // otherwise, we want to match on row and column return target.matchesRowCol(origin); } - - /* - * Data structure to hold result of a look at store file sizes. - */ - static class HStoreSize { - final long aggregate; - final long largest; - boolean splitable; - - HStoreSize(final long a, final long l, final boolean s) { - this.aggregate = a; - this.largest = l; - this.splitable = s; - } - - long getAggregate() { - return this.aggregate; - } - - long getLargest() { - return this.largest; - } - - boolean isSplitable() { - return this.splitable; - } - - void setSplitable(final boolean s) { - this.splitable = s; - } - } - + /** - * Gets size for the store. + * Determines if HStore can be split * - * @param midKey Gets set to the middle key of the largest splitable store - * file or its set to empty if largest is not splitable. - * @return Sizes for the store and the passed midKey is - * set to midKey of largest splitable. Otherwise, its set to empty - * to indicate we couldn't find a midkey to split on + * @return midKey if store can be split, null otherwise */ - HStoreSize size(Text midKey) { - long maxSize = 0L; - long aggregateSize = 0L; - // Not splitable if we find a reference store file present in the store. - boolean splitable = true; + Text checkSplit() { if (this.storefiles.size() <= 0) { - return new HStoreSize(0, 0, splitable); + return null; } - this.lock.readLock().lock(); try { + // Not splitable if we find a reference store file present in the store. 
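The getRowKeyAtOrBefore() changes above ask the memcache first, then walk the store files newest to oldest, short-circuiting on an exact match and otherwise keeping the largest candidate row that does not exceed the target. A small sketch of that selection rule, with a hypothetical closestRowBefore helper that assumes each source already returns its own best candidate (or null) and that org.apache.hadoop.io.Text and java.util.List are imported:

  // Sketch only: merge per-source "row at or before" candidates.
  static Text closestRowBefore(final Text row, final List<Text> candidates) {
    Text bestSoFar = null;
    for (Text candidate : candidates) {
      if (candidate == null) {
        continue;                        // source had nothing at or before row
      }
      if (row.equals(candidate)) {
        return row;                      // exact match short-circuits the scan
      }
      if (bestSoFar == null || bestSoFar.compareTo(candidate) < 0) {
        bestSoFar = candidate;           // a larger candidate is still <= row
      }
    }
    return bestSoFar;
  }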
+ boolean splitable = true; + long maxSize = 0L; Long mapIndex = Long.valueOf(0L); // Iterate through all the MapFiles - for (Map.Entry e: storefiles.entrySet()) { - HStoreFile curHSF = e.getValue(); - long size = curHSF.length(); - aggregateSize += size; - if (maxSize == 0L || size > maxSize) { - // This is the largest one so far - maxSize = size; - mapIndex = e.getKey(); + synchronized (storefiles) { + for(Map.Entry e: storefiles.entrySet()) { + HStoreFile curHSF = e.getValue(); + long size = curHSF.length(); + if (size > maxSize) { + // This is the largest one so far + maxSize = size; + mapIndex = e.getKey(); + } + if (splitable) { + splitable = !curHSF.isReference(); + } } - if (splitable) { - splitable = !curHSF.isReference(); - } } - if (splitable) { - MapFile.Reader r = this.readers.get(mapIndex); - // seek back to the beginning of mapfile - r.reset(); - // get the first and last keys - HStoreKey firstKey = new HStoreKey(); - HStoreKey lastKey = new HStoreKey(); - Writable value = new ImmutableBytesWritable(); - r.next(firstKey, value); - r.finalKey(lastKey); - // get the midkey - HStoreKey mk = (HStoreKey)r.midKey(); - if (mk != null) { - // if the midkey is the same as the first and last keys, then we cannot - // (ever) split this region. - if (mk.getRow().equals(firstKey.getRow()) && - mk.getRow().equals(lastKey.getRow())) { - return new HStoreSize(aggregateSize, maxSize, false); - } - // Otherwise, set midKey - midKey.set(mk.getRow()); + if (!splitable) { + return null; + } + MapFile.Reader r = this.readers.get(mapIndex); + + // seek back to the beginning of mapfile + r.reset(); + + // get the first and last keys + HStoreKey firstKey = new HStoreKey(); + HStoreKey lastKey = new HStoreKey(); + Writable value = new ImmutableBytesWritable(); + r.next(firstKey, value); + r.finalKey(lastKey); + + // get the midkey + HStoreKey midkey = (HStoreKey)r.midKey(); + + if (midkey != null) { + // if the midkey is the same as the first and last keys, then we cannot + // (ever) split this region. 
+ if (midkey.getRow().equals(firstKey.getRow()) && + midkey.getRow().equals(lastKey.getRow())) { + return null; } + return midkey.getRow(); } } catch(IOException e) { LOG.warn("Failed getting store size for " + this.storeName, e); } finally { this.lock.readLock().unlock(); } - return new HStoreSize(aggregateSize, maxSize, splitable); + return null; } + + /** @return aggregate size of HStore */ + public long getSize() { + return storeSize; + } + long getLastFlushTime() { + return lastFlushTime; + } + ////////////////////////////////////////////////////////////////////////////// // File administration ////////////////////////////////////////////////////////////////////////////// @@ -2085,10 +2106,15 @@ } } + /** @return the HColumnDescriptor for this store */ + public HColumnDescriptor getFamily() { + return family; + } + /** {@inheritDoc} */ @Override public String toString() { - return this.storeName; + return this.storeName.toString(); } /* @@ -2129,39 +2155,41 @@ private class StoreFileScanner extends HAbstractScanner { @SuppressWarnings("hiding") private MapFile.Reader[] readers; - + StoreFileScanner(long timestamp, Text[] targetCols, Text firstRow) throws IOException { super(timestamp, targetCols); try { - this.readers = new MapFile.Reader[storefiles.size()]; - - // Most recent map file should be first - int i = readers.length - 1; - for(HStoreFile curHSF: storefiles.values()) { - readers[i--] = curHSF.getReader(fs, bloomFilter); + synchronized (storefiles) { + this.readers = new MapFile.Reader[storefiles.size()]; + + // Most recent map file should be first + int i = readers.length - 1; + for(HStoreFile curHSF: storefiles.values()) { + readers[i--] = curHSF.getReader(fs, bloomFilter); + } } - + this.keys = new HStoreKey[readers.length]; this.vals = new byte[readers.length][]; - + // Advance the readers to the first pos. - for(i = 0; i < readers.length; i++) { + for(int i = 0; i < readers.length; i++) { keys[i] = new HStoreKey(); - + if(firstRow.getLength() != 0) { if(findFirstRow(i, firstRow)) { continue; } } - + while(getNext(i)) { if(columnMatch(i)) { break; } } } - + } catch (Exception ex) { close(); IOException e = new IOException("HStoreScanner failed construction"); @@ -2182,7 +2210,7 @@ boolean findFirstRow(int i, Text firstRow) throws IOException { ImmutableBytesWritable ibw = new ImmutableBytesWritable(); HStoreKey firstKey - = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw); + = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw); if (firstKey == null) { // Didn't find it. Close the scanner and return TRUE closeSubScanner(i); @@ -2194,7 +2222,7 @@ keys[i].setVersion(firstKey.getTimestamp()); return columnMatch(i); } - + /** * Get the next value from the specified reader. * @@ -2218,7 +2246,7 @@ } return result; } - + /** Close down the indicated reader. 
*/ @Override void closeSubScanner(int i) { @@ -2230,7 +2258,7 @@ LOG.error(storeName + " closing sub-scanner", e); } } - + } finally { readers[i] = null; keys[i] = null; @@ -2251,7 +2279,7 @@ } } } - + } finally { scannerClosed = true; } @@ -2274,7 +2302,7 @@ @SuppressWarnings("unchecked") HStoreScanner(Text[] targetCols, Text firstRow, long timestamp, RowFilterInterface filter) throws IOException { - + this.dataFilter = filter; if (null != dataFilter) { dataFilter.reset(); @@ -2286,7 +2314,7 @@ try { scanners[0] = memcache.getScanner(timestamp, targetCols, firstRow); scanners[1] = new StoreFileScanner(timestamp, targetCols, firstRow); - + for (int i = 0; i < scanners.length; i++) { if (scanners[i].isWildcardScanner()) { this.wildcardMatch = true; @@ -2304,10 +2332,10 @@ } throw e; } - + // Advance to the first key in each scanner. // All results will match the required column-set and scanTime. - + for (int i = 0; i < scanners.length; i++) { keys[i] = new HStoreKey(); resultSets[i] = new TreeMap(); @@ -2332,7 +2360,7 @@ /** {@inheritDoc} */ public boolean next(HStoreKey key, SortedMap results) - throws IOException { + throws IOException { // Filtered flag is set by filters. If a cell has been 'filtered out' // -- i.e. it is not to be returned to the caller -- the flag is 'true'. @@ -2345,14 +2373,14 @@ for (int i = 0; i < this.keys.length; i++) { if (scanners[i] != null && (chosenRow == null || - (keys[i].getRow().compareTo(chosenRow) < 0) || - ((keys[i].getRow().compareTo(chosenRow) == 0) && - (keys[i].getTimestamp() > chosenTimestamp)))) { + (keys[i].getRow().compareTo(chosenRow) < 0) || + ((keys[i].getRow().compareTo(chosenRow) == 0) && + (keys[i].getTimestamp() > chosenTimestamp)))) { chosenRow = new Text(keys[i].getRow()); chosenTimestamp = keys[i].getTimestamp(); } } - + // Filter whole row by row key? filtered = dataFilter != null? dataFilter.filter(chosenRow) : false; @@ -2396,7 +2424,7 @@ // values with older ones. So now we only insert // a result if the map does not contain the key. HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT, - key.getTimestamp()); + key.getTimestamp()); for (Map.Entry e : resultSets[i].entrySet()) { hsk.setColumn(e.getKey()); if (HLogEdit.isDeleted(e.getValue())) { @@ -2412,7 +2440,7 @@ if (dataFilter != null) { // Filter whole row by column data? filtered = - dataFilter.filter(chosenRow, e.getKey(), e.getValue()); + dataFilter.filter(chosenRow, e.getKey(), e.getValue()); if (filtered) { results.clear(); break; @@ -2428,7 +2456,7 @@ } } } - + for (int i = 0; i < scanners.length; i++) { // If the current scanner is non-null AND has a lower-or-equal // row label, then its timestamp is bad. We need to advance it. @@ -2442,7 +2470,7 @@ } moreToFollow = chosenTimestamp >= 0; - + if (dataFilter != null) { if (moreToFollow) { dataFilter.rowProcessed(filtered, chosenRow); @@ -2451,19 +2479,19 @@ moreToFollow = false; } } - + if (results.size() <= 0 && !filtered) { // There were no results found for this row. Marked it as // 'filtered'-out otherwise we will not move on to the next row. filtered = true; } } - + // If we got no results, then there is no more to follow. 
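HStoreScanner.next() above merges the memcache scanner and the store-file scanner by always serving the smallest row next and, within a row, the newest timestamp. A compact sketch of that choice, using a hypothetical chooseScanner helper over the per-scanner current keys:

  // Sketch only: index of the scanner to read from next, or -1 if none left.
  static int chooseScanner(final HStoreKey[] keys, final boolean[] live) {
    int chosen = -1;
    for (int i = 0; i < keys.length; i++) {
      if (!live[i]) {
        continue;                                  // scanner already exhausted
      }
      if (chosen < 0
          || keys[i].getRow().compareTo(keys[chosen].getRow()) < 0
          || (keys[i].getRow().compareTo(keys[chosen].getRow()) == 0
              && keys[i].getTimestamp() > keys[chosen].getTimestamp())) {
        chosen = i;                                // smaller row, or newer cell
      }
    }
    return chosen;
  }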
if (results == null || results.size() <= 0) { moreToFollow = false; } - + // Make sure scanners closed if no more results if (!moreToFollow) { for (int i = 0; i < scanners.length; i++) { @@ -2472,11 +2500,11 @@ } } } - + return moreToFollow; } - + /** Shut down a single scanner */ void closeScanner(int i) { try { @@ -2495,11 +2523,11 @@ /** {@inheritDoc} */ public void close() { try { - for(int i = 0; i < scanners.length; i++) { - if(scanners[i] != null) { - closeScanner(i); + for(int i = 0; i < scanners.length; i++) { + if(scanners[i] != null) { + closeScanner(i); + } } - } } finally { synchronized (activeScanners) { int numberOfScanners = activeScanners.decrementAndGet(); @@ -2521,7 +2549,7 @@ /** {@inheritDoc} */ public Iterator>> iterator() { throw new UnsupportedOperationException("Unimplemented serverside. " + - "next(HStoreKey, StortedMap(...) is more efficient"); + "next(HStoreKey, StortedMap(...) is more efficient"); } } } Index: src/java/org/apache/hadoop/hbase/HMerge.java =================================================================== --- src/java/org/apache/hadoop/hbase/HMerge.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HMerge.java (working copy) @@ -140,17 +140,16 @@ long currentSize = 0; HRegion nextRegion = null; long nextSize = 0; - Text midKey = new Text(); for (int i = 0; i < info.length - 1; i++) { if (currentRegion == null) { currentRegion = new HRegion(tabledir, hlog, fs, conf, info[i], null, null); - currentSize = currentRegion.largestHStore(midKey).getAggregate(); + currentSize = currentRegion.getLargestHStoreSize(); } nextRegion = new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null); - nextSize = nextRegion.largestHStore(midKey).getAggregate(); + nextSize = nextRegion.getLargestHStoreSize(); if ((currentSize + nextSize) <= (maxFilesize / 2)) { // We merge two adjacent regions if their total size is less than @@ -350,7 +349,6 @@ oldRegion2 }; for(int r = 0; r < regionsToDelete.length; r++) { - long lockid = Math.abs(rand.nextLong()); BatchUpdate b = new BatchUpdate(regionsToDelete[r]); b.delete(COL_REGIONINFO); b.delete(COL_SERVER); Index: src/java/org/apache/hadoop/hbase/CacheFlushListener.java =================================================================== --- src/java/org/apache/hadoop/hbase/CacheFlushListener.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/CacheFlushListener.java (working copy) @@ -1,36 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase; - -/** - * Implementors of this interface want to be notified when an HRegion - * determines that a cache flush is needed. A CacheFlushListener (or null) - * must be passed to the HRegion constructor. 
- */ -public interface CacheFlushListener { - - /** - * Tell the listener the cache needs to be flushed. - * - * @param region the HRegion requesting the cache flush - */ - void flushRequested(HRegion region); -} Index: src/java/org/apache/hadoop/hbase/Leases.java =================================================================== --- src/java/org/apache/hadoop/hbase/Leases.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/Leases.java (working copy) @@ -50,8 +50,6 @@ private volatile DelayQueue leaseQueue = new DelayQueue(); protected final Map leases = new HashMap(); - protected final Map listeners = - new HashMap(); private volatile boolean stopRequested = false; /** @@ -84,17 +82,14 @@ continue; } // A lease expired - LeaseListener listener = null; synchronized (leaseQueue) { - String leaseName = lease.getLeaseName(); - leases.remove(leaseName); - listener = listeners.remove(leaseName); - if (listener == null) { - LOG.error("lease listener is null for lease " + leaseName); + leases.remove(lease.getLeaseName()); + if (lease.getListener() == null) { + LOG.error("lease listener is null for lease " + lease.getLeaseName()); continue; } } - listener.leaseExpired(); + lease.getListener().leaseExpired(); } close(); } @@ -120,7 +115,6 @@ synchronized (leaseQueue) { leaseQueue.clear(); leases.clear(); - listeners.clear(); leaseQueue.notifyAll(); } LOG.info(Thread.currentThread().getName() + " closed leases"); @@ -136,14 +130,14 @@ if (stopRequested) { return; } - Lease lease = new Lease(leaseName, System.currentTimeMillis() + leasePeriod); + Lease lease = new Lease(leaseName, listener, + System.currentTimeMillis() + leasePeriod); synchronized (leaseQueue) { if (leases.containsKey(leaseName)) { throw new IllegalStateException("lease '" + leaseName + "' already exists"); } leases.put(leaseName, lease); - listeners.put(leaseName, listener); leaseQueue.add(lease); } } @@ -179,17 +173,18 @@ "' does not exist"); } leaseQueue.remove(lease); - listeners.remove(leaseName); } } /** This class tracks a single Lease. */ private static class Lease implements Delayed { private final String leaseName; + private final LeaseListener listener; private long expirationTime; - Lease(final String leaseName, long expirationTime) { + Lease(final String leaseName, LeaseListener listener, long expirationTime) { this.leaseName = leaseName; + this.listener = listener; this.expirationTime = expirationTime; } @@ -197,6 +192,11 @@ public String getLeaseName() { return leaseName; } + + /** @return listener */ + public LeaseListener getListener() { + return this.listener; + } /** {@inheritDoc} */ @Override @@ -219,16 +219,9 @@ /** {@inheritDoc} */ public int compareTo(Delayed o) { long delta = this.getDelay(TimeUnit.MILLISECONDS) - - o.getDelay(TimeUnit.MILLISECONDS); + o.getDelay(TimeUnit.MILLISECONDS); - int value = 0; - if (delta > 0) { - value = 1; - - } else if (delta < 0) { - value = -1; - } - return value; + return this.equals(o) ? 0 : (delta > 0 ? 
1 : -1); } /** @param expirationTime the expirationTime to set */ Index: src/java/org/apache/hadoop/hbase/HStoreKey.java =================================================================== --- src/java/org/apache/hadoop/hbase/HStoreKey.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HStoreKey.java (working copy) @@ -37,6 +37,7 @@ private Text row; private Text column; private long timestamp; + private transient Text family; /** Default constructor used in conjunction with Writable interface */ @@ -89,6 +90,7 @@ this.row = new Text(row); this.column = new Text(column); this.timestamp = timestamp; + this.family = null; } /** @return Approximate size in bytes of this key. */ @@ -120,8 +122,9 @@ * * @param newcol new column key value */ - public void setColumn(Text newcol) { + public synchronized void setColumn(Text newcol) { this.column.set(newcol); + this.family = null; } /** @@ -154,6 +157,17 @@ return column; } + /** + * @return the column family name (minus the trailing colon) + * @throws InvalidColumnNameException + */ + public synchronized Text getFamily() throws InvalidColumnNameException { + if (family == null) { + family = extractFamily(column); + } + return family; + } + /** @return value of timestamp */ public long getTimestamp() { return timestamp; @@ -198,8 +212,7 @@ public boolean matchesRowFamily(HStoreKey that) throws InvalidColumnNameException { return this.row.compareTo(that.row) == 0 && - extractFamily(this.column). - compareTo(extractFamily(that.getColumn())) == 0; + this.getFamily().equals(that.getFamily()); } /** {@inheritDoc} */ @@ -225,6 +238,7 @@ // Comparable + /** {@inheritDoc} */ public int compareTo(Object o) { HStoreKey other = (HStoreKey)o; int result = this.row.compareTo(other.row); @@ -275,7 +289,7 @@ * the result by calling {@link TextSequence#toText()}. * @throws InvalidColumnNameException */ - public static TextSequence extractFamily(final Text col) + public static Text extractFamily(final Text col) throws InvalidColumnNameException { return extractFamily(col, false); } @@ -284,40 +298,41 @@ * Extracts the column family name from a column * For example, returns 'info' if the specified column was 'info:server' * @param col name of column - * @return column famile as a TextSequence based on the passed - * col. If col is reused, make a new Text of - * the result by calling {@link TextSequence#toText()}. + * @param withColon + * @return column family as a Text based on the passed col. * @throws InvalidColumnNameException */ - public static TextSequence extractFamily(final Text col, - final boolean withColon) + public static Text extractFamily(final Text col, final boolean withColon) throws InvalidColumnNameException { int offset = getColonOffset(col); // Include ':' in copy? - offset += (withColon)? 1: 0; + offset += (withColon)? 1 : 0; if (offset == col.getLength()) { - return new TextSequence(col); + return col; } - return new TextSequence(col, 0, offset); + Text family = new Text(); + family.set(col.getBytes(), 0, offset); + return family; } /** - * Extracts the column qualifier, the portion that follows the colon (':') - * family/qualifier separator. + * Extracts the column member, the portion that follows the colon (':') + * family:member separator. * For example, returns 'server' if the specified column was 'info:server' * @param col name of column - * @return column qualifier as a TextSequence based on the passed - * col. If col is reused, make a new Text of - * the result by calling {@link TextSequence#toText()}. 
+ * @return column qualifier as a Text based on the passed col. * @throws InvalidColumnNameException */ - public static TextSequence extractQualifier(final Text col) + public static Text extractMember(final Text col) throws InvalidColumnNameException { - int offset = getColonOffset(col); - if (offset + 1 == col.getLength()) { + int offset = getColonOffset(col) + 1; + if (offset == col.getLength()) { return null; } - return new TextSequence(col, offset + 1); + + Text member = new Text(); + member.set(col.getBytes(), offset, col.getLength() - offset); + return member; } private static int getColonOffset(final Text col) @@ -333,7 +348,7 @@ } if(offset < 0) { throw new InvalidColumnNameException(col + " is missing the colon " + - "family/qualifier separator"); + "family:member separator"); } return offset; } Index: src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -28,13 +28,10 @@ import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; -import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -73,7 +70,8 @@ * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. */ -public class HRegionServer implements HConstants, HRegionInterface, Runnable { +public class HRegionServer implements HConstants, HRegionInterface, +HStoreListener, Runnable { static final Log LOG = LogFactory.getLog(HRegionServer.class); // Set when a report to the master comes back with a message asking us to @@ -99,10 +97,12 @@ private final Random rand = new Random(); // region name -> HRegion - protected volatile SortedMap onlineRegions = - Collections.synchronizedSortedMap(new TreeMap()); - protected volatile Map retiringRegions = + protected Map onlineRegions = new ConcurrentHashMap(); + Map onlineStores = + new ConcurrentHashMap(); + protected Map retiringRegions = + new ConcurrentHashMap(); protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private volatile List outboundMsgs = @@ -166,11 +166,13 @@ } /** Queue entry passed to flusher, compactor and splitter threads */ - class QueueEntry implements Delayed { + static class QueueEntry implements Delayed { + private final HStore store; private final HRegion region; - private long expirationTime; + private final long expirationTime; - QueueEntry(HRegion region, long expirationTime) { + QueueEntry(HStore store, HRegion region, long expirationTime) { + this.store = store; this.region = region; this.expirationTime = expirationTime; } @@ -185,7 +187,7 @@ /** {@inheritDoc} */ @Override public int hashCode() { - return this.region.getRegionInfo().hashCode(); + return this.store.getFamily().hashCode(); } /** {@inheritDoc} */ @@ -199,25 +201,19 @@ long delta = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); - int value = 0; - if (delta > 0) { - value = 1; - - } else if (delta < 0) { - value = -1; - } - return value; + + return this.equals(o) ? 0 : (delta > 0 ? 
1 : -1); } + + /** @return the store */ + public HStore getStore() { + return store; + } /** @return the region */ public HRegion getRegion() { return region; } - - /** @param expirationTime the expirationTime to set */ - public void setExpirationTime(long expirationTime) { - this.expirationTime = expirationTime; - } } // Compactions @@ -237,6 +233,8 @@ private final BlockingQueue compactionQueue = new LinkedBlockingQueue(); + + private volatile QueueEntry compactionInProgress; /** constructor */ public CompactSplitThread() { @@ -244,6 +242,7 @@ this.frequency = conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", 20 * 1000); + this.compactionInProgress = null; } /** {@inheritDoc} */ @@ -256,8 +255,28 @@ if (e == null) { continue; } - e.getRegion().compactIfNeeded(); - split(e.getRegion()); + Text storeName = e.getStore().storeName; + if (!onlineStores.containsKey(storeName)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not compacting store " + storeName + + " because it is no longer in the list of known stores"); + } + continue; + } + HRegion r = e.getRegion(); + HStore storeToCompact = e.getStore(); + Text midKey = r.compactStore(e.getStore()); + if (midKey != null) { + // Store wants region to split + // compact all the other stores so split can happen + for (HStore s: r.getStores()) { + if (storeName.equals(storeToCompact.storeName)) { + continue; + } + r.compactStore(s); + } + split(r, midKey); + } } catch (InterruptedException ex) { continue; } catch (IOException ex) { @@ -281,19 +300,28 @@ } /** - * @param e QueueEntry for region to be compacted + * @param s HStore to be compacted + * @param r HRegion the store belongs to + * @return true if a compaction was queued */ - public void compactionRequested(QueueEntry e) { - compactionQueue.add(e); + public boolean compactionRequested(final HStore s, final HRegion r) { + QueueEntry e = new QueueEntry(s, r, System.currentTimeMillis()); + synchronized (compactionQueue) { + if (compactionInProgress != null && compactionInProgress.equals(e)) { + return false; + } + if (compactionQueue.contains(e)) { + return false; + } + compactionQueue.add(e); + } + return true; } - void compactionRequested(final HRegion r) { - compactionRequested(new QueueEntry(r, System.currentTimeMillis())); - } - - private void split(final HRegion region) throws IOException { + private void split(final HRegion region, final Text midKey) + throws IOException { final HRegionInfo oldRegionInfo = region.getRegionInfo(); - final HRegion[] newRegions = region.splitRegion(this); + final HRegion[] newRegions = region.splitRegion(this, midKey); if (newRegions == null) { // Didn't need to be split return; @@ -324,6 +352,8 @@ oldRegionInfo.setSplit(true); BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName()); update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo)); + update.delete(COL_SERVER); + update.delete(COL_STARTCODE); update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo())); update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo())); t.commit(update); @@ -358,7 +388,9 @@ try { // Remove region from regions Map and add it to the Map of retiring // regions. 
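In the CompactSplitThread changes above, a store compaction can hand back a midKey, which is the store asking for a region split; the thread then compacts the remaining stores before splitting so both daughters start from compacted files. A condensed sketch of one pass of that loop (error handling dropped; compactStore, getStores and split are the methods used in the hunk, the wrapper name is hypothetical):

  // Sketch only: handle one dequeued (store, region) compaction request.
  void compactAndMaybeSplit(final HRegion region, final HStore storeToCompact)
      throws IOException {
    Text midKey = region.compactStore(storeToCompact);
    if (midKey == null) {
      return;                              // store did not ask for a split
    }
    // The store wants the region split: compact the other stores first so
    // both daughter regions start out against compacted files.
    for (HStore s : region.getStores()) {
      if (s == storeToCompact) {
        continue;
      }
      region.compactStore(s);
    }
    split(region, midKey);                 // wraps region.splitRegion(listener, midKey)
  }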
- retiringRegions.put(regionName, onlineRegions.remove(regionName)); + HRegion r = onlineRegions.remove(regionName); + removeRegion(r); + retiringRegions.put(regionName, r); if (LOG.isDebugEnabled()) { LOG.debug(regionName.toString() + " closing (" + "Adding to retiringRegions)"); @@ -389,17 +421,19 @@ final Integer cacheFlusherLock = new Integer(0); /** Flush cache upon request */ - class Flusher extends Thread implements CacheFlushListener { - private final DelayQueue flushQueue = + class Flusher extends Thread { + private DelayQueue flushQueue = new DelayQueue(); private final long optionalFlushPeriod; + private long lastOptionalFlushTime; /** constructor */ public Flusher() { super(); this.optionalFlushPeriod = conf.getLong( "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); + this.lastOptionalFlushTime = System.currentTimeMillis(); } @@ -411,40 +445,24 @@ try { e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (e == null) { + long now = System.currentTimeMillis(); + if (lastOptionalFlushTime + optionalFlushPeriod <= now) { + optionalFlush(now, now - optionalFlushPeriod); + lastOptionalFlushTime = now; + } continue; } - synchronized(cacheFlusherLock) { // Don't interrupt while we're working - if (e.getRegion().flushcache()) { - compactSplitThread.compactionRequested(e); + Text storeName = e.getStore().storeName; + if (!onlineStores.containsKey(storeName)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not flushing cache for store " + storeName + + " because it is no longer in the list of known stores"); } - - e.setExpirationTime(System.currentTimeMillis() + - optionalFlushPeriod); - flushQueue.add(e); + continue; } - - // Now insure that all the active regions are in the queue - - Set regions = getRegionsToCheck(); - for (HRegion r: regions) { - e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); - synchronized (flushQueue) { - if (!flushQueue.contains(e)) { - flushQueue.add(e); - } - } + synchronized (cacheFlusherLock) { // Don't interrupt while we're working + e.getRegion().flushcache(e.getStore()); } - - // Now make sure that the queue only contains active regions - - synchronized (flushQueue) { - for (Iterator i = flushQueue.iterator(); i.hasNext(); ) { - e = i.next(); - if (!regions.contains(e.getRegion())) { - i.remove(); - } - } - } } catch (InterruptedException ex) { continue; @@ -483,16 +501,34 @@ LOG.info(getName() + " exiting"); } - /** {@inheritDoc} */ - public void flushRequested(HRegion region) { - QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); - synchronized (flushQueue) { - if (flushQueue.contains(e)) { - flushQueue.remove(e); + private void optionalFlush(long now, long lastFlushTime) { + synchronized (onlineStores) { + for (HStore s: onlineStores.values()) { + if (s.getLastFlushTime() > lastFlushTime) { + // Store was flushed more recently + continue; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Queueing optional cache flush for store " + s.storeName); + } + flushQueue.add(new QueueEntry(s, s.region, now)); } - flushQueue.add(e); } } + + /** + * Called (usually by HStore) when it needs its cache to be flushed + * + * @param store + * @param region + */ + public void flushRequested(HStore store, HRegion region) { + Text storeName = store.storeName; + if (LOG.isDebugEnabled()) { + LOG.debug("Queueing flush request for " + storeName); + } + flushQueue.add(new QueueEntry(store, region, System.currentTimeMillis())); + } } // HLog and HLog roller. 
log is protected rather than private to avoid @@ -1043,6 +1079,49 @@ return result; } + /** {@inheritDoc} */ + public void flushRequested(HStore store, HRegion region) { + cacheFlusher.flushRequested(store, region); + } + + /** + * Called whenever a region is being closed to remove all its stores from + * the queue + * + * @param region + */ + void removeRegion(HRegion region) { + synchronized (onlineStores) { + for (HStore s: region.getStores()) { + onlineStores.remove(s.storeName); + } + } + } + + /** + * Called whenever a region is opened to add its stores to the queue. + * + * @param region + */ + void addRegion(HRegion region) { + synchronized (onlineStores) { + for (HStore s: region.getStores()) { + if (!onlineStores.containsKey(s.storeName)) { + // We don't know about this store (which is good) + onlineStores.put(s.storeName, s); + } else { + LOG.error("Not adding store " + s.storeName + + " to online stores. It is already present."); + } + } + } + } + + /** {@inheritDoc} */ + public boolean compactionRequested(HStore store, HRegion region) { + return compactSplitThread.compactionRequested(store, region); + } + /** Add to the outbound message buffer */ private void reportOpen(HRegionInfo region) { outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, region)); @@ -1172,10 +1251,10 @@ HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc().getName() ), - this.log, this.fs, conf, regionInfo, null, this.cacheFlusher + this.log, this.fs, conf, regionInfo, null, this ); // Startup a compaction early if one is needed. - this.compactSplitThread.compactionRequested(region); + region.compactStores(); } catch (IOException e) { LOG.error("error opening region " + regionInfo.getRegionName(), e); @@ -1192,6 +1271,7 @@ try { this.log.setSequenceNumber(region.getMinSequenceId()); this.onlineRegions.put(region.getRegionName(), region); + addRegion(region); } finally { this.lock.writeLock().unlock(); } @@ -1210,6 +1290,7 @@ } if(region != null) { + removeRegion(region); region.close(); if(reportWhenCompleted) { reportClose(hri); @@ -1222,8 +1303,13 @@ ArrayList regionsToClose = new ArrayList(); this.lock.writeLock().lock(); try { - regionsToClose.addAll(onlineRegions.values()); - onlineRegions.clear(); + synchronized (onlineRegions) { + regionsToClose.addAll(onlineRegions.values()); + onlineRegions.clear(); + } + for (HRegion r: regionsToClose) { + removeRegion(r); + } } finally { this.lock.writeLock().unlock(); } @@ -1266,6 +1352,7 @@ LOG.debug("closing region " + region.getRegionName()); } try { + removeRegion(region); region.close(); } catch (IOException e) { LOG.error("error closing region " + region.getRegionName(), @@ -1574,8 +1661,8 @@ /** * @return Immutable list of this servers regions. */ - public SortedMap getOnlineRegions() { - return Collections.unmodifiableSortedMap(this.onlineRegions); + public Map getOnlineRegions() { + return Collections.unmodifiableMap(this.onlineRegions); } /** @return the request count */ @@ -1583,11 +1670,6 @@ return this.requestCount; } - /** @return reference to CacheFlushListener */ - public CacheFlushListener getCacheFlushListener() { - return this.cacheFlusher; - } - /** * Protected utility method for safely obtaining an HRegion handle. * @param regionName Name of online {@link HRegion} to return @@ -1596,37 +1678,13 @@ */ protected HRegion getRegion(final Text regionName) throws NotServingRegionException { - return getRegion(regionName, false); - } - - /** - * Protected utility method for safely obtaining an HRegion handle. 
- * @param regionName Name of online {@link HRegion} to return - * @param checkRetiringRegions Set true if we're to check retiring regions - * as well as online regions. - * @return {@link HRegion} for regionName - * @throws NotServingRegionException - */ - protected HRegion getRegion(final Text regionName, - final boolean checkRetiringRegions) - throws NotServingRegionException { HRegion region = null; this.lock.readLock().lock(); try { region = onlineRegions.get(regionName); - if (region == null && checkRetiringRegions) { - region = this.retiringRegions.get(regionName); - if (LOG.isDebugEnabled()) { - if (region != null) { - LOG.debug("Found region " + regionName + " in retiringRegions"); - } - } - } - if (region == null) { throw new NotServingRegionException(regionName.toString()); } - return region; } finally { this.lock.readLock().unlock(); @@ -1677,22 +1735,25 @@ * @return Returns list of non-closed regions hosted on this server. If no * regions to check, returns an empty list. */ - protected Set getRegionsToCheck() { - HashSet regionsToCheck = new HashSet(); + protected Map getRegionsToCheck() { + Map regionsToCheck = new HashMap(); //TODO: is this locking necessary? lock.readLock().lock(); try { - regionsToCheck.addAll(this.onlineRegions.values()); + regionsToCheck.putAll(this.onlineRegions); } finally { lock.readLock().unlock(); } // Purge closed regions. - for (final Iterator i = regionsToCheck.iterator(); i.hasNext();) { - HRegion r = i.next(); - if (r.isClosed()) { - i.remove(); + List closedRegions = new ArrayList(); + for (Map.Entry e: regionsToCheck.entrySet()) { + if (e.getValue().isClosed()) { + closedRegions.add(e.getKey()); } } + for (Text r: closedRegions) { + regionsToCheck.remove(r); + } return regionsToCheck; } Index: src/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/java/org/apache/hadoop/hbase/HConstants.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -25,7 +25,8 @@ * HConstants holds a bunch of HBase-related constants */ public interface HConstants { - + + /** long constant for zero */ static final Long ZERO_L = Long.valueOf(0L); // For migration @@ -34,7 +35,7 @@ static final String VERSION_FILE_NAME = "hbase.version"; /** version of file system */ - static final String FILE_SYSTEM_VERSION = "0.1"; + static final String FILE_SYSTEM_VERSION = "2"; // Configuration parameters Index: src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/java/org/apache/hadoop/hbase/HLog.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -35,6 +35,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -50,24 +51,25 @@ * underlying file is being rolled. * *
- * A single HLog is used by several HRegions simultaneously. + * A single HLog is used by several HRegion/HStores simultaneously. * *
- * Each HRegion is identified by a unique long int. HRegions do - * not need to declare themselves before using the HLog; they simply include - * their HRegion-id in the append or - * completeCacheFlush calls. + * Each HStore is identified by a unique string consisting of the encoded + * HRegion name and by the HStore family name. + * HStores do not need to declare themselves before using the HLog; they simply + * include their HStore.storeName in the append + * or completeCacheFlush calls. * *
* An HLog consists of multiple on-disk files, which have a chronological order. * As data is flushed to other (better) on-disk structures, the log becomes - * obsolete. We can destroy all the log messages for a given HRegion-id up to - * the most-recent CACHEFLUSH message from that HRegion. + * obsolete. We can destroy all the log messages for a given HStore up to + * the most-recent CACHEFLUSH message from that HStore. * *
* It's only practical to delete entire files. Thus, we delete an entire on-disk * file F when all of the messages in F have a log-sequence-id that's older - * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has + * (smaller) than the most-recent CACHEFLUSH message for every HStore that has * a message in F. * *
@@ -77,7 +79,7 @@ * separate reentrant lock is used. * *
- * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in + * TODO: Vuk Ercegovac also pointed out that keeping HBase edit logs in * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The * 'atomic' write to the log is meant to serve as insurance against abnormal * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's @@ -109,7 +111,7 @@ Collections.synchronizedSortedMap(new TreeMap()); /* - * Map of region to last sequence/edit id. + * Map of store to last sequence/edit id. */ final Map lastSeqWritten = new ConcurrentHashMap(); @@ -238,7 +240,7 @@ if (this.outputfiles.size() > 0) { if (this.lastSeqWritten.size() <= 0) { LOG.debug("Last sequence written is empty. Deleting all old hlogs"); - // If so, then no new writes have come in since all regions were + // If so, then no new writes have come in since all stores were // flushed (and removed from the lastSeqWritten map). Means can // remove all but currently open log file. for (Map.Entry e : this.outputfiles.entrySet()) { @@ -251,17 +253,17 @@ Long oldestOutstandingSeqNum = Collections.min(this.lastSeqWritten.values()); // Get the set of all log files whose final ID is older than or - // equal to the oldest pending region operation + // equal to the oldest pending HStore operation TreeSet sequenceNumbers = new TreeSet(this.outputfiles.headMap( (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet()); // Now remove old log files (if any) if (LOG.isDebugEnabled()) { - // Find region associated with oldest key -- helps debugging. - Text oldestRegion = null; + // Find store associated with oldest key -- helps debugging. + Text oldestStore = null; for (Map.Entry e: this.lastSeqWritten.entrySet()) { if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) { - oldestRegion = e.getKey(); + oldestStore = e.getKey(); break; } } @@ -269,7 +271,7 @@ LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " + "using oldest outstanding seqnum of " + - oldestOutstandingSeqNum + " from region " + oldestRegion); + oldestOutstandingSeqNum + " from store " + oldestStore); } } if (sequenceNumbers.size() > 0) { @@ -331,7 +333,7 @@ } /** - * Append a set of edits to the log. Log edits are keyed by regionName, + * Append a set of edits to the log. Log edits are keyed by storeName, * rowname, and log-sequence-id. * * Later, if we sort by these keys, we obtain all the relevant edits for a @@ -347,38 +349,32 @@ * synchronized prevents appends during the completion of a cache flush or for * the duration of a log roll. * - * @param regionName + * @param storeName * @param tableName - * @param row - * @param columns - * @param timestamp + * @param key + * @param value * @throws IOException */ - void append(Text regionName, Text tableName, - TreeMap edits) throws IOException { + void append(Text storeName, Text tableName, HStoreKey key, byte[] value) + throws IOException { if (closed) { throw new IOException("Cannot append; log is closed"); } synchronized (updateLock) { - long seqNum[] = obtainSeqNum(edits.size()); + long seqNum = obtainSeqNum(); // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region. When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush + // write for each HStore. When the cache is flushed, the entry for the + // store being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. 
- if (!this.lastSeqWritten.containsKey(regionName)) { - this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0])); + if (!this.lastSeqWritten.containsKey(storeName)) { + this.lastSeqWritten.put(storeName, Long.valueOf(seqNum)); } - int counter = 0; - for (Map.Entry es : edits.entrySet()) { - HStoreKey key = es.getKey(); - HLogKey logKey = - new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]); - HLogEdit logEdit = - new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp()); - this.writer.append(logKey, logEdit); - this.numEntries++; - } + HLogKey logKey = new HLogKey(storeName, tableName, key.getRow(), seqNum); + HLogEdit logEdit = + new HLogEdit(key.getColumn(), value, key.getTimestamp()); + this.writer.append(logKey, logEdit); + this.numEntries++; } if (this.numEntries > this.maxlogentries) { if (listener != null) { @@ -409,22 +405,6 @@ } /** - * Obtain a specified number of sequence numbers - * - * @param num number of sequence numbers to obtain - * @return array of sequence numbers - */ - private long[] obtainSeqNum(int num) { - long[] results = new long[num]; - synchronized (this.sequenceLock) { - for (int i = 0; i < num; i++) { - results[i] = this.logSeqNum++; - } - } - return results; - } - - /** * By acquiring a log sequence ID, we can allow log messages to continue while * we flush the cache. * @@ -446,12 +426,12 @@ * * Protected by cacheFlushLock * - * @param regionName + * @param storeName * @param tableName * @param logSeqId * @throws IOException */ - void completeCacheFlush(final Text regionName, final Text tableName, + void completeCacheFlush(final Text storeName, final Text tableName, final long logSeqId) throws IOException { try { @@ -459,13 +439,14 @@ return; } synchronized (updateLock) { - this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), + this.writer.append( + new HLogKey(storeName, tableName, HLog.METAROW, logSeqId), new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(), System.currentTimeMillis())); this.numEntries++; - Long seq = this.lastSeqWritten.get(regionName); + Long seq = this.lastSeqWritten.get(storeName); if (seq != null && logSeqId >= seq.longValue()) { - this.lastSeqWritten.remove(regionName); + this.lastSeqWritten.remove(storeName); } } } finally { @@ -496,7 +477,7 @@ */ public static void splitLog(Path rootDir, Path srcDir, FileSystem fs, Configuration conf) throws IOException { - Path logfiles[] = fs.listPaths(new Path[] { srcDir }); + FileStatus logfiles[] = fs.listStatus(srcDir); LOG.info("splitting " + logfiles.length + " log(s) in " + srcDir.toString()); Map logWriters = @@ -508,25 +489,28 @@ logfiles[i]); } // Check for empty file. 
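With edits keyed by store name in the HLog changes above, lastSeqWritten holds the oldest outstanding sequence id per HStore and completeCacheFlush() clears a store's entry once its memcache is safely on disk; a rolled log file becomes deletable when every store has flushed past it. A small sketch of that pruning rule, assuming outputfiles maps a rolled file's last sequence id to its Path and the usual java.util and Hadoop imports:

  // Sketch only: rolled log files whose edits every store has already flushed.
  static SortedMap<Long, Path> obsoleteLogs(final SortedMap<Long, Path> outputfiles,
      final Map<Text, Long> lastSeqWritten) {
    if (lastSeqWritten.isEmpty()) {
      return outputfiles;                  // all stores flushed since the last roll
    }
    long oldestOutstanding = Collections.min(lastSeqWritten.values()).longValue();
    // Files whose final sequence id is older than or equal to the oldest pending
    // store edit can be removed; headMap's bound is exclusive, hence the + 1.
    return outputfiles.headMap(Long.valueOf(oldestOutstanding + 1L));
  }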
- if (fs.getFileStatus(logfiles[i]).getLen() <= 0) { + if (logfiles[i].getLen() <= 0) { LOG.info("Skipping " + logfiles[i].toString() + " because zero length"); continue; } HLogKey key = new HLogKey(); HLogEdit val = new HLogEdit(); - SequenceFile.Reader in = new SequenceFile.Reader(fs, logfiles[i], conf); + SequenceFile.Reader in = + new SequenceFile.Reader(fs, logfiles[i].getPath(), conf); try { int count = 0; for (; in.next(key, val); count++) { Text tableName = key.getTablename(); - Text regionName = key.getRegionName(); + Text storeName = key.getStoreName(); + Text regionName = new Text(); + regionName.set(storeName.getBytes(), 0, storeName.find("/")); SequenceFile.Writer w = logWriters.get(regionName); if (w == null) { Path logfile = new Path( HRegion.getRegionDir( HTableDescriptor.getTableDir(rootDir, tableName), - HRegionInfo.encodeRegionName(regionName) + regionName.toString() ), HREGION_OLDLOGFILE_NAME ); Index: src/java/org/apache/hadoop/hbase/HStoreListener.java =================================================================== --- src/java/org/apache/hadoop/hbase/HStoreListener.java (revision 0) +++ src/java/org/apache/hadoop/hbase/HStoreListener.java (revision 0) @@ -0,0 +1,46 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +/** + * Implementors of this interface want to be notified when an HRegion + * determines that a cache flush is needed. A CacheFlushListener (or null) + * must be passed to the HRegion constructor. + */ +public interface HStoreListener { + + /** + * Tell the listener the cache needs to be flushed. + * + * @param store the HStore requesting the cache flush + * @param region the HRegion the HStore belongs to + */ + void flushRequested(HStore store, HRegion region); + + /** + * Tell the listener that the HStore needs compacting. + * + * @param store the HStore requesting the compaction. + * @param region the HRegion that the HStore belongs to + * @return true if a compaction was queued + */ + boolean compactionRequested(HStore store, HRegion region); +} Index: src/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/HMaster.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -148,6 +148,7 @@ /** Name of master server */ public static final String MASTER = "master"; + /** @return InfoServer object */ public InfoServer getInfoServer() { return infoServer; } @@ -272,14 +273,19 @@ if(! fs.exists(rootdir)) { fs.mkdirs(rootdir); FSUtils.setVersion(fs, rootdir); - } else if (!FSUtils.checkVersion(fs, rootdir)) { - // Output on stdout so user sees it in terminal. 
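The new HStoreListener interface above replaces CacheFlushListener, and the HRegionServer hunks earlier in this patch implement it by queueing per-store work. A bare-bones sketch of an implementation that merely logs and accepts every request (illustrative only, assumed to live in org.apache.hadoop.hbase so it can see storeName):

  // Sketch only: a trivial listener; the real one queues flushes and compactions.
  class LoggingStoreListener implements HStoreListener {
    private final Log log = LogFactory.getLog(LoggingStoreListener.class);

    public void flushRequested(HStore store, HRegion region) {
      log.debug("flush requested for " + store.storeName +
          " in region " + region.getRegionName());
    }

    public boolean compactionRequested(HStore store, HRegion region) {
      log.debug("compaction requested for " + store.storeName);
      return true;                         // pretend the request was queued
    }
  }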
- String message = "The HBase data files stored on the FileSystem are " + - "from an earlier version of HBase. You need to run " + - "'${HBASE_HOME}/bin/hbase migrate' to bring your installation" + + } else { + String fsversion = FSUtils.checkVersion(fs, rootdir); + if (fsversion == null || + fsversion.compareTo(FILE_SYSTEM_VERSION) != 0) { + // Output on stdout so user sees it in terminal. + String message = "The HBase data files stored on the FileSystem " + + "are from an earlier version of HBase. You need to run " + + "'${HBASE_HOME}/bin/hbase migrate' to bring your installation " + "up-to-date."; - System.out.println("WARNING! " + message + " Master shutting down..."); - throw new IOException(message); + // Output on stdout so user sees it in terminal. + System.out.println("WARNING! " + message + " Master shutting down..."); + throw new IOException(message); + } } if (!fs.exists(rootRegionDir)) { Index: src/java/org/apache/hadoop/hbase/generated/regionserver/regionserver_jsp.java =================================================================== --- src/java/org/apache/hadoop/hbase/generated/regionserver/regionserver_jsp.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/generated/regionserver/regionserver_jsp.java (working copy) @@ -8,9 +8,7 @@ import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.hbase.HRegionServer; import org.apache.hadoop.hbase.HRegion; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HServerInfo; -import org.apache.hadoop.hbase.HRegionInfo; public final class regionserver_jsp extends org.apache.jasper.runtime.HttpJspBase implements org.apache.jasper.runtime.JspSourceDependent { @@ -50,7 +48,8 @@ HRegionServer regionServer = (HRegionServer)getServletContext().getAttribute(HRegionServer.REGIONSERVER); HServerInfo serverInfo = regionServer.getServerInfo(); - SortedMap onlineRegions = regionServer.getOnlineRegions(); + SortedMap onlineRegions = + new TreeMap(regionServer.getOnlineRegions()); out.write("\n \n\n\nHbase Region Server: "); out.print( serverInfo.getServerAddress().toString() ); Index: src/java/org/apache/hadoop/hbase/HAbstractScanner.java =================================================================== --- src/java/org/apache/hadoop/hbase/HAbstractScanner.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HAbstractScanner.java (working copy) @@ -69,13 +69,13 @@ private Text col; ColumnMatcher(final Text col) throws IOException { - Text qualifier = HStoreKey.extractQualifier(col); + Text member = HStoreKey.extractMember(col); try { - if(qualifier == null || qualifier.getLength() == 0) { + if(member == null || member.getLength() == 0) { this.matchType = MATCH_TYPE.FAMILY_ONLY; - this.family = HStoreKey.extractFamily(col).toText(); + this.family = HStoreKey.extractFamily(col); this.wildCardmatch = true; - } else if(isRegexPattern.matcher(qualifier.toString()).matches()) { + } else if(isRegexPattern.matcher(member.toString()).matches()) { this.matchType = MATCH_TYPE.REGEX; this.columnMatcher = Pattern.compile(col.toString()); this.wildCardmatch = true; @@ -127,7 +127,7 @@ this.multipleMatchers = false; this.okCols = new TreeMap<Text, Vector<ColumnMatcher>>(); for(int i = 0; i < targetCols.length; i++) { - Text family = HStoreKey.extractFamily(targetCols[i]).toText(); + Text family = HStoreKey.extractFamily(targetCols[i]); Vector<ColumnMatcher> matchers = okCols.get(family); if(matchers == null) { matchers = new Vector<ColumnMatcher>(); Index: 
src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java =================================================================== --- src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (working copy) @@ -39,11 +39,12 @@ */ @Override public void reduce(Text key, Iterator<MapWritable> values, - OutputCollector<Text, MapWritable> output, Reporter reporter) + OutputCollector<Text, MapWritable> output, + @SuppressWarnings("unused") Reporter reporter) throws IOException { while(values.hasNext()) { - MapWritable r = (MapWritable)values.next(); + MapWritable r = values.next(); output.collect(key, r); } } Index: src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/HRegion.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -198,6 +200,7 @@ new ConcurrentHashMap<Long, TreeMap<HStoreKey, byte []>>(); final AtomicLong memcacheSize = new AtomicLong(0); + private final AtomicInteger cacheFlushesRequested = new AtomicInteger(0); final Path basedir; final HLog log; @@ -222,15 +225,11 @@ volatile WriteState writestate = new WriteState(); - final int memcacheFlushSize; - private volatile long lastFlushTime; - final CacheFlushListener flushListener; - final int blockingMemcacheSize; + private final HStoreListener listener; protected final long threadWakeFrequency; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Integer updateLock = new Integer(0); private final Integer splitLock = new Integer(0); - private final long desiredMaxFileSize; private final long minSequenceId; final AtomicInteger activeScannerCount = new AtomicInteger(0); @@ -255,14 +254,14 @@ * @param regionInfo - HRegionInfo that describes the region * @param initialFiles If there are initial files (implying that the HRegion * is new), then read them from the supplied path. - * @param listener an object that implements CacheFlushListener or null + * @param listener an object that implements HStoreListener or null * * @throws IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener) - throws IOException { - + HRegionInfo regionInfo, Path initialFiles, HStoreListener listener) + throws IOException { + this.basedir = basedir; this.log = log; this.fs = fs; @@ -286,8 +285,8 @@ for(HColumnDescriptor c : this.regionInfo.getTableDesc().families().values()) { - HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs, - oldLogFile, this.conf); + HStore store = + new HStore(this.basedir, this, c, this.fs, oldLogFile, this.conf); stores.put(c.getFamilyName(), store); @@ -312,23 +311,13 @@ fs.delete(merges); } - // By default, we flush the cache when 64M. - this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", - 1024*1024*64); - this.flushListener = listener; - this.blockingMemcacheSize = this.memcacheFlushSize * - conf.getInt("hbase.hregion.memcache.block.multiplier", 1); + this.listener = listener; - // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. 
- this.desiredMaxFileSize = - conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); - // HRegion is ready to go! this.writestate.compacting = false; - this.lastFlushTime = System.currentTimeMillis(); LOG.info("region " + this.regionInfo.getRegionName() + " available"); } - + /** * @return Updates to this region need to have a sequence id that is >= to * the this number. @@ -392,7 +381,7 @@ // region. writestate.writesEnabled = false; LOG.debug("compactions and cache flushes disabled for region " + - regionName); + regionName); while (writestate.compacting || writestate.flushing) { LOG.debug("waiting for" + (writestate.compacting ? " compaction" : "") + @@ -410,15 +399,15 @@ } lock.writeLock().lock(); LOG.debug("new updates and scanners for region " + regionName + - " disabled"); - + " disabled"); + try { // Wait for active scanners to finish. The write lock we hold will prevent // new scanners from being created. synchronized (activeScannerCount) { while (activeScannerCount.get() != 0) { LOG.debug("waiting for " + activeScannerCount.get() + - " scanners to finish"); + " scanners to finish"); try { activeScannerCount.wait(); @@ -434,17 +423,20 @@ // outstanding updates. waitOnRowLocks(); LOG.debug("no more row locks outstanding on region " + regionName); - + if (listener != null) { // If there is a listener, let them know that we have now // acquired all the necessary locks and are starting to // do the close listener.closing(getRegionName()); } - + // Don't flush the cache if we are aborting if (!abort) { - internalFlushcache(snapshotMemcaches()); + for (HStore s: stores.values()) { + s.snapshotMemcache(); + internalFlushcache(s); + } } List<HStoreFile> result = new ArrayList<HStoreFile>(); @@ -452,13 +444,13 @@ result.addAll(store.close()); } this.closed.set(true); - + if (listener != null) { // If there is a listener, tell them that the region is now // closed. listener.closed(getRegionName()); } - + LOG.info("closed " + this.regionInfo.getRegionName()); return result; } finally { @@ -516,11 +508,12 @@ return this.fs; } - /** @return the last time the region was flushed */ - public long getLastFlushTime() { - return this.lastFlushTime; + /** + * @return collection of stores that belong to this region + */ + public Collection<HStore> getStores() { + return Collections.unmodifiableCollection(this.stores.values()); } - ////////////////////////////////////////////////////////////////////////////// // HRegion maintenance. // @@ -529,32 +522,17 @@ ////////////////////////////////////////////////////////////////////////////// /** - * @return returns size of largest HStore. Also returns whether store is - * splitable or not (Its not splitable if region has a store that has a - * reference store file). + * @return returns size of largest HStore. */ - HStore.HStoreSize largestHStore(Text midkey) { - HStore.HStoreSize biggest = null; - boolean splitable = true; - for (HStore h: stores.values()) { - HStore.HStoreSize size = h.size(midkey); - // If we came across a reference down in the store, then propagate - // fact that region is not splitable. 
- if (splitable) { - splitable = size.splitable; + long getLargestHStoreSize() { + long size = 0; + for (HStore store: stores.values()) { + long storeSize = store.getSize(); + if (storeSize > size) { + size = storeSize; } - if (biggest == null) { - biggest = size; - continue; - } - if(size.getAggregate() > biggest.getAggregate()) { // Largest so far - biggest = size; - } } - if (biggest != null) { - biggest.setSplitable(splitable); - } - return biggest; + return size; } /* @@ -563,16 +541,17 @@ * but instead create new 'reference' store files that read off the top and * bottom ranges of parent store files. * @param listener May be null. + * @param midKey key on which to split region * @return two brand-new (and open) HRegions or null if a split is not needed * @throws IOException */ - HRegion[] splitRegion(final RegionUnavailableListener listener) - throws IOException { + HRegion[] splitRegion(final RegionUnavailableListener listener, + final Text midKey) throws IOException { synchronized (splitLock) { - Text midKey = new Text(); - if (closed.get() || !needsSplit(midKey)) { + if (closed.get()) { return null; } + LOG.info("Starting split of region " + getRegionName()); Path splits = new Path(this.regiondir, SPLITDIR); if(!this.fs.exists(splits)) { this.fs.mkdirs(splits); @@ -654,71 +633,6 @@ } /* - * Iterates through all the HStores and finds the one with the largest - * MapFile size. If the size is greater than the (currently hard-coded) - * threshold, returns true indicating that the region should be split. The - * midKey for the largest MapFile is returned through the midKey parameter. - * It is possible for us to rule the region non-splitable even in excess of - * configured size. This happens if region contains a reference file. If - * a reference file, the region can not be split. - * - * Note that there is no need to do locking in this method because it calls - * largestHStore which does the necessary locking. - * - * @param midKey midKey of the largest MapFile - * @return true if the region should be split. midKey is set by this method. - * Check it for a midKey value on return. 
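With needsSplit gone (its removal follows just below), the split decision moves to the caller: compactStore hands back a mid key when a split looks worthwhile, getLargestHStoreSize() supplies the size, and splitRegion now takes the mid key as an argument. A rough caller-side sketch, reusing the hbase.hregion.max.filesize setting this hunk removes from HRegion; where this check actually lives now (presumably the region server) is outside this patch excerpt.

    // Sketch only: deciding and performing a split under the new API.
    long desiredMaxFileSize =
        conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
    Text midKey = region.compactStore(store);   // non-null mid key => store suggests a split
    if (midKey != null && region.getLargestHStoreSize() > desiredMaxFileSize) {
      HRegion[] daughters = region.splitRegion(unavailableListener, midKey);
      // daughters == null means the region was closed before the split could start
    }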
- */ - boolean needsSplit(Text midKey) { - HStore.HStoreSize biggest = largestHStore(midKey); - if (biggest == null || midKey.getLength() == 0 || - (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) { - return false; - } - boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize); - if (split) { - if (!biggest.isSplitable()) { - LOG.warn("Region " + getRegionName().toString() + - " is NOT splitable though its aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - split = false; - } else { - LOG.info("Splitting " + getRegionName().toString() + - " because largest aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - } - } - return split; - } - - /** - * Only do a compaction if it is necessary - * - * @return - * @throws IOException - */ - boolean compactIfNeeded() throws IOException { - boolean needsCompaction = false; - for (HStore store: stores.values()) { - if (store.needsCompaction()) { - needsCompaction = true; - if (LOG.isDebugEnabled()) { - LOG.debug(store.toString() + " needs compaction"); - } - break; - } - } - if (!needsCompaction) { - return false; - } - return compactStores(); - } - - /* * @param dir * @return compaction directory for the passed in <code>dir</code> */ @@ -745,25 +659,42 @@ } } + boolean compactionRequested(HStore store) { + if (listener == null) { + return false; + } + return listener.compactionRequested(store, this); + } + /** - * Compact all the stores. This should be called periodically to make sure - * the stores are kept manageable. + * Called after region is opened to compact the HStores if necessary + */ + void compactStores() { + if (listener != null) { + for (HStore store: stores.values()) { + listener.compactionRequested(store, this); + } + } + } + + /** + * Compact the specified HStore * * <p>This operation could block for a long time, so don't call it from a * time-sensitive thread. - * - * @return Returns TRUE if the compaction has completed. FALSE, if the - * compaction was not carried out, because the HRegion is busy doing - * something else storage-intensive (like flushing the cache). The caller - * should check back later. * * Note that no locking is necessary at this level because compaction only * conflicts with a region split, and that cannot happen because the region * server does them sequentially and not in parallel. 
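compactIfNeeded disappears as well: a store that wants compaction asks its region, and compactionRequested(HStore) simply relays the request to the listener, returning false when there is none. An illustrative store-side hand-off follows; the trigger condition and method name are invented for the example, since the real heuristics live inside HStore and are not part of this hunk.

    // Illustrative only; threshold and method name are not from this patch.
    boolean maybeRequestCompaction(HRegion region, HStore store, int storeFileCount) {
      final int compactionThreshold = 3;   // assumed value for the example
      // compactionRequested returns false when the region has no HStoreListener.
      return storeFileCount >= compactionThreshold && region.compactionRequested(store);
    }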
+ * + * @param store the HStore to compact + * @return mid key if split is needed + * @throws IOException */ - boolean compactStores() throws IOException { + Text compactStore(HStore store) throws IOException { + Text midKey = null; if (this.closed.get()) { - return false; + return midKey; } try { synchronized (writestate) { @@ -771,36 +702,45 @@ writestate.compacting = true; } else { LOG.info("NOT compacting region " + - this.regionInfo.getRegionName().toString() + ": compacting=" + + getRegionName() + " store " + store.storeName + ": compacting=" + writestate.compacting + ", writesEnabled=" + writestate.writesEnabled); - return false; + return midKey; } } long startTime = System.currentTimeMillis(); - LOG.info("starting compaction on region " + - this.regionInfo.getRegionName().toString()); - boolean status = true; + LOG.info("starting compaction on region " + getRegionName() + " store " + + store.storeName); doRegionCompactionPrep(); - for (HStore store : stores.values()) { - if(!store.compact()) { - status = false; - } - } + midKey = store.compact(); doRegionCompactionCleanup(); - LOG.info("compaction completed on region " + - this.regionInfo.getRegionName().toString() + ". Took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - return status; - + LOG.info("compaction completed on region " + getRegionName() + " store " + + store.storeName + ". Took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + } finally { synchronized (writestate) { writestate.compacting = false; writestate.notifyAll(); } } + return midKey; } + void flushRequested(HStore store) { + synchronized (cacheFlushesRequested) { + if (listener == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not queueing cache flush for " + store.storeName + + " because listener is null"); + } + } else { + listener.flushRequested(store, this); + cacheFlushesRequested.incrementAndGet(); + } + } + } + /** * Flush the cache. * @@ -814,16 +754,16 @@ * * <p>This method may block for some time, so it should not be called from a * time-sensitive thread. + * + * @param s store to be flushed * - * @return true if cache was flushed - * * @throws IOException * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - boolean flushcache() throws IOException { + void flushcache(HStore s) throws IOException { if (this.closed.get()) { - return false; + return; } synchronized (writestate) { if (!writestate.flushing && writestate.writesEnabled) { @@ -835,17 +775,17 @@ writestate.flushing + ", writesEnabled=" + writestate.writesEnabled); } - return false; + return; } } try { lock.readLock().lock(); // Prevent splits and closes try { - long startTime = -1; - synchronized (updateLock) {// Stop updates while we snapshot the memcaches - startTime = snapshotMemcaches(); + synchronized (updateLock) { + // Stop updates while we snapshot the memcache + s.snapshotMemcache(); } - return internalFlushcache(startTime); + internalFlushcache(s); } finally { lock.readLock().unlock(); } @@ -857,32 +797,6 @@ } } - /* - * It is assumed that updates are blocked for the duration of this method - */ - private long snapshotMemcaches() { - if (this.memcacheSize.get() == 0) { - return -1; - } - long startTime = System.currentTimeMillis(); - - if(LOG.isDebugEnabled()) { - LOG.debug("Started memcache flush for region " + - this.regionInfo.getRegionName() + ". 
Size " + - StringUtils.humanReadableInt(this.memcacheSize.get())); - } - - // We reset the aggregate memcache size here so that subsequent updates - // will add to the unflushed size - - this.memcacheSize.set(0L); - - for (HStore hstore: stores.values()) { - hstore.snapshotMemcache(); - } - return startTime; - } - /** * Flushing the cache is a little tricky. We have a lot of updates in the * HMemcache, all of which have also been written to the log. We need to @@ -909,76 +823,90 @@ * * <p> This method may block for some time. * - * @param startTime the time the cache was snapshotted or -1 if a flush is - * not needed + * @param store the HStore to be flushed * - * @return true if the cache was flushed - * * @throws IOException * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - private boolean internalFlushcache(long startTime) throws IOException { - if (startTime == -1) { - return false; + private void internalFlushcache(HStore store) throws IOException { + long startTime = System.currentTimeMillis(); + long sequenceId = -1; + if(LOG.isDebugEnabled()) { + LOG.debug("Started memcache flush for region " + + this.regionInfo.getRegionName() + " store " + store.toString()); } + try { + // We pass the log to the HMemcache, so we can lock down both + // simultaneously. We only have to do this for a moment: we need the + // HMemcache state at the time of a known log sequence number. Since + // multiple HRegions may write to a single HLog, the sequence numbers may + // zoom past unless we lock it. + // + // When execution returns from snapshotMemcacheForLog() with a non-NULL + // value, the HMemcache will have a snapshot object stored that must be + // explicitly cleaned up using a call to deleteSnapshot() or by calling + // abort. + // + sequenceId = log.startCacheFlush(); - // We pass the log to the HMemcache, so we can lock down both - // simultaneously. We only have to do this for a moment: we need the - // HMemcache state at the time of a known log sequence number. Since - // multiple HRegions may write to a single HLog, the sequence numbers may - // zoom past unless we lock it. - // - // When execution returns from snapshotMemcacheForLog() with a non-NULL - // value, the HMemcache will have a snapshot object stored that must be - // explicitly cleaned up using a call to deleteSnapshot() or by calling - // abort. - // - long sequenceId = log.startCacheFlush(); + // Any failure from here on out will be catastrophic requiring server + // restart so hlog content can be replayed and put back into the memcache. + // Otherwise, the snapshot content while backed up in the hlog, it will + // not be part of the current running servers state. - // Any failure from here on out will be catastrophic requiring server - // restart so hlog content can be replayed and put back into the memcache. - // Otherwise, the snapshot content while backed up in the hlog, it will not - // be part of the current running servers state. + try { + // A. Flush memcache for the specified HStore + // Keep running vector of all store files that includes both old and the + // just-made new flush store file. - try { - // A. Flush memcache to all the HStores. - // Keep running vector of all store files that includes both old and the - // just-made new flush store file. 
- - for (HStore hstore: stores.values()) { - hstore.flushCache(sequenceId); + long bytesFlushed = store.flushCache(sequenceId); + memcacheSize.set(memcacheSize.get() - bytesFlushed); + if (memcacheSize.get() < 0) { + LOG.warn("memcacheSize < 0. Resetting to zero"); + memcacheSize.set(0L); + } + } catch (IOException e) { + // An exception here means that the snapshot was not persisted. + // The hlog needs to be replayed so its content is restored to memcache. + // Currently, only a server restart will do this. + this.log.abortCacheFlush(); + throw new DroppedSnapshotException(e.getMessage()); } - } catch (IOException e) { - // An exception here means that the snapshot was not persisted. - // The hlog needs to be replayed so its content is restored to memcache. - // Currently, only a server restart will do this. - this.log.abortCacheFlush(); - throw new DroppedSnapshotException(e.getMessage()); - } - // If we get to here, the HStores have been written. If we get an - // error in completeCacheFlush it will release the lock it is holding + // If we get to here, the HStores has been written. If we get an + // error in completeCacheFlush it will release the lock it is holding - // B. Write a FLUSHCACHE-COMPLETE message to the log. - // This tells future readers that the HStores were emitted correctly, - // and that all updates to the log for this regionName that have lower - // log-sequence-ids can be safely ignored. - this.log.completeCacheFlush(this.regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), sequenceId); + // B. Write a FLUSHCACHE-COMPLETE message to the log. + // This tells future readers that the HStore was emitted correctly, + // and that all updates to the log for this store that have lower + // log-sequence-ids can be safely ignored. + this.log.completeCacheFlush(store.storeName, getTableDesc().getName(), + sequenceId); - // D. Finally notify anyone waiting on memcache to clear: - // e.g. checkResources(). - synchronized (this) { - notifyAll(); + } finally { + // D. Finally notify anyone waiting on memcache to clear: + // e.g. checkResources(). + synchronized (cacheFlushesRequested) { + int flushes = cacheFlushesRequested.decrementAndGet(); + if (flushes < 0) { + // This can happen because optional cache flushes are not queued + // by flushRequested + cacheFlushesRequested.set(0); + } + if (flushes < stores.size()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waking threads blocked on cache flush (if any)"); + } + cacheFlushesRequested.notifyAll(); + } + } } if (LOG.isDebugEnabled()) { - LOG.debug("Finished memcache flush for region " + - this.regionInfo.getRegionName() + " in " + - (System.currentTimeMillis() - startTime) + "ms, sequenceid=" + - sequenceId); + LOG.debug("Finished memcache flush for store " + store.storeName + + " in " + (System.currentTimeMillis() - startTime) + + "ms, sequenceid=" + sequenceId); } - return true; } ////////////////////////////////////////////////////////////////////////////// @@ -1220,7 +1148,7 @@ * @throws IOException */ public void batchUpdate(BatchUpdate b) - throws IOException { + throws IOException { // Do a rough check that we have resources to accept a write. The check is // 'rough' in that between the resource check and the call to obtain a // read lock, resources may run out. For now, the thought is that this @@ -1237,52 +1165,52 @@ long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ? 
System.currentTimeMillis() : b.getTimestamp(); - - try { - List<Text> deletes = null; - for (BatchOperation op: b) { - HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime); - byte[] val = null; - if (op.isPut()) { - val = op.getValue(); - if (HLogEdit.isDeleted(val)) { - throw new IOException("Cannot insert value: " + val); - } - } else { - if (b.getTimestamp() == LATEST_TIMESTAMP) { - // Save off these deletes - if (deletes == null) { - deletes = new ArrayList<Text>(); + + try { + List<Text> deletes = null; + for (BatchOperation op: b) { + HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime); + byte[] val = null; + if (op.isPut()) { + val = op.getValue(); + if (HLogEdit.isDeleted(val)) { + throw new IOException("Cannot insert value: " + val); } - deletes.add(op.getColumn()); } else { - val = HLogEdit.deleteBytes.get(); + if (b.getTimestamp() == LATEST_TIMESTAMP) { + // Save off these deletes + if (deletes == null) { + deletes = new ArrayList<Text>(); + } + deletes.add(op.getColumn()); + } else { + val = HLogEdit.deleteBytes.get(); + } } + if (val != null) { + localput(lockid, key, val); + } } - if (val != null) { - localput(lockid, key, val); + TreeMap<HStoreKey, byte[]> edits = + this.targetColumns.remove(Long.valueOf(lockid)); + if (edits != null && edits.size() > 0) { + update(edits); } - } - TreeMap<HStoreKey, byte[]> edits = - this.targetColumns.remove(Long.valueOf(lockid)); - if (edits != null && edits.size() > 0) { - update(edits); - } - - if (deletes != null && deletes.size() > 0) { - // We have some LATEST_TIMESTAMP deletes to run. - for (Text column: deletes) { - deleteMultiple(row, column, LATEST_TIMESTAMP, 1); + + if (deletes != null && deletes.size() > 0) { + // We have some LATEST_TIMESTAMP deletes to run. + for (Text column: deletes) { + deleteMultiple(row, column, LATEST_TIMESTAMP, 1); + } } - } - } catch (IOException e) { - this.targetColumns.remove(Long.valueOf(lockid)); - throw e; - - } finally { - releaseRowLock(row); - } + } catch (IOException e) { + this.targetColumns.remove(Long.valueOf(lockid)); + throw e; + + } finally { + releaseRowLock(row); + } } /* @@ -1290,34 +1218,33 @@ * * For now, just checks memcache saturation. * - * Here we synchronize on HRegion, a broad scoped lock. Its appropriate + * Here we synchronize on cacheFlushesRequested. Its appropriate * given we're figuring in here whether this region is able to take on * writes. This is only method with a synchronize (at time of writing), - * this and the synchronize on 'this' inside in internalFlushCache to send - * the notify. + * and the synchronize inside in internalFlushCache to send the notify. 
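Write back-pressure changes shape too: instead of blocking on total memcache bytes, batchUpdate now blocks only while every store in the region has an outstanding flush request. The three call sites, all synchronized on cacheFlushesRequested, fit together like this:

    // flushRequested(store):     listener.flushRequested(store, region); cacheFlushesRequested++
    // internalFlushcache(store): cacheFlushesRequested-- (floored at zero); notifyAll() once the
    //                            count drops below stores.size()
    // checkResources():          while (cacheFlushesRequested >= stores.size()) wait(threadWakeFrequency)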
*/ - private synchronized void checkResources() { + private void checkResources() { boolean blocked = false; - - while (this.memcacheSize.get() >= this.blockingMemcacheSize) { - if (!blocked) { - LOG.info("Blocking updates for '" + Thread.currentThread().getName() + - "': Memcache size " + - StringUtils.humanReadableInt(this.memcacheSize.get()) + - " is >= than blocking " + - StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size"); - } - blocked = true; - try { - wait(threadWakeFrequency); - } catch (InterruptedException e) { - // continue; + synchronized (cacheFlushesRequested) { + while (this.cacheFlushesRequested.get() >= stores.size()) { + if (!blocked) { + LOG.info("Blocking updates for '" + Thread.currentThread().getName() + + "': cache flushes requested " + cacheFlushesRequested.get() + + " is >= max flush request count " + stores.size()); + } + + blocked = true; + try { + cacheFlushesRequested.wait(threadWakeFrequency); + } catch (InterruptedException e) { + // continue; + } } } if (blocked) { LOG.info("Unblocking updates for region " + getRegionName() + " '" + - Thread.currentThread().getName() + "'"); + Thread.currentThread().getName() + "'"); } } @@ -1462,21 +1389,13 @@ return; } synchronized (updateLock) { // prevent a cache flush - this.log.append(regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), updatesByColumn); - - long size = 0; for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) { HStoreKey key = e.getKey(); byte[] val = e.getValue(); - size = this.memcacheSize.addAndGet(key.getSize() + - (val == null ? 0 : val.length)); - stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val); + HStore store = stores.get(key.getFamily()); + this.log.append(store.storeName, getTableDesc().getName(), key, val); + memcacheSize.addAndGet(store.add(key, val)); } - if (this.flushListener != null && size > this.memcacheFlushSize) { - // Request a cache flush - this.flushListener.flushRequested(this); - } } } @@ -1820,6 +1739,7 @@ * @param r HRegion to add to <code>meta</code> * * @throws IOException + * @see #removeRegionFromMETA(HRegionInterface, Text, Text) */ public static void addRegionToMETA(HRegion meta, HRegion r) throws IOException { @@ -1848,10 +1768,10 @@ * @param regionName HRegion to remove from <code>meta</code> * * @throws IOException + * @see #addRegionToMETA(HRegion, HRegion) */ public static void removeRegionFromMETA(final HRegionInterface srvr, - final Text metaRegionName, final Text regionName) - throws IOException { + final Text metaRegionName, final Text regionName) throws IOException { srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP); } @@ -1862,6 +1782,7 @@ * @param info HRegion to update in <code>meta</code> * * @throws IOException + * @see #addRegionToMETA(HRegion, HRegion) */ public static void offlineRegionInMETA(final HRegionInterface srvr, final Text metaRegionName, final HRegionInfo info) Index: src/java/org/apache/hadoop/hbase/util/Migrate.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/Migrate.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/util/Migrate.java (working copy) @@ -25,6 +25,8 @@ import java.io.InputStreamReader; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.HashMap; import java.util.HashSet; @@ -64,7 +66,7 @@ /** * Perform a file system upgrade to convert older file layouts to that - * supported by HADOOP-2478 + * supported by 
HADOOP-2478, and then to the form supported by HBASE-69 */ public class Migrate extends Configured implements Tool { static final Log LOG = LogFactory.getLog(Migrate.class); @@ -96,10 +98,11 @@ options.put("prompt", ACTION.PROMPT); } + private FileSystem fs = null; + private Path rootdir = null; private boolean readOnly = false; private boolean migrationNeeded = false; private boolean newRootRegion = false; - private ACTION logFiles = ACTION.IGNORE; private ACTION otherFiles = ACTION.IGNORE; private BufferedReader reader = null; @@ -127,7 +130,7 @@ } try { - FileSystem fs = FileSystem.get(conf); // get DFS handle + fs = FileSystem.get(conf); // get DFS handle LOG.info("Verifying that file system is available..."); if (!FSUtils.isFileSystemAvailable(fs)) { @@ -148,8 +151,7 @@ LOG.info("Starting upgrade" + (readOnly ? " check" : "")); - Path rootdir = - fs.makeQualified(new Path(this.conf.get(HConstants.HBASE_DIR))); + rootdir = fs.makeQualified(new Path(this.conf.get(HConstants.HBASE_DIR))); if (!fs.exists(rootdir)) { throw new FileNotFoundException("HBase root directory " + @@ -158,40 +160,28 @@ // See if there is a file system version file - if (FSUtils.checkVersion(fs, rootdir)) { + String version = FSUtils.checkVersion(fs, rootdir); + if (version != null && + version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { LOG.info("No upgrade necessary."); return 0; } - // check to see if new root region dir exists + // Get contents of root directory + + FileStatus[] rootFiles = getRootDirFiles(); - checkNewRootRegionDirExists(fs, rootdir); - - // check for "extra" files and for old upgradable regions - - extraFiles(fs, rootdir); - - if (!newRootRegion) { - // find root region - - Path rootRegion = new Path(rootdir, - OLD_PREFIX + HRegionInfo.rootRegionInfo.getEncodedName()); - - if (!fs.exists(rootRegion)) { - throw new IOException("Cannot find root region " + - rootRegion.toString()); - } else if (readOnly) { - migrationNeeded = true; - } else { - migrateRegionDir(fs, rootdir, HConstants.ROOT_TABLE_NAME, rootRegion); - scanRootRegion(fs, rootdir); - - // scan for left over regions - - extraRegions(fs, rootdir); - } + if (version == null) { + migrateFromNoVersion(rootFiles); + migrateToV2(rootFiles); + } else if (version.compareTo("0.1") == 0) { + migrateToV2(rootFiles); + } else if (version.compareTo("2") == 0) { + // Nothing to do (yet) + } else { + throw new IOException("Unrecognized version: " + version); } - + if (!readOnly) { // set file system version LOG.info("Setting file system version."); @@ -207,21 +197,81 @@ } } - private void checkNewRootRegionDirExists(FileSystem fs, Path rootdir) - throws IOException { + private void migrateFromNoVersion(FileStatus[] rootFiles) throws IOException { + // check to see if new root region dir exists + + checkNewRootRegionDirExists(); + + // check for unrecovered region server log files + + checkForUnrecoveredLogFiles(rootFiles); + + // check for "extra" files and for old upgradable regions + + extraFiles(rootFiles); + + if (!newRootRegion) { + // find root region + + Path rootRegion = new Path(rootdir, + OLD_PREFIX + HRegionInfo.rootRegionInfo.getEncodedName()); + + if (!fs.exists(rootRegion)) { + throw new IOException("Cannot find root region " + + rootRegion.toString()); + } else if (readOnly) { + migrationNeeded = true; + } else { + migrateRegionDir(HConstants.ROOT_TABLE_NAME, rootRegion); + scanRootRegion(); + + // scan for left over regions + + extraRegions(); + } + } + } + + private void migrateToV2(FileStatus[] rootFiles) throws 
IOException { + checkForUnrecoveredLogFiles(rootFiles); + } + + private FileStatus[] getRootDirFiles() throws IOException { + FileStatus[] stats = fs.listStatus(rootdir); + if (stats == null || stats.length == 0) { + throw new IOException("No files found under root directory " + + rootdir.toString()); + } + return stats; + } + + private void checkNewRootRegionDirExists() throws IOException { Path rootRegionDir = HRegion.getRegionDir(rootdir, HRegionInfo.rootRegionInfo); newRootRegion = fs.exists(rootRegionDir); migrationNeeded = !newRootRegion; } + + private void checkForUnrecoveredLogFiles(FileStatus[] rootFiles) + throws IOException { + List<String> unrecoveredLogs = new ArrayList<String>(); + for (int i = 0; i < rootFiles.length; i++) { + String name = rootFiles[i].getPath().getName(); + if (name.startsWith("log_")) { + unrecoveredLogs.add(name); + } + } + if (unrecoveredLogs.size() != 0) { + throw new IOException("There are " + unrecoveredLogs.size() + + " unrecovered region server logs. Please uninstall this version of " + + "HBase, re-install the previous version, start your cluster and " + + "shut it down cleanly, so that all region server logs are recovered" + + " and deleted."); + } + } // Check for files that should not be there or should be migrated - private void extraFiles(FileSystem fs, Path rootdir) throws IOException { - FileStatus[] stats = fs.listStatus(rootdir); - if (stats == null || stats.length == 0) { - throw new IOException("No files found under root directory " + - rootdir.toString()); - } + private void extraFiles(FileStatus[] stats) throws IOException { for (int i = 0; i < stats.length; i++) { String name = stats[i].getPath().getName(); if (name.startsWith(OLD_PREFIX)) { @@ -234,31 +284,27 @@ } catch (NumberFormatException e) { extraFile(otherFiles, "Old region format can not be upgraded: " + - name, fs, stats[i].getPath()); + name, stats[i].getPath()); } } else { // Since the new root region directory exists, we assume that this // directory is not necessary - extraFile(otherFiles, "Old region directory found: " + name, fs, + extraFile(otherFiles, "Old region directory found: " + name, stats[i].getPath()); } } else { // File name does not start with "hregion_" - if (name.startsWith("log_")) { - String message = "Unrecovered region server log file " + name + - " this file can be recovered by the master when it starts."; - extraFile(logFiles, message, fs, stats[i].getPath()); - } else if (!newRootRegion) { + if (!newRootRegion) { // new root region directory does not exist. 
This is an extra file String message = "Unrecognized file " + name; - extraFile(otherFiles, message, fs, stats[i].getPath()); + extraFile(otherFiles, message, stats[i].getPath()); } } } } - private void extraFile(ACTION action, String message, FileSystem fs, - Path p) throws IOException { + private void extraFile(ACTION action, String message, Path p) + throws IOException { if (action == ACTION.ABORT) { throw new IOException(message + " aborting"); @@ -277,8 +323,8 @@ } } - private void migrateRegionDir(FileSystem fs, Path rootdir, Text tableName, - Path oldPath) throws IOException { + private void migrateRegionDir(Text tableName, Path oldPath) + throws IOException { // Create directory where table will live @@ -323,7 +369,7 @@ } } - private void scanRootRegion(FileSystem fs, Path rootdir) throws IOException { + private void scanRootRegion() throws IOException { HLog log = new HLog(fs, new Path(rootdir, HConstants.HREGION_LOGDIR_NAME), conf, null); @@ -354,12 +400,12 @@ // First move the meta region to where it should be and rename // subdirectories as necessary - migrateRegionDir(fs, rootdir, HConstants.META_TABLE_NAME, + migrateRegionDir(HConstants.META_TABLE_NAME, new Path(rootdir, OLD_PREFIX + info.getEncodedName())); // Now scan and process the meta table - scanMetaRegion(fs, rootdir, log, info); + scanMetaRegion(log, info); } } finally { @@ -375,8 +421,7 @@ } } - private void scanMetaRegion(FileSystem fs, Path rootdir, HLog log, - HRegionInfo info) throws IOException { + private void scanMetaRegion(HLog log, HRegionInfo info) throws IOException { HRegion metaRegion = new HRegion( new Path(rootdir, info.getTableDesc().getName().toString()), log, fs, @@ -402,7 +447,7 @@ // Move the region to where it should be and rename // subdirectories as necessary - migrateRegionDir(fs, rootdir, region.getTableDesc().getName(), + migrateRegionDir(region.getTableDesc().getName(), new Path(rootdir, OLD_PREFIX + region.getEncodedName())); results.clear(); @@ -417,7 +462,7 @@ } } - private void extraRegions(FileSystem fs, Path rootdir) throws IOException { + private void extraRegions() throws IOException { FileStatus[] stats = fs.listStatus(rootdir); if (stats == null || stats.length == 0) { throw new IOException("No files found under root directory " + @@ -436,7 +481,7 @@ message = "Region not in meta table and no other regions reference it " + name; } - extraFile(otherFiles, message, fs, stats[i].getPath()); + extraFile(otherFiles, message, stats[i].getPath()); } } } @@ -444,18 +489,11 @@ @SuppressWarnings("static-access") private int parseArgs(String[] args) { Options opts = new Options(); - Option logFiles = OptionBuilder.withArgName(ACTIONS) - .hasArg() - .withDescription( - "disposition of unrecovered region server logs: {abort|ignore|delete|prompt}") - .create("logfiles"); - Option extraFiles = OptionBuilder.withArgName(ACTIONS) .hasArg() .withDescription("disposition of 'extra' files: {abort|ignore|delete|prompt}") .create("extrafiles"); - opts.addOption(logFiles); opts.addOption(extraFiles); GenericOptionsParser parser = @@ -474,21 +512,12 @@ } if (readOnly) { - this.logFiles = ACTION.IGNORE; this.otherFiles = ACTION.IGNORE; } else { CommandLine commandLine = parser.getCommandLine(); ACTION action = null; - if (commandLine.hasOption("logfiles")) { - action = options.get(commandLine.getOptionValue("logfiles")); - if (action == null) { - usage(); - return -1; - } - this.logFiles = action; - } if (commandLine.hasOption("extrafiles")) { action = 
options.get(commandLine.getOptionValue("extrafiles")); if (action == null) { @@ -506,9 +535,6 @@ System.err.println(" check perform upgrade checks only."); System.err.println(" upgrade perform upgrade checks and modify hbase.\n"); System.err.println(" Options are:"); - System.err.println(" -logfiles={abort|ignore|delete|prompt}"); - System.err.println(" action to take when unrecovered region"); - System.err.println(" server log files are found.\n"); System.err.println(" -extrafiles={abort|ignore|delete|prompt}"); System.err.println(" action to take if \"extra\" files are found.\n"); System.err.println(" -conf <configuration file> specify an application configuration file"); Index: src/java/org/apache/hadoop/hbase/util/FSUtils.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 630252) +++ src/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy) @@ -81,20 +81,19 @@ * * @param fs * @param rootdir - * @return true if the current file system is the correct version + * @return null if no version file exists, version string otherwise. * @throws IOException */ - public static boolean checkVersion(FileSystem fs, Path rootdir) throws IOException { + public static String checkVersion(FileSystem fs, Path rootdir) throws IOException { Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); - boolean versionOk = false; + String version = null; if (fs.exists(versionFile)) { FSDataInputStream s = fs.open(new Path(rootdir, HConstants.VERSION_FILE_NAME)); - String version = DataInputStream.readUTF(s); + version = DataInputStream.readUTF(s); s.close(); - versionOk = version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0; } - return versionOk; + return version; } /**
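FSUtils.checkVersion now returns the raw contents of the version file (or null when no version file exists) instead of a pass/fail boolean, which is what lets HMaster and Migrate take different actions from the same call. A caller that only wants the old yes/no answer can layer it back on, as in this hypothetical helper (not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.FSUtils;

    public final class VersionCheck {
      /** @return true only if the version file exists and matches FILE_SYSTEM_VERSION */
      public static boolean isCurrentVersion(FileSystem fs, Path rootdir) throws IOException {
        String version = FSUtils.checkVersion(fs, rootdir);
        return version != null &&
            version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0;
      }
    }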