Index: src/test/org/apache/hadoop/hbase/TestGet2.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestGet2.java (revision 620703)
+++ src/test/org/apache/hadoop/hbase/TestGet2.java (working copy)
@@ -80,7 +80,9 @@
     assertColumnsPresent(region, actualStartRow);
     assertColumnsPresent(region, actualStopRow);
     // Force a flush so store files come into play.
-    region.flushcache();
+    for (HStore s: region.getStores()) {
+      region.flushcache(s);
+    }
     // Assert I got all out.
     assertColumnsPresent(region, actualStartRow);
     assertColumnsPresent(region, actualStopRow);
@@ -143,15 +145,17 @@
     }
   }

-  /** For HADOOP-2443 */
-  public void testGetClosestRowBefore() throws IOException{
+  /**
+   * For HADOOP-2443
+   * @throws IOException
+   */
+  public void testGetClosestRowBefore() throws IOException {
     HRegion region = null;
     HRegionIncommon region_incommon = null;

     try {
       HTableDescriptor htd = createTableDescriptor(getName());
-      HRegionInfo hri = new HRegionInfo(htd, null, null);
       region = createNewHRegion(htd, null, null);

       region_incommon = new HRegionIncommon(region);
@@ -193,7 +197,9 @@
       assertEquals(new String(results.get(COLUMNS[0])), "t40 bytes");

       // force a flush
-      region.flushcache();
+      for (HStore s: region.getStores()) {
+        region.flushcache(s);
+      }

       // try finding "015"
       results = region.getClosestRowBefore(t15, HConstants.LATEST_TIMESTAMP);
Index: src/test/org/apache/hadoop/hbase/TestBloomFilters.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestBloomFilters.java (revision 620703)
+++ src/test/org/apache/hadoop/hbase/TestBloomFilters.java (working copy)
@@ -24,6 +24,8 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;

+import org.apache.hadoop.hbase.io.BatchUpdate;
+
 /** Tests per-column bloom filters */
 public class TestBloomFilters extends HBaseClusterTestCase {
   static final Log LOG = LogFactory.getLog(TestBloomFilters.class);
@@ -143,8 +145,6 @@
   /** constructor */
   public TestBloomFilters() {
     super();
-    conf.set("hbase.hregion.memcache.flush.size", "100");// flush cache every 100 bytes
-    conf.set("hbase.regionserver.maxlogentries", "90"); // and roll log too
   }

   /**
@@ -189,9 +189,9 @@
     for(int i = 0; i < 100; i++) {
       Text row = rows[i];
       String value = row.toString();
-      long lockid = table.startUpdate(rows[i]);
-      table.put(lockid, CONTENTS, value.getBytes(HConstants.UTF8_ENCODING));
-      table.commit(lockid);
+      BatchUpdate b = new BatchUpdate(row);
+      b.put(CONTENTS, value.getBytes(HConstants.UTF8_ENCODING));
+      table.commit(b);
     }
     try {
       // Give cache flusher and log roller a chance to run
@@ -255,9 +255,9 @@
     for(int i = 0; i < 100; i++) {
       Text row = rows[i];
       String value = row.toString();
-      long lockid = table.startUpdate(rows[i]);
-      table.put(lockid, CONTENTS, value.getBytes(HConstants.UTF8_ENCODING));
-      table.commit(lockid);
+      BatchUpdate b = new BatchUpdate(row);
+      b.put(CONTENTS, value.getBytes(HConstants.UTF8_ENCODING));
+      table.commit(b);
     }
     try {
       // Give cache flusher and log roller a chance to run
Index: src/test/org/apache/hadoop/hbase/HBaseTestCase.java
===================================================================
--- src/test/org/apache/hadoop/hbase/HBaseTestCase.java (revision 620703)
+++ src/test/org/apache/hadoop/hbase/HBaseTestCase.java (working copy)
@@ -327,6 +327,7 @@
   public static interface Incommon {
     /**
      * @param row
+     * @return lock id
      * @throws IOException
      */
     public long startUpdate(Text row) throws IOException;
@@ -496,7 +497,9 @@
     }

     /** {@inheritDoc} */
     public void flushcache() throws IOException {
-      this.region.flushcache();
+      for (HStore store: this.region.getStores()) {
+        this.region.flushcache(store);
+      }
     }
   }
@@ -505,6 +508,14 @@
    */
   public static class HTableIncommon implements Incommon {
     final HTable table;
+    BatchUpdate batch = null;
+
+    private void checkBatch() {
+      if (batch == null) {
+        throw new IllegalStateException("No update in progress");
+      }
+    }
+
     /**
      * @param table
      */
@@ -513,24 +524,34 @@
       this.table = table;
     }
     /** {@inheritDoc} */
-    public void abort(long lockid) {
-      this.table.abort(lockid);
+    public void abort(@SuppressWarnings("unused") long lockid) {
+      this.batch = null;
     }
     /** {@inheritDoc} */
     public void commit(long lockid) throws IOException {
-      this.table.commit(lockid);
+      commit(lockid, HConstants.LATEST_TIMESTAMP);
     }
     /** {@inheritDoc} */
-    public void commit(long lockid, final long ts) throws IOException {
-      this.table.commit(lockid, ts);
+    public void commit(@SuppressWarnings("unused") long lockid, final long ts)
+    throws IOException {
+      checkBatch();
+      try {
+        this.batch.setTimestamp(ts);
+        this.table.commit(batch);
+      } finally {
+        this.batch = null;
+      }
     }
     /** {@inheritDoc} */
-    public void put(long lockid, Text column, byte[] val) {
-      this.table.put(lockid, column, val);
+    public void put(@SuppressWarnings("unused") long lockid, Text column,
+        byte[] val) {
+      checkBatch();
+      this.batch.put(column, val);
     }
     /** {@inheritDoc} */
-    public void delete(long lockid, Text column) {
-      this.table.delete(lockid, column);
+    public void delete(@SuppressWarnings("unused") long lockid, Text column) {
+      checkBatch();
+      this.batch.delete(column);
     }
     /** {@inheritDoc} */
     public void deleteAll(Text row, Text column, long ts) throws IOException {
@@ -538,7 +559,11 @@
     }
     /** {@inheritDoc} */
     public long startUpdate(Text row) {
-      return this.table.startUpdate(row);
+      if (this.batch != null) {
+        throw new IllegalStateException("Update already in progress");
+      }
+      this.batch = new BatchUpdate(row);
+      return 1;
     }
     /** {@inheritDoc} */
     public HScannerInterface getScanner(Text [] columns, Text firstRow,
Index: src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java
===================================================================
--- src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (revision 620703)
+++ src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (working copy)
@@ -146,7 +146,6 @@
         r.flushcache();
       }
     }
-    region.compactIfNeeded();
     region.close();
     region.getLog().closeAndDelete();
     region.getRegionInfo().setOffline(true);
Index: src/test/org/apache/hadoop/hbase/TestHLog.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestHLog.java (revision 620703)
+++ src/test/org/apache/hadoop/hbase/TestHLog.java (working copy)
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase;

 import java.io.IOException;
-import java.util.TreeMap;

 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.Path;
@@ -71,12 +70,11 @@
       for (int ii = 0; ii < 3; ii++) {
         for (int i = 0; i < 3; i++) {
           for (int j = 0; j < 3; j++) {
-            TreeMap edit = new TreeMap();
             Text column = new Text(Integer.toString(j));
-            edit.put(
-              new HStoreKey(rowName, column, System.currentTimeMillis()),
-              column.getBytes());
-            log.append(new Text(Integer.toString(i)), tableName, edit);
+            HStoreKey k =
+              new HStoreKey(rowName, column, System.currentTimeMillis());
+            log.append(new Text(Integer.toString(i) + "/" + column), tableName,
+              k, column.getBytes());
           }
         }
         log.rollWriter();
@@ -104,12 +102,11 @@
     // Write columns named 1, 2, 3, etc. and then values of single byte
     // 1, 2, 3...
     long timestamp = System.currentTimeMillis();
-    TreeMap cols = new TreeMap();
     for (int i = 0; i < COL_COUNT; i++) {
-      cols.put(new HStoreKey(row, new Text(Integer.toString(i)), timestamp),
-        new byte[] { (byte)(i + '0') });
+      HStoreKey k =
+        new HStoreKey(row, new Text(Integer.toString(i)), timestamp);
+      log.append(regionName, tableName, k, new byte[] { (byte)(i + '0') });
     }
-    log.append(regionName, tableName, cols);
     long logSeqId = log.startCacheFlush();
     log.completeCacheFlush(regionName, tableName, logSeqId);
     log.close();
@@ -121,7 +118,7 @@
     HLogEdit val = new HLogEdit();
     for (int i = 0; i < COL_COUNT; i++) {
       reader.next(key, val);
-      assertEquals(regionName, key.getRegionName());
+      assertEquals(regionName, key.getStoreName());
       assertEquals(tableName, key.getTablename());
       assertEquals(row, key.getRow());
       assertEquals((byte)(i + '0'), val.getVal()[0]);
@@ -129,7 +126,7 @@
     }
     while (reader.next(key, val)) {
       // Assert only one more row... the meta flushed row.
-      assertEquals(regionName, key.getRegionName());
+      assertEquals(regionName, key.getStoreName());
       assertEquals(tableName, key.getTablename());
       assertEquals(HLog.METAROW, key.getRow());
       assertEquals(HLog.METACOLUMN, val.getColumn());
Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 620703)
+++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy)
@@ -266,7 +266,9 @@
     for (LocalHBaseCluster.RegionServerThread t:
         this.hbaseCluster.getRegionServers()) {
       for(HRegion r: t.getRegionServer().onlineRegions.values() ) {
-        r.flushcache();
+        for (HStore s: r.getStores()) {
+          r.flushcache(s);
+        }
       }
     }
   }
Index: src/test/org/apache/hadoop/hbase/MultiRegionTable.java
===================================================================
--- src/test/org/apache/hadoop/hbase/MultiRegionTable.java (revision 620703)
+++ src/test/org/apache/hadoop/hbase/MultiRegionTable.java (working copy)
@@ -88,11 +88,17 @@
     // with EMPTY_START_ROW will be one of the unsplittable daughters.
     HRegionInfo hri = null;
     HRegion r = null;
+    HRegionServer server = cluster.getRegionThreads().get(0).getRegionServer();
     for (int i = 0; i < 30; i++) {
-      hri = t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo();
+      try {
+        hri = t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo();
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        e.printStackTrace();
+        continue;
+      }
       LOG.info("Region location: " + hri);
-      r = cluster.getRegionThreads().get(0).getRegionServer().
-        onlineRegions.get(hri.getRegionName());
+      r = server.onlineRegions.get(hri.getRegionName());
       if (r != null) {
         break;
       }
@@ -102,10 +108,12 @@
         LOG.warn("Waiting on region to come online", e);
       }
     }
+    assertNotNull(r);

     // Flush the cache
-    cluster.getRegionThreads().get(0).getRegionServer().getCacheFlushListener().
-      flushRequested(r);
+    for (HStore s: r.getStores()) {
+      server.flushRequested(s, r);
+    }

     // Now, wait until split makes it into the meta table.
     int oldCount = count;
@@ -158,7 +166,8 @@
     // still has references.
while (true) { data = getSplitParentInfo(meta, parent); - if (data == null || data.size() == 3) { + if (data != null && data.size() == 3) { + LOG.info("Waiting for splitA to release reference to parent"); try { Thread.sleep(waitTime); } catch (InterruptedException e) { @@ -168,7 +177,9 @@ } break; } - LOG.info("Parent split info returned " + data.keySet().toString()); + if (data != null) { + LOG.info("Parent split info returned " + data.keySet().toString()); + } } if (splitB == null) { @@ -199,8 +210,10 @@ for (int i = 0; i < retries; i++) { if (!fs.exists(parentDir)) { + LOG.info("Parent directory was deleted. tries=" + i); break; } + LOG.info("Waiting for parent directory to be deleted. tries=" + i); try { Thread.sleep(waitTime); } catch (InterruptedException e) { @@ -260,8 +273,7 @@ continue; } // Make sure I get the parent. - if (hri.getRegionName().toString(). - equals(parent.getRegionName().toString()) && + if (hri.getRegionName().equals(parent.getRegionName()) && hri.getRegionId() == parent.getRegionId()) { return curVals; } @@ -316,8 +328,7 @@ * @throws IOException */ protected static void compact(final MiniHBaseCluster cluster, - final HRegionInfo r) - throws IOException { + final HRegionInfo r) throws IOException { if (r == null) { LOG.debug("Passed region is null"); return; @@ -332,9 +343,10 @@ for (int i = 0; i < 10; i++) { try { for (HRegion online: regions.values()) { - if (online.getRegionName().toString(). - equals(r.getRegionName().toString())) { - online.compactStores(); + if (online.getRegionName().equals(r.getRegionName())) { + for (HStore s: online.getStores()) { + online.compactStore(s); + } } } break; Index: src/test/org/apache/hadoop/hbase/TestLogRolling.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestLogRolling.java (revision 620703) +++ src/test/org/apache/hadoop/hbase/TestLogRolling.java (working copy) @@ -27,6 +27,8 @@ import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.io.Text; +import org.apache.hadoop.hbase.io.BatchUpdate; + /** * Test log deletion as logs are rolled. 
*/ @@ -57,7 +59,7 @@ conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); // We roll the log after every 256 writes - conf.setInt("hbase.regionserver.maxlogentries", 256); + conf.setInt("hbase.regionserver.maxlogentries", 32); // For less frequently updated regions flush after every 2 flushes conf.setInt("hbase.hregion.memcache.optionalflushcount", 2); @@ -132,7 +134,7 @@ this.log = server.getLog(); // When the META table can be opened, the region servers are running - HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); + new HTable(conf, HConstants.META_TABLE_NAME); // Create the test table and open it HTableDescriptor desc = new HTableDescriptor(tableName); @@ -141,13 +143,13 @@ admin.createTable(desc); HTable table = new HTable(conf, new Text(tableName)); - for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls - long lockid = - table.startUpdate(new Text("row" + String.format("%1$04d", i))); - table.put(lockid, HConstants.COLUMN_FAMILY, value); - table.commit(lockid); + for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls + BatchUpdate b = + new BatchUpdate(new Text("row" + String.format("%1$04d", i))); + b.put(HConstants.COLUMN_FAMILY, value); + table.commit(b); - if (i % 256 == 0) { + if (i % 32 == 0) { // After every 256 writes sleep to let the log roller run try { @@ -175,7 +177,9 @@ List regions = new ArrayList(server.getOnlineRegions().values()); for (HRegion r: regions) { - r.flushcache(); + for (HStore s: r.getStores()) { + r.flushcache(s); + } } // Now roll the log Index: src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java =================================================================== --- src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (revision 620703) +++ src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (working copy) @@ -31,6 +31,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseAdmin; @@ -96,6 +97,9 @@ // below. After adding all data, the first region is 1.3M conf.setLong("hbase.hregion.max.filesize", 1024 * 1024); + // Always compact if there is more than one store file. + conf.setInt("hbase.hstore.compactionThreshold", 2); + desc = new HTableDescriptor(TABLE_NAME); desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); @@ -160,7 +164,6 @@ } scanTable(printResults); - @SuppressWarnings("deprecation") MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); // set configuration parameter for index build @@ -267,17 +270,17 @@ Integer.toString(new Random().nextInt())); this.fs.copyToLocalFile(new Path(INDEX_DIR), localDir); FileSystem localfs = FileSystem.getLocal(conf); - Path [] indexDirs = localfs.listPaths(new Path [] {localDir}); + FileStatus [] indexDirs = localfs.listStatus(localDir); Searcher searcher = null; HScannerInterface scanner = null; try { if (indexDirs.length == 1) { - searcher = new IndexSearcher((new File(indexDirs[0]. + searcher = new IndexSearcher((new File(indexDirs[0].getPath(). toUri())).getAbsolutePath()); } else if (indexDirs.length > 1) { Searchable[] searchers = new Searchable[indexDirs.length]; for (int i = 0; i < indexDirs.length; i++) { - searchers[i] = new IndexSearcher((new File(indexDirs[i]. 
+ searchers[i] = new IndexSearcher((new File(indexDirs[i].getPath(). toUri()).getAbsolutePath())); } searcher = new MultiSearcher(searchers); @@ -301,7 +304,6 @@ int count = 0; while (scanner.next(key, results)) { String value = key.getRow().toString(); - LOG.debug("Scanned over " + key.getRow()); Term term = new Term(rowkeyName, value); int hitCount = searcher.search(new TermQuery(term)).length(); assertEquals("check row " + value, 1, hitCount); Index: src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java =================================================================== --- src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (revision 620703) +++ src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MultiRegionTable; import org.apache.hadoop.hbase.StaticTestEnvironment; +import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -99,7 +100,7 @@ // This size should make it so we always split using the addContent // below. After adding all data, the first region is 1.3M - conf.setLong("hbase.hregion.max.filesize", 256 * 1024); + conf.setLong("hbase.hregion.max.filesize", 1024 * 1024); // Make lease timeout longer, lease checks less frequent conf.setInt("hbase.master.lease.period", 10 * 1000); @@ -156,7 +157,7 @@ /** * Pass the key, and reversed value to reduce * - * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) + * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter) */ @SuppressWarnings("unchecked") @Override @@ -222,17 +223,11 @@ HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME)); for(int i = 0; i < values.length; i++) { - long lockid = table.startUpdate(new Text("row_" - + String.format("%1$05d", i))); + BatchUpdate b = new BatchUpdate(new Text("row_" + + String.format("%1$05d", i))); - try { - table.put(lockid, TEXT_INPUT_COLUMN, values[i]); - table.commit(lockid, System.currentTimeMillis()); - lockid = -1; - } finally { - if (lockid != -1) - table.abort(lockid); - } + b.put(TEXT_INPUT_COLUMN, values[i]); + table.commit(b); } LOG.info("Print table contents before map/reduce for " + Index: src/test/org/apache/hadoop/hbase/TestHRegion.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestHRegion.java (revision 620703) +++ src/test/org/apache/hadoop/hbase/TestHRegion.java (working copy) @@ -554,15 +554,17 @@ } } long startCompact = System.currentTimeMillis(); - if(r.compactIfNeeded()) { - totalCompact = System.currentTimeMillis() - startCompact; - System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); - - } else { - System.out.println("No compaction required."); + try { + for (HStore s: r.getStores()) { + r.compactStore(s); + } + } catch (IOException e) { + e.printStackTrace(); + throw e; } + totalCompact = System.currentTimeMillis() - startCompact; + System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); long endTime = System.currentTimeMillis(); - long totalElapsed = (endTime - startTime); 
System.out.println(); System.out.println("Batch-write complete."); @@ -582,7 +584,17 @@ private void splitAndMerge() throws IOException { Path oldRegionPath = r.getRegionDir(); long startTime = System.currentTimeMillis(); - HRegion subregions[] = r.splitRegion(this); + Text midKey = null; + for (HStore s: r.getStores()) { + Text k = r.compactStore(s); + if (midKey == null && k != null) { + midKey = k; + } + } + HRegion subregions[] = null; + if (midKey != null) { + subregions = r.splitRegion(this, midKey); + } if (subregions != null) { System.out.println("Split region elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); Index: src/test/org/apache/hadoop/hbase/TestSplit.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestSplit.java (revision 620703) +++ src/test/org/apache/hadoop/hbase/TestSplit.java (working copy) @@ -88,10 +88,18 @@ private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); - region.flushcache(); - Text midkey = new Text(); - assertTrue(region.needsSplit(midkey)); - HRegion [] regions = split(region); + for (HStore s: region.getStores()) { + region.flushcache(s); + } + Text midkey = null; + for (HStore s: region.getStores()) { + Text k = region.compactStore(s); + if (midkey == null && k != null) { + midkey = k; + } + } + assertNotNull(midkey); + HRegion [] regions = split(region, midkey); try { // Need to open the regions. // TODO: Add an 'open' to HRegion... don't do open by constructing @@ -107,49 +115,42 @@ assertScan(regions[0], COLFAMILY_NAME3, new Text(START_KEY)); assertScan(regions[1], COLFAMILY_NAME3, midkey); // Now prove can't split regions that have references. - Text[] midkeys = new Text[regions.length]; for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); - // Even after above splits, still needs split but after splits its - // unsplitable because biggest store file is reference. References - // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); // Add so much data to this region, we create a store file that is > - // than - // one of our unsplitable references. - // it will. + // than one of our unsplitable references. it will. for (int j = 0; j < 2; j++) { addContent(regions[i], COLFAMILY_NAME3); } addContent(regions[i], COLFAMILY_NAME2); addContent(regions[i], COLFAMILY_NAME1); - regions[i].flushcache(); + for (HStore s: regions[i].getStores()) { + regions[i].flushcache(s); + } } - // Assert that even if one store file is larger than a reference, the - // region is still deemed unsplitable (Can't split region if references - // presen). - for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); - // Even after above splits, still needs split but after splits its - // unsplitable because biggest store file is reference. References - // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); - } - + Text[] midkeys = new Text[regions.length]; // To make regions splitable force compaction. for (int i = 0; i < regions.length; i++) { - regions[i].compactStores(); + midkeys[i] = null; + for (HStore s: regions[i].getStores()) { + Text k = regions[i].compactStore(s); + if (midkeys[i] == null && k != null) { + midkeys[i] = k; + } + } } TreeMap sortedMap = new TreeMap(); // Split these two daughter regions so then I'll have 4 regions. Will // split because added data above. 
for (int i = 0; i < regions.length; i++) { - HRegion[] rs = split(regions[i]); - for (int j = 0; j < rs.length; j++) { - sortedMap.put(rs[j].getRegionName().toString(), - openClosedRegion(rs[j])); + HRegion[] rs = null; + if (midkeys[i] != null) { + rs = split(regions[i], midkeys[i]); + for (int j = 0; j < rs.length; j++) { + sortedMap.put(rs[j].getRegionName().toString(), + openClosedRegion(rs[j])); + } } } LOG.info("Made 4 regions"); @@ -220,12 +221,11 @@ } } - private HRegion [] split(final HRegion r) throws IOException { - Text midKey = new Text(); - assertTrue(r.needsSplit(midKey)); + private HRegion [] split(final HRegion r, final Text midKey) + throws IOException { // Assert can get mid key from passed region. assertGet(r, COLFAMILY_NAME3, midKey); - HRegion [] regions = r.splitRegion(null); + HRegion [] regions = r.splitRegion(null, midKey); assertEquals(regions.length, 2); return regions; } Index: src/test/org/apache/hadoop/hbase/TestCompaction.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestCompaction.java (revision 620703) +++ src/test/org/apache/hadoop/hbase/TestCompaction.java (working copy) @@ -86,7 +86,6 @@ */ public void testCompaction() throws Exception { createStoreFile(r); - assertFalse(r.compactIfNeeded()); for (int i = 0; i < COMPACTION_THRESHOLD; i++) { createStoreFile(r); } @@ -98,35 +97,22 @@ byte [][] bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); // Assert that I can get > 5 versions (Should be at least 5 in there). assertTrue(bytes.length >= 5); - // Try to run compaction concurrent with a thread flush just to see that - // we can. - final HRegion region = this.r; - Thread t1 = new Thread() { - @Override - public void run() { - try { - region.flushcache(); - } catch (IOException e) { - e.printStackTrace(); - } + try { + for (HStore s: r.getStores()) { + r.flushcache(s); } - }; - Thread t2 = new Thread() { - @Override - public void run() { - try { - assertTrue(region.compactIfNeeded()); - } catch (IOException e) { - e.printStackTrace(); - } + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + try { + for (HStore s: r.getStores()) { + r.compactStore(s); } - }; - t1.setDaemon(true); - t1.start(); - t2.setDaemon(true); - t2.start(); - t1.join(); - t2.join(); + } catch (IOException e) { + e.printStackTrace(); + throw e; + } // Now assert that there are 4 versions of a record only: thats the // 3 versions that should be in the compacted store and then the one more // we added when we flushed. But could be 3 only if the flush happened @@ -154,7 +140,12 @@ // verify that it is removed as we compact. // Assert all delted. assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); - this.r.flushcache(); + for (HStore s: r.getStores()) { + this.r.flushcache(s); + } + for (HStore s: r.getStores()) { + this.r.compactStore(s); + } assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); // Add a bit of data and flush it so we for sure have the compaction limit // for store files. Usually by this time we will have but if compaction @@ -162,7 +153,22 @@ // compacted store and the flush above when we added deletes. Add more // content to be certain. 
createSmallerStoreFile(this.r); - assertTrue(this.r.compactIfNeeded()); + try { + for (HStore s: r.getStores()) { + r.flushcache(s); + } + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + try { + for (HStore s: r.getStores()) { + r.compactStore(s); + } + } catch (IOException e) { + e.printStackTrace(); + throw e; + } // Assert that the first row is still deleted. bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); assertNull(bytes); Index: src/test/org/apache/hadoop/hbase/util/TestMigrate.java =================================================================== --- src/test/org/apache/hadoop/hbase/util/TestMigrate.java (revision 620703) +++ src/test/org/apache/hadoop/hbase/util/TestMigrate.java (working copy) @@ -86,7 +86,7 @@ * First load files from an old style HBase file structure */ - // Current directory is .../workspace/project/build/contrib/hbase/test/data + // Current directory is .../project/build/test/data FileSystem localfs = FileSystem.getLocal(conf); @@ -96,11 +96,11 @@ // this path is for running test with ant - "../../../../../src/contrib/hbase/src/testdata/HADOOP-2478-testdata.zip") + "../../../src/testdata/HADOOP-2478-testdata.zip") // and this path is for when you want to run inside eclipse - /*"src/contrib/hbase/src/testdata/HADOOP-2478-testdata.zip")*/ + /*"src/testdata/HADOOP-2478-testdata.zip")*/ ); ZipInputStream zip = new ZipInputStream(hs); Index: src/java/org/apache/hadoop/hbase/HLogKey.java =================================================================== --- src/java/org/apache/hadoop/hbase/HLogKey.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HLogKey.java (working copy) @@ -31,7 +31,7 @@ * also sorted. */ public class HLogKey implements WritableComparable { - Text regionName = new Text(); + Text storeName = new Text(); Text tablename = new Text(); Text row = new Text(); long logSeqNum = 0L; @@ -46,14 +46,14 @@ * We maintain the tablename mainly for debugging purposes. * A regionName is always a sub-table object. * - * @param regionName - name of region + * @param storeName - name of HStore * @param tablename - name of table * @param row - row key * @param logSeqNum - log sequence number */ - public HLogKey(Text regionName, Text tablename, Text row, long logSeqNum) { + public HLogKey(Text storeName, Text tablename, Text row, long logSeqNum) { // TODO: Is this copy of the instances necessary? They are expensive. 
- this.regionName.set(regionName); + this.storeName.set(storeName); this.tablename.set(tablename); this.row.set(row); this.logSeqNum = logSeqNum; @@ -63,8 +63,8 @@ // A bunch of accessors ////////////////////////////////////////////////////////////////////////////// - Text getRegionName() { - return regionName; + Text getStoreName() { + return storeName; } Text getTablename() { @@ -84,7 +84,7 @@ */ @Override public String toString() { - return tablename + "/" + regionName + "/" + row + "/" + logSeqNum; + return tablename + "/" + storeName + "/" + row + "/" + logSeqNum; } /** @@ -100,7 +100,7 @@ */ @Override public int hashCode() { - int result = this.regionName.hashCode(); + int result = this.storeName.hashCode(); result ^= this.row.hashCode(); result ^= this.logSeqNum; return result; @@ -115,7 +115,7 @@ */ public int compareTo(Object o) { HLogKey other = (HLogKey) o; - int result = this.regionName.compareTo(other.regionName); + int result = this.storeName.compareTo(other.getStoreName()); if(result == 0) { result = this.row.compareTo(other.row); @@ -141,7 +141,7 @@ * {@inheritDoc} */ public void write(DataOutput out) throws IOException { - this.regionName.write(out); + this.storeName.write(out); this.tablename.write(out); this.row.write(out); out.writeLong(logSeqNum); @@ -151,7 +151,7 @@ * {@inheritDoc} */ public void readFields(DataInput in) throws IOException { - this.regionName.readFields(in); + this.storeName.readFields(in); this.tablename.readFields(in); this.row.readFields(in); this.logSeqNum = in.readLong(); Index: src/java/org/apache/hadoop/hbase/HStore.java =================================================================== --- src/java/org/apache/hadoop/hbase/HStore.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -39,11 +39,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.TextSequence; import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -78,9 +78,9 @@ @SuppressWarnings("hiding") private final SortedMap memcache = Collections.synchronizedSortedMap(new TreeMap()); - + volatile SortedMap snapshot; - + @SuppressWarnings("hiding") private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -108,7 +108,7 @@ this.lock.writeLock().unlock(); } } - + /** * @return memcache snapshot */ @@ -118,14 +118,14 @@ SortedMap currentSnapshot = snapshot; snapshot = Collections.synchronizedSortedMap(new TreeMap()); - + return currentSnapshot; } finally { this.lock.writeLock().unlock(); } } - + /** * Store a value. * @param key @@ -135,7 +135,7 @@ this.lock.readLock().lock(); try { memcache.put(key, value); - + } finally { this.lock.readLock().unlock(); } @@ -159,7 +159,7 @@ internalGet(snapshot, key, numVersions - results.size())); } return results; - + } finally { this.lock.readLock().unlock(); } @@ -215,14 +215,17 @@ /** * Find the key that matches row exactly, or the one that immediately * preceeds it. 
+ * + * @param row + * @param timestamp + * @return row key */ - public Text getRowKeyAtOrBefore(final Text row, long timestamp) - throws IOException{ + public Text getRowKeyAtOrBefore(final Text row, long timestamp) { this.lock.readLock().lock(); - + Text key_memcache = null; Text key_snapshot = null; - + try { synchronized (memcache) { key_memcache = internalGetRowKeyAtOrBefore(memcache, row, timestamp); @@ -238,25 +241,24 @@ return key_snapshot; } else if (key_memcache != null && key_snapshot == null) { return key_memcache; - } else { + } else if ( (key_memcache != null && key_memcache.equals(row)) + || (key_snapshot != null && key_snapshot.equals(row)) ) { // if either is a precise match, return the original row. - if ( (key_memcache != null && key_memcache.equals(row)) - || (key_snapshot != null && key_snapshot.equals(row)) ) { - return row; - } else { - // no precise matches, so return the one that is closer to the search - // key (greatest) - return key_memcache.compareTo(key_snapshot) > 0 ? + return row; + } else if (key_memcache != null) { + // no precise matches, so return the one that is closer to the search + // key (greatest) + return key_memcache.compareTo(key_snapshot) > 0 ? key_memcache : key_snapshot; - } } + return null; } finally { this.lock.readLock().unlock(); } } private Text internalGetRowKeyAtOrBefore(SortedMap map, - Text key, long timestamp) { + Text key, long timestamp) { // TODO: account for deleted cells HStoreKey search_key = new HStoreKey(key, timestamp); @@ -271,37 +273,37 @@ // argument Iterator key_iterator = tailMap.keySet().iterator(); HStoreKey found_key = key_iterator.next(); - + // keep seeking so long as we're in the same row, and the timstamp // isn't as small as we'd like, and there are more cells to check while (found_key.getRow().equals(key) - && found_key.getTimestamp() > timestamp && key_iterator.hasNext()) { + && found_key.getTimestamp() > timestamp && key_iterator.hasNext()) { found_key = key_iterator.next(); } - + // if this check fails, then we've iterated through all the keys that // match by row, but none match by timestamp, so we fall through to // the headMap case. if (found_key.getTimestamp() <= timestamp) { // we didn't find a key that matched by timestamp, so we have to // return null; -/* LOG.debug("Went searching for " + key + ", found " + found_key.getRow());*/ + /* LOG.debug("Went searching for " + key + ", found " + found_key.getRow());*/ return found_key.getRow(); } } - + // the tail didn't contain the key we're searching for, so we should // use the last key in the headmap as the closest before SortedMap headMap = map.headMap(search_key); if (headMap.isEmpty()) { -/* LOG.debug("Went searching for " + key + ", found nothing!");*/ + /* LOG.debug("Went searching for " + key + ", found nothing!");*/ return null; - } else { -/* LOG.debug("Went searching for " + key + ", found " + headMap.lastKey().getRow());*/ - return headMap.lastKey().getRow(); } + /* LOG.debug("Went searching for " + key + ", found " + headMap.lastKey().getRow());*/ + + return headMap.lastKey().getRow(); } - + /** * Examine a single map for the desired key. 
* @@ -360,7 +362,7 @@ (versions - results.size()))); } return results; - + } finally { this.lock.readLock().unlock(); } @@ -381,7 +383,7 @@ SortedMap tailMap = map.tailMap(origin); for (Map.Entry es: tailMap.entrySet()) { HStoreKey key = es.getKey(); - + // if there's no column name, then compare rows and timestamps if (origin.getColumn().toString().equals("")) { // if the current and origin row don't match, then we can jump @@ -435,7 +437,7 @@ // locks by the same thread and to be able to downgrade a write lock to // a read lock. We need to hold a lock throughout this method, but only // need the write lock while creating the memcache snapshot - + this.lock.writeLock().lock(); // hold write lock during memcache snapshot snapshot(); // snapshot memcache this.lock.readLock().lock(); // acquire read lock @@ -444,7 +446,7 @@ // Prevent a cache flush while we are constructing the scanner return new MemcacheScanner(timestamp, targetCols, firstRow); - + } finally { this.lock.readLock().unlock(); } @@ -473,22 +475,22 @@ // Generate list of iterators HStoreKey firstKey = new HStoreKey(firstRow); - if (firstRow != null && firstRow.getLength() != 0) { - keyIterator = - backingMap.tailMap(firstKey).keySet().iterator(); + if (firstRow != null && firstRow.getLength() != 0) { + keyIterator = + backingMap.tailMap(firstKey).keySet().iterator(); - } else { - keyIterator = backingMap.keySet().iterator(); - } + } else { + keyIterator = backingMap.keySet().iterator(); + } - while (getNext(0)) { - if (!findFirstRow(0, firstRow)) { - continue; - } - if (columnMatch(0)) { - break; - } + while (getNext(0)) { + if (!findFirstRow(0, firstRow)) { + continue; } + if (columnMatch(0)) { + break; + } + } } catch (RuntimeException ex) { LOG.error("error initializing Memcache scanner: ", ex); close(); @@ -578,7 +580,13 @@ private static final String BLOOMFILTER_FILE_NAME = "filter"; final Memcache memcache = new Memcache(); + private volatile long memcacheSize = 0; + private volatile long lastFlushTime; + final long memcacheFlushSize; + final long desiredMaxFileSize; + volatile long storeSize = 0; private final Path basedir; + private final HRegion region; private final HRegionInfo info; private final HColumnDescriptor family; private final SequenceFile.CompressionType compression; @@ -594,7 +602,7 @@ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final AtomicInteger activeScanners = new AtomicInteger(0); - final String storeName; + final Text storeName; /* * Sorted Map of readers keyed by sequence id (Most recent should be last in @@ -610,11 +618,18 @@ private final SortedMap readers = new TreeMap(); + // The most-recent log-seq-ID that's present. The most-recent such ID means + // we can ignore all log messages up to and including that ID (because they're + // already reflected in the TreeMaps). private volatile long maxSeqId; + private final int compactionThreshold; private final ReentrantReadWriteLock newScannerLock = new ReentrantReadWriteLock(); + private volatile boolean compactionRequested = false; + private volatile boolean flushRequested = false; + /** * An HStore is a set of zero or more MapFiles, which stretch backwards over * time. A given HStore is responsible for a certain set of columns for a @@ -640,27 +655,42 @@ * file will be deleted (by whoever has instantiated the HStore). 
* * @param basedir qualified path under which the region directory lives - * @param info HRegionInfo for this region + * @param region region that this store belongs to * @param family HColumnDescriptor for this column * @param fs file system object * @param reconstructionLog existing log file to apply if any * @param conf configuration object * @throws IOException */ - HStore(Path basedir, HRegionInfo info, HColumnDescriptor family, + HStore(Path basedir, HRegion region, HColumnDescriptor family, FileSystem fs, Path reconstructionLog, HBaseConfiguration conf) throws IOException { - + this.basedir = basedir; - this.info = info; + this.region = region; + this.info = this.region.getRegionInfo(); this.family = family; this.fs = fs; this.conf = conf; - + this.lastFlushTime = System.currentTimeMillis(); + + // By default, we compact if an HStore has more than + // MIN_COMMITS_FOR_COMPACTION map files + this.compactionThreshold = + conf.getInt("hbase.hstore.compactionThreshold", 3); + + // By default, we flush the cache when 64M. + this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", + 1024*1024*64); + + // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. + this.desiredMaxFileSize = + conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); + this.compactionDir = HRegion.getCompactionDir(basedir); this.storeName = - this.info.getEncodedName() + "/" + this.family.getFamilyName(); - + new Text(this.info.getEncodedName() + "/" + this.family.getFamilyName()); + if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) { this.compression = SequenceFile.CompressionType.BLOCK; } else if (family.getCompression() == @@ -669,7 +699,7 @@ } else { this.compression = SequenceFile.CompressionType.NONE; } - + Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(), family.getFamilyName()); if (!fs.exists(mapdir)) { @@ -680,7 +710,7 @@ if (!fs.exists(infodir)) { fs.mkdirs(infodir); } - + if(family.getBloomFilter() == null) { this.filterDir = null; this.bloomFilter = null; @@ -696,49 +726,28 @@ if(LOG.isDebugEnabled()) { LOG.debug("starting " + storeName + ((reconstructionLog == null || !fs.exists(reconstructionLog)) ? - " (no reconstruction log)" : - " with reconstruction log: " + reconstructionLog.toString())); + " (no reconstruction log)" : + " with reconstruction log: " + reconstructionLog.toString())); } // Go through the 'mapdir' and 'infodir' together, make sure that all // MapFiles are in a reliable state. Every entry in 'mapdir' must have a // corresponding one in 'loginfodir'. Without a corresponding log info // file, the entry in 'mapdir' must be deleted. - List hstoreFiles = loadHStoreFiles(infodir, mapdir); - for(HStoreFile hsf: hstoreFiles) { - this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf); - } + // loadHStoreFiles also computes the max sequence id + this.maxSeqId = -1; + this.storefiles.putAll(loadHStoreFiles(infodir, mapdir)); - // Now go through all the HSTORE_LOGINFOFILEs and figure out the - // most-recent log-seq-ID that's present. The most-recent such ID means we - // can ignore all log messages up to and including that ID (because they're - // already reflected in the TreeMaps). - // - // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That - // means it was built prior to the previous run of HStore, and so it cannot - // contain any updates also contained in the log. 
- - this.maxSeqId = getMaxSequenceId(hstoreFiles); if (LOG.isDebugEnabled()) { LOG.debug("maximum sequence id for hstore " + storeName + " is " + this.maxSeqId); } - + doReconstructionLog(reconstructionLog, maxSeqId); - // By default, we compact if an HStore has more than - // MIN_COMMITS_FOR_COMPACTION map files - this.compactionThreshold = - conf.getInt("hbase.hstore.compactionThreshold", 3); - - // We used to compact in here before bringing the store online. Instead - // get it online quick even if it needs compactions so we can start - // taking updates as soon as possible (Once online, can take updates even - // during a compaction). - // Move maxSeqId on by one. Why here? And not in HRegion? this.maxSeqId += 1; - + // Finally, start up all the map readers! (There could be more than one // since we haven't compacted yet.) boolean first = true; @@ -752,30 +761,11 @@ first = false; } else { this.readers.put(e.getKey(), - e.getValue().getReader(this.fs, this.bloomFilter)); + e.getValue().getReader(this.fs, this.bloomFilter)); } } } - /* - * @param hstoreFiles - * @return Maximum sequence number found or -1. - * @throws IOException - */ - private long getMaxSequenceId(final List hstoreFiles) - throws IOException { - long maxSeqID = -1; - for (HStoreFile hsf : hstoreFiles) { - long seqid = hsf.loadInfo(fs); - if (seqid > 0) { - if (seqid > maxSeqID) { - maxSeqID = seqid; - } - } - } - return maxSeqID; - } - long getMaxSequenceId() { return this.maxSeqId; } @@ -790,7 +780,7 @@ */ private void doReconstructionLog(final Path reconstructionLog, final long maxSeqID) throws UnsupportedEncodingException, IOException { - + if (reconstructionLog == null || !fs.exists(reconstructionLog)) { // Nothing to do. return; @@ -798,10 +788,10 @@ long maxSeqIdInLog = -1; TreeMap reconstructedCache = new TreeMap(); - + SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs, reconstructionLog, this.conf); - + try { HLogKey key = new HLogKey(); HLogEdit val = new HLogEdit(); @@ -814,34 +804,32 @@ } if (skippedEdits > 0 && LOG.isDebugEnabled()) { LOG.debug("Skipped " + skippedEdits + - " edits because sequence id <= " + maxSeqID); + " edits because sequence id <= " + maxSeqID); } // Check this edit is for me. Also, guard against writing // METACOLUMN info such as HBASE::CACHEFLUSH entries Text column = val.getColumn(); if (column.equals(HLog.METACOLUMN) - || !key.getRegionName().equals(info.getRegionName()) + || (key.getStoreName().compareTo(storeName) != 0) || !HStoreKey.extractFamily(column).equals(family.getFamilyName())) { if (LOG.isDebugEnabled()) { - LOG.debug("Passing on edit " + key.getRegionName() + ", " + - column.toString() + ": " + - new String(val.getVal(), UTF8_ENCODING) + - ", my region: " + info.getRegionName() + ", my column: " + - family.getFamilyName()); + LOG.debug("Passing on edit " + key.getStoreName() + ": " + + new String(val.getVal(), UTF8_ENCODING) + ", my name: " + + storeName); } continue; } HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp()); if (LOG.isDebugEnabled()) { LOG.debug("Applying edit <" + k.toString() + "=" + val.toString() + - ">"); + ">"); } reconstructedCache.put(k, val.getVal()); } } finally { logReader.close(); } - + if (reconstructedCache.size() > 0) { // We create a "virtual flush" at maxSeqIdInLog+1. 
if (LOG.isDebugEnabled()) { @@ -860,7 +848,7 @@ * @param mapdir qualified path for map file directory * @throws IOException */ - private List loadHStoreFiles(Path infodir, Path mapdir) + private SortedMap loadHStoreFiles(Path infodir, Path mapdir) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("infodir: " + infodir.toString() + " mapdir: " + @@ -868,10 +856,11 @@ } // Look first at info files. If a reference, these contain info we need // to create the HStoreFile. - Path infofiles[] = fs.listPaths(new Path[] {infodir}); - ArrayList results = new ArrayList(infofiles.length); + FileStatus infofiles[] = fs.listStatus(infodir); + SortedMap results = new TreeMap(); ArrayList mapfiles = new ArrayList(infofiles.length); - for (Path p: infofiles) { + for (int i = 0; i < infofiles.length; i++) { + Path p = infofiles[i].getPath(); Matcher m = REF_NAME_PARSER.matcher(p.getName()); /* * * * * * N O T E * * * * * @@ -892,6 +881,19 @@ } curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), family.getFamilyName(), fid, reference); + storeSize += curfile.length(); + long storeSeqId = -1; + try { + storeSeqId = curfile.loadInfo(fs); + if (storeSeqId > this.maxSeqId) { + this.maxSeqId = storeSeqId; + } + } catch (IOException e) { + // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. + // That means it was built prior to the previous run of HStore, and so + // it cannot contain any updates also contained in the log. + } + Path mapfile = curfile.getMapFilePath(); if (!fs.exists(mapfile)) { fs.delete(curfile.getInfoFilePath()); @@ -899,23 +901,23 @@ "Cleaned up info file. Continuing..."); continue; } - + // TODO: Confirm referent exists. - + // Found map and sympathetic info file. Add this hstorefile to result. - results.add(curfile); + results.put(storeSeqId, curfile); // Keep list of sympathetic data mapfiles for cleaning info dir in next // section. Make sure path is fully qualified for compare. mapfiles.add(mapfile); } - + // List paths by experience returns fully qualified names -- at least when // running on a mini hdfs cluster. - Path datfiles[] = fs.listPaths(new Path[] {mapdir}); + FileStatus datfiles[] = fs.listStatus(mapdir); for (int i = 0; i < datfiles.length; i++) { // If does not have sympathetic info file, delete. 
- if (!mapfiles.contains(fs.makeQualified(datfiles[i]))) { - fs.delete(datfiles[i]); + if (!mapfiles.contains(fs.makeQualified(datfiles[i].getPath()))) { + fs.delete(datfiles[i].getPath()); } } return results; @@ -937,24 +939,24 @@ if (LOG.isDebugEnabled()) { LOG.debug("loading bloom filter for " + this.storeName); } - + BloomFilterDescriptor.BloomFilterType type = family.getBloomFilter().filterType; switch(type) { - + case BLOOMFILTER: bloomFilter = new BloomFilter(); break; - + case COUNTING_BLOOMFILTER: bloomFilter = new CountingBloomFilter(); break; - + case RETOUCHED_BLOOMFILTER: bloomFilter = new RetouchedBloomFilter(); break; - + default: throw new IllegalArgumentException("unknown bloom filter type: " + type); @@ -974,22 +976,21 @@ family.getBloomFilter().filterType; switch(type) { - + case BLOOMFILTER: bloomFilter = new BloomFilter(family.getBloomFilter().vectorSize, family.getBloomFilter().nbHash); break; - + case COUNTING_BLOOMFILTER: bloomFilter = new CountingBloomFilter(family.getBloomFilter().vectorSize, - family.getBloomFilter().nbHash); + family.getBloomFilter().nbHash); break; - + case RETOUCHED_BLOOMFILTER: - bloomFilter = - new RetouchedBloomFilter(family.getBloomFilter().vectorSize, - family.getBloomFilter().nbHash); + bloomFilter = new RetouchedBloomFilter( + family.getBloomFilter().vectorSize, family.getBloomFilter().nbHash); } } return bloomFilter; @@ -1025,15 +1026,26 @@ * * @param key * @param value + * @return size of update */ - void add(HStoreKey key, byte[] value) { + long add(HStoreKey key, byte[] value) { + long updateSize = 0; lock.readLock().lock(); try { this.memcache.add(key, value); - + updateSize = key.getSize() + (value == null ? 0 : value.length); + memcacheSize += updateSize; + } finally { lock.readLock().unlock(); } + if (!flushRequested && memcacheSize > memcacheFlushSize) { + flushRequested = region.flushRequested(this, null); + if (flushRequested) { + memcacheSize = 0; + } + } + return updateSize; } /** @@ -1085,76 +1097,109 @@ * Return the entire list of HStoreFiles currently used by the HStore. * * @param logCacheFlushId flush sequence number + * @return number of bytes flushed * @throws IOException */ - void flushCache(final long logCacheFlushId) throws IOException { - internalFlushCache(memcache.getSnapshot(), logCacheFlushId); + long flushCache(final long logCacheFlushId) throws IOException { + return internalFlushCache(memcache.getSnapshot(), logCacheFlushId); } - private void internalFlushCache(SortedMap cache, + private long internalFlushCache(SortedMap cache, long logCacheFlushId) throws IOException { - - synchronized(flushLock) { - // A. Write the Maps out to the disk - HStoreFile flushedFile = new HStoreFile(conf, fs, basedir, - info.getEncodedName(), family.getFamilyName(), -1L, null); - String name = flushedFile.toString(); - MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, - this.bloomFilter); - - // Here we tried picking up an existing HStoreFile from disk and - // interlacing the memcache flush compacting as we go. The notion was - // that interlacing would take as long as a pure flush with the added - // benefit of having one less file in the store. Experiments showed that - // it takes two to three times the amount of time flushing -- more column - // families makes it so the two timings come closer together -- but it - // also complicates the flush. The code was removed. Needed work picking - // which file to interlace (favor references first, etc.) 
- // - // Related, looks like 'merging compactions' in BigTable paper interlaces - // a memcache flush. We don't. - int entries = 0; - try { - for (Map.Entry es: cache.entrySet()) { - HStoreKey curkey = es.getKey(); - TextSequence f = HStoreKey.extractFamily(curkey.getColumn()); - if (f.equals(this.family.getFamilyName())) { - entries++; - out.append(curkey, new ImmutableBytesWritable(es.getValue())); + + long bytesFlushed = 0; + try { + if (cache.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.info("Not flushing cache for " + storeName + + " because it has 0 entries"); + } + return bytesFlushed; + } + + synchronized(flushLock) { + // A. Write the Maps out to the disk + HStoreFile flushedFile = new HStoreFile(conf, fs, basedir, + info.getEncodedName(), family.getFamilyName(), -1L, null); + String name = flushedFile.toString(); + MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, + this.bloomFilter); + + // Here we tried picking up an existing HStoreFile from disk and + // interlacing the memcache flush compacting as we go. The notion was + // that interlacing would take as long as a pure flush with the added + // benefit of having one less file in the store. Experiments showed that + // it takes two to three times the amount of time flushing -- more column + // families makes it so the two timings come closer together -- but it + // also complicates the flush. The code was removed. Needed work picking + // which file to interlace (favor references first, etc.) + // + // Related, looks like 'merging compactions' in BigTable paper interlaces + // a memcache flush. We don't. + int entries = 0; + long cacheSize = 0; + try { + for (Map.Entry es: cache.entrySet()) { + HStoreKey curkey = es.getKey(); + byte[] bytes = es.getValue(); + if (curkey.getFamily().equals(this.family.getFamilyName())) { + entries++; + out.append(curkey, new ImmutableBytesWritable(bytes)); + cacheSize += curkey.getSize() + (bytes != null ? bytes.length : 0); + } } + } finally { + out.close(); } - } finally { - out.close(); + long newStoreSize = flushedFile.length(); + storeSize += newStoreSize; + + // B. Write out the log sequence number that corresponds to this output + // MapFile. The MapFile is current up to and including the log seq num. + flushedFile.writeInfo(fs, logCacheFlushId); + + // C. Flush the bloom filter if any + if (bloomFilter != null) { + flushBloomFilter(); + } + + // D. Finally, make the new MapFile available. + this.lock.writeLock().lock(); + try { + Long flushid = Long.valueOf(logCacheFlushId); + // Open the map file reader. + this.readers.put(flushid, + flushedFile.getReader(this.fs, this.bloomFilter)); + this.storefiles.put(flushid, flushedFile); + if(LOG.isDebugEnabled()) { + LOG.debug("Added " + name + " with " + entries + + " entries, sequence id " + logCacheFlushId + ", data size " + + StringUtils.humanReadableInt(cacheSize) + ", file size " + + newStoreSize + " for " + this.storeName); + } + bytesFlushed = cacheSize; + } finally { + this.lock.writeLock().unlock(); + } + lastFlushTime = System.currentTimeMillis(); } + } finally { + flushRequested = false; + } + if (!compactionRequested && + (storefiles.size() > compactionThreshold || + storeSize > this.desiredMaxFileSize)) { - // B. Write out the log sequence number that corresponds to this output - // MapFile. The MapFile is current up to and including the log seq num. - flushedFile.writeInfo(fs, logCacheFlushId); - - // C. 
Flush the bloom filter if any - if (bloomFilter != null) { - flushBloomFilter(); + if (LOG.isDebugEnabled()) { + LOG.debug("Requesting compaction for store " + storeName); } + compactionRequested = region.compactionRequested(this, null); - // D. Finally, make the new MapFile available. - this.lock.writeLock().lock(); - try { - Long flushid = Long.valueOf(logCacheFlushId); - // Open the map file reader. - this.readers.put(flushid, - flushedFile.getReader(this.fs, this.bloomFilter)); - this.storefiles.put(flushid, flushedFile); - if(LOG.isDebugEnabled()) { - LOG.debug("Added " + name + " with " + entries + - " entries, sequence id " + logCacheFlushId + ", and size " + - StringUtils.humanReadableInt(flushedFile.length()) + " for " + - this.storeName); - } - } finally { - this.lock.writeLock().unlock(); + if (LOG.isDebugEnabled() && !compactionRequested) { + LOG.debug("Compaction request for store " + storeName + " was rejected"); } - return; } + return bytesFlushed; } ////////////////////////////////////////////////////////////////////////////// @@ -1162,28 +1207,6 @@ ////////////////////////////////////////////////////////////////////////////// /** - * @return True if this store needs compaction. - */ - boolean needsCompaction() { - return this.storefiles != null && - (this.storefiles.size() >= this.compactionThreshold || hasReferences()); - } - - /* - * @return True if this store has references. - */ - private boolean hasReferences() { - if (this.storefiles != null) { - for (HStoreFile hsf: this.storefiles.values()) { - if (hsf.isReference()) { - return true; - } - } - } - return false; - } - - /** * Compact the back-HStores. This method may take some time, so the calling * thread must be able to block for long periods. * @@ -1200,55 +1223,71 @@ * can be lengthy and we want to allow cache-flushes during this period. * @throws IOException * - * @return true if compaction completed successfully + * @return mid key if a split is needed null otherwise */ - boolean compact() throws IOException { - synchronized (compactLock) { - if (LOG.isDebugEnabled()) { - LOG.debug("started compaction of " + storefiles.size() + - " files using " + compactionDir.toString() + " for " + - this.storeName); - } + Text compact() throws IOException { + try { + synchronized (compactLock) { + List filesToCompact = null; + long maxId = -1; + synchronized (storefiles) { + if (storefiles.size() < 1 || + (storefiles.size() == 1 && + !storefiles.get(storefiles.firstKey()).isReference())) { + if (LOG.isDebugEnabled()) { + LOG.debug("nothing to compact for " + this.storeName + " because" + + (storefiles.size() < 1 ? " no store files to compact" : + " only one store file and it is not a refeerence")); + } + return null; + } + if (LOG.isDebugEnabled()) { + LOG.debug("started compaction of " + storefiles.size() + + " files using " + compactionDir.toString() + " for " + + this.storeName); + } - // Storefiles are keyed by sequence id. The oldest file comes first. - // We need to return out of here a List that has the newest file first. - List filesToCompact = - new ArrayList(this.storefiles.values()); - Collections.reverse(filesToCompact); - if (filesToCompact.size() < 1 || - (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) { - if (LOG.isDebugEnabled()) { - LOG.debug("nothing to compact for " + this.storeName); + // Storefiles are keyed by sequence id. The oldest file comes first. + // We need to return out of here a List that has the newest file first. 
+ filesToCompact = new ArrayList(this.storefiles.values()); + Collections.reverse(filesToCompact); + + // The max-sequenceID in any of the to-be-compacted TreeMaps is the + // last key of storefiles. + + maxId = this.storefiles.lastKey(); } - return false; - } - if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { - LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); - return false; - } + if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { + LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); + return null; + } - // Step through them, writing to the brand-new MapFile - HStoreFile compactedOutputFile = new HStoreFile(conf, fs, - this.compactionDir, info.getEncodedName(), family.getFamilyName(), - -1L, null); - MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, - this.compression, this.bloomFilter); - try { - compactHStoreFiles(compactedOut, filesToCompact); - } finally { - compactedOut.close(); - } + // Step through them, writing to the brand-new MapFile + HStoreFile compactedOutputFile = new HStoreFile(conf, fs, + this.compactionDir, info.getEncodedName(), family.getFamilyName(), + -1L, null); + MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, + this.compression, this.bloomFilter); + try { + compactHStoreFiles(compactedOut, filesToCompact); + } finally { + compactedOut.close(); + } - // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. - // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps. - long maxId = getMaxSequenceId(filesToCompact); - compactedOutputFile.writeInfo(fs, maxId); + // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. + compactedOutputFile.writeInfo(fs, maxId); - // Move the compaction into place. - completeCompaction(filesToCompact, compactedOutputFile); - return true; + // Move the compaction into place. + completeCompaction(filesToCompact, compactedOutputFile); + } + } finally { + compactionRequested = false; + if (storeSize > this.desiredMaxFileSize) { + return checkSplit(); + } } + return null; } /* @@ -1265,7 +1304,7 @@ */ private void compactHStoreFiles(final MapFile.Writer compactedOut, final List toCompactFiles) throws IOException { - + int size = toCompactFiles.size(); CompactionReader[] rdrs = new CompactionReader[size]; int index = 0; @@ -1278,8 +1317,8 @@ // exception message so output a message here where we know the // culprit. LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() + - (hsf.isReference() ? " " + hsf.getReference().toString() : "") + - " for " + this.storeName); + (hsf.isReference() ? " " + hsf.getReference().toString() : "") + + " for " + this.storeName); closeCompactionReaders(rdrs); throw e; } @@ -1342,7 +1381,7 @@ } byte [] value = (vals[smallestKey] == null)? - null: vals[smallestKey].get(); + null: vals[smallestKey].get(); if (!isDeleted(sk, value, false, deletes) && timesSeen <= family.getMaxVersions()) { // Keep old versions until we have maxVersions worth. 
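Taken together with the HRegionServer changes later in this patch, the per-store flush/compact/split flow introduced here can be sketched roughly as follows. This is illustrative only and not part of the patch: the helper name flushCompactAndMaybeSplit is made up, locking and the HStoreListener wiring are elided, and only method names that appear in the patched classes are used.

    // Hypothetical helper, for illustration only.
    void flushCompactAndMaybeSplit(final HRegion region, final HStore store)
        throws IOException {
      region.flushcache(store);                  // per-store flush
      Text midKey = region.compactStore(store);  // non-null means the store wants a split
      if (midKey != null) {
        // Compact the remaining stores first so the split can proceed,
        // mirroring CompactSplitThread.run() further down in this patch.
        for (HStore s : region.getStores()) {
          if (!s.storeName.equals(store.storeName)) {
            region.compactStore(s);
          }
        }
        // region.splitRegion(listener, midKey) would follow here.
      }
    }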
@@ -1387,13 +1426,13 @@ /** Interface for generic reader for compactions */ interface CompactionReader { - + /** * Closes the reader * @throws IOException */ public void close() throws IOException; - + /** * Get the next key/value pair * @@ -1404,7 +1443,7 @@ */ public boolean next(WritableComparable key, Writable val) throws IOException; - + /** * Resets the reader * @throws IOException @@ -1415,11 +1454,11 @@ /** A compaction reader for MapFile */ static class MapFileCompactionReader implements CompactionReader { final MapFile.Reader reader; - + MapFileCompactionReader(final MapFile.Reader r) { this.reader = r; } - + /** {@inheritDoc} */ public void close() throws IOException { this.reader.close(); @@ -1496,11 +1535,11 @@ *
    * 1) Wait for active scanners to exit
    * 2) Acquiring the write-lock
-   * 3) Figuring out what MapFiles are going to be replaced
-   * 4) Moving the new compacted MapFile into place
-   * 5) Unloading all the replaced MapFiles.
-   * 6) Deleting all the old MapFile files.
-   * 7) Loading the new TreeMap.
+   * 3) Moving the new MapFile into place.
+   * 4) Unloading and closing all the replaced MapFiles.
+   * 5) Deleting the replaced MapFiles.
+   * 6) Loading the new TreeMap.
+   * 7) Computing the new store size.
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    * 
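The numbered steps above compress the method body that follows; the locking order they imply looks roughly like this (a simplified skeleton using the patched HStore's fields, not a verbatim excerpt from the patch):

    newScannerLock.writeLock().lock();          // prevent new scanners (undone in step 9)
    try {
      // 1. wait for active scanners to exit
      synchronized (activeScanners) {
        while (activeScanners.get() > 0) {
          try {
            activeScanners.wait();
          } catch (InterruptedException e) {
            // ignore and re-check the count
          }
        }
      }
      // 2. acquire the write-lock
      this.lock.writeLock().lock();
      try {
        // 3-7. move the compacted MapFile into place, close and delete the
        // replaced files, load the new reader, recompute storeSize
      } finally {
        // 8. release the write-lock
        this.lock.writeLock().unlock();
      }
    } finally {
      // 9. allow new scanners to proceed
      newScannerLock.writeLock().unlock();
    }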
@@ -1511,9 +1550,9 @@ */ private void completeCompaction(List compactedFiles, HStoreFile compactedFile) throws IOException { - + // 1. Wait for active scanners to exit - + newScannerLock.writeLock().lock(); // prevent new scanners try { synchronized (activeScanners) { @@ -1531,7 +1570,7 @@ try { // 3. Moving the new MapFile into place. - + HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), family.getFamilyName(), -1, null); if(LOG.isDebugEnabled()) { @@ -1547,47 +1586,54 @@ } // 4. and 5. Unload all the replaced MapFiles, close and delete. - - List toDelete = new ArrayList(); - for (Map.Entry e: this.storefiles.entrySet()) { - if (!compactedFiles.contains(e.getValue())) { - continue; + + synchronized (storefiles) { + List toDelete = new ArrayList(); + for (Map.Entry e: this.storefiles.entrySet()) { + if (!compactedFiles.contains(e.getValue())) { + continue; + } + Long key = e.getKey(); + MapFile.Reader reader = this.readers.remove(key); + if (reader != null) { + reader.close(); + } + toDelete.add(key); } - Long key = e.getKey(); - MapFile.Reader reader = this.readers.remove(key); - if (reader != null) { - reader.close(); - } - toDelete.add(key); - } - try { - for (Long key: toDelete) { - HStoreFile hsf = this.storefiles.remove(key); - hsf.delete(); + try { + for (Long key: toDelete) { + HStoreFile hsf = this.storefiles.remove(key); + hsf.delete(); + } + + // 6. Loading the new TreeMap. + Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); + this.readers.put(orderVal, + // Use a block cache (if configured) for this reader since + // it is the only one. + finalCompactedFile.getReader(this.fs, this.bloomFilter, + family.isBlockCacheEnabled())); + this.storefiles.put(orderVal, finalCompactedFile); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.error("Failed replacing compacted files for " + this.storeName + + ". Compacted file is " + finalCompactedFile.toString() + + ". Files replaced are " + compactedFiles.toString() + + " some of which may have been already removed", e); } - - // 6. Loading the new TreeMap. - Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); - this.readers.put(orderVal, - // Use a block cache (if configured) for this reader since - // it is the only one. - finalCompactedFile.getReader(this.fs, this.bloomFilter, - family.isBlockCacheEnabled())); - this.storefiles.put(orderVal, finalCompactedFile); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files for " + this.storeName + - ". Compacted file is " + finalCompactedFile.toString() + - ". Files replaced are " + compactedFiles.toString() + - " some of which may have been already removed", e); + // 7. Compute new store size + storeSize = 0; + for (HStoreFile hsf: storefiles.values()) { + storeSize += hsf.length(); + } } } finally { - // 7. Releasing the write-lock + // 8. Releasing the write-lock this.lock.writeLock().unlock(); } } finally { - // 8. Allow new scanners to proceed. + // 9. Allow new scanners to proceed. newScannerLock.writeLock().unlock(); } } @@ -1604,13 +1650,13 @@ * The returned object should map column names to byte arrays (byte[]). 
*/ void getFull(HStoreKey key, TreeMap results) - throws IOException { + throws IOException { Map> deletes = new HashMap>(); - + if (key == null) { return; } - + this.lock.readLock().lock(); memcache.getFull(key, results); try { @@ -1636,11 +1682,11 @@ } else if(key.getRow().compareTo(readkey.getRow()) < 0) { break; } - + } while(map.next(readkey, readval)); } } - + } finally { this.lock.readLock().unlock(); } @@ -1665,7 +1711,7 @@ if (numVersions <= 0) { throw new IllegalArgumentException("Number of versions must be > 0"); } - + this.lock.readLock().lock(); try { // Check the memcache @@ -1712,10 +1758,10 @@ } } for (readval = new ImmutableBytesWritable(); - map.next(readkey, readval) && - readkey.matchesRowCol(key) && - !hasEnoughVersions(numVersions, results); - readval = new ImmutableBytesWritable()) { + map.next(readkey, readval) && + readkey.matchesRowCol(key) && + !hasEnoughVersions(numVersions, results); + readval = new ImmutableBytesWritable()) { if (!isDeleted(readkey, readval.get(), true, deletes)) { results.add(readval.get()); } @@ -1726,7 +1772,7 @@ } } return results.size() == 0 ? - null : ImmutableBytesWritable.toArray(results); + null : ImmutableBytesWritable.toArray(results); } finally { this.lock.readLock().unlock(); } @@ -1753,13 +1799,13 @@ * @throws IOException */ List getKeys(final HStoreKey origin, final int versions) - throws IOException { - + throws IOException { + List keys = this.memcache.getKeys(origin, versions); if (versions != ALL_VERSIONS && keys.size() >= versions) { return keys; } - + // This code below is very close to the body of the get method. this.lock.readLock().lock(); try { @@ -1768,7 +1814,7 @@ MapFile.Reader map = maparray[i]; synchronized(map) { map.reset(); - + // do the priming read ImmutableBytesWritable readval = new ImmutableBytesWritable(); HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval); @@ -1779,7 +1825,7 @@ // BEFORE. continue; } - + do{ // if the row matches, we might want this one. if(rowMatches(origin, readkey)){ @@ -1808,7 +1854,7 @@ }while(map.next(readkey, readval)); // advance to the next key } } - + return keys; } finally { this.lock.readLock().unlock(); @@ -1818,9 +1864,14 @@ /** * Find the key that matches row exactly, or the one that immediately * preceeds it. + * + * @param row + * @param timestamp + * @return row key + * @throws IOException */ - public Text getRowKeyAtOrBefore(final Text row, final long timestamp) - throws IOException{ + Text getRowKeyAtOrBefore(final Text row, final long timestamp) + throws IOException { // if the exact key is found, return that key // if we find a key that is greater than our search key, then use the // last key we processed, and if that was null, return null. @@ -1829,38 +1880,37 @@ if (foundKey != null) { return foundKey; } - + // obtain read lock this.lock.readLock().lock(); try { MapFile.Reader[] maparray = getReaders(); - Text bestSoFar = null; - - HStoreKey rowKey = new HStoreKey(row, timestamp); - + +// HStoreKey rowKey = new HStoreKey(row, timestamp); + // process each store file for(int i = maparray.length - 1; i >= 0; i--) { Text row_from_mapfile = rowAtOrBeforeFromMapFile(maparray[i], row, timestamp); - + // if the result from the mapfile is null, then we know that // the mapfile was empty and can move on to the next one. 
if (row_from_mapfile == null) { continue; } - + // short circuit on an exact match if (row.equals(row_from_mapfile)) { return row; } - + // check to see if we've found a new closest row key as a result if (bestSoFar == null || bestSoFar.compareTo(row_from_mapfile) < 0) { bestSoFar = row_from_mapfile; } } - + return bestSoFar; } finally { this.lock.readLock().unlock(); @@ -1872,34 +1922,33 @@ * and timestamp */ private Text rowAtOrBeforeFromMapFile(MapFile.Reader map, Text row, - long timestamp) - throws IOException { + long timestamp) throws IOException { HStoreKey searchKey = new HStoreKey(row, timestamp); Text previousRow = null; ImmutableBytesWritable readval = new ImmutableBytesWritable(); HStoreKey readkey = new HStoreKey(); - + synchronized(map) { // don't bother with the rest of this if the file is empty map.reset(); if (!map.next(readkey, readval)) { return null; } - + HStoreKey finalKey = new HStoreKey(); map.finalKey(finalKey); if (finalKey.getRow().compareTo(row) < 0) { return finalKey.getRow(); } - + // seek to the exact row, or the one that would be immediately before it readkey = (HStoreKey)map.getClosest(searchKey, readval, true); - + if (readkey == null) { // didn't find anything that would match, so returns return null; } - + do { if (readkey.getRow().compareTo(row) == 0) { // exact match on row @@ -1971,105 +2020,83 @@ // otherwise, we want to match on row and column return target.matchesRowCol(origin); } - - /* - * Data structure to hold result of a look at store file sizes. - */ - static class HStoreSize { - final long aggregate; - final long largest; - boolean splitable; - - HStoreSize(final long a, final long l, final boolean s) { - this.aggregate = a; - this.largest = l; - this.splitable = s; - } - - long getAggregate() { - return this.aggregate; - } - - long getLargest() { - return this.largest; - } - - boolean isSplitable() { - return this.splitable; - } - - void setSplitable(final boolean s) { - this.splitable = s; - } - } - + /** - * Gets size for the store. + * Determines if HStore can be split * - * @param midKey Gets set to the middle key of the largest splitable store - * file or its set to empty if largest is not splitable. - * @return Sizes for the store and the passed midKey is - * set to midKey of largest splitable. Otherwise, its set to empty - * to indicate we couldn't find a midkey to split on + * @return midKey if store can be split, null otherwise */ - HStoreSize size(Text midKey) { - long maxSize = 0L; - long aggregateSize = 0L; - // Not splitable if we find a reference store file present in the store. - boolean splitable = true; + Text checkSplit() { if (this.storefiles.size() <= 0) { - return new HStoreSize(0, 0, splitable); + return null; } - this.lock.readLock().lock(); try { + // Not splitable if we find a reference store file present in the store. 
+ boolean splitable = true; + long maxSize = 0L; Long mapIndex = Long.valueOf(0L); // Iterate through all the MapFiles - for(Map.Entry e: storefiles.entrySet()) { - HStoreFile curHSF = e.getValue(); - long size = curHSF.length(); - aggregateSize += size; - if (maxSize == 0L || size > maxSize) { - // This is the largest one so far - maxSize = size; - mapIndex = e.getKey(); + synchronized (storefiles) { + for(Map.Entry e: storefiles.entrySet()) { + HStoreFile curHSF = e.getValue(); + long size = curHSF.length(); + if (size > maxSize) { + // This is the largest one so far + maxSize = size; + mapIndex = e.getKey(); + } + if (splitable) { + splitable = !curHSF.isReference(); + } } - if (splitable) { - splitable = !curHSF.isReference(); - } } + if (!splitable) { + return null; + } MapFile.Reader r = this.readers.get(mapIndex); - + // seek back to the beginning of mapfile r.reset(); - + // get the first and last keys HStoreKey firstKey = new HStoreKey(); HStoreKey lastKey = new HStoreKey(); Writable value = new ImmutableBytesWritable(); - r.next((WritableComparable)firstKey, value); - r.finalKey((WritableComparable)lastKey); - + r.next(firstKey, value); + r.finalKey(lastKey); + // get the midkey HStoreKey midkey = (HStoreKey)r.midKey(); if (midkey != null) { - midKey.set(((HStoreKey)midkey).getRow()); // if the midkey is the same as the first and last keys, then we cannot // (ever) split this region. if (midkey.getRow().equals(firstKey.getRow()) && - midkey.getRow().equals(lastKey.getRow())) { - return new HStoreSize(aggregateSize, maxSize, false); - } + midkey.getRow().equals(lastKey.getRow())) { +System.err.println("firstkey=("+firstKey.getRow()+") lastKey=("+lastKey.getRow()+ + ") midkey=("+midkey.getRow()+")"); + return null; + } + return midkey.getRow(); } } catch(IOException e) { LOG.warn("Failed getting store size for " + this.storeName, e); } finally { this.lock.readLock().unlock(); } - return new HStoreSize(aggregateSize, maxSize, splitable); + return null; } + + /** @return aggregate size of HStore */ + public long getSize() { + return storeSize; + } + long getLastFlushTime() { + return lastFlushTime; + } + ////////////////////////////////////////////////////////////////////////////// // File administration ////////////////////////////////////////////////////////////////////////////// @@ -2095,10 +2122,15 @@ } } + /** @return the HColumnDescriptor for this store */ + public HColumnDescriptor getFamily() { + return family; + } + /** {@inheritDoc} */ @Override public String toString() { - return this.storeName; + return this.storeName.toString(); } /* @@ -2139,39 +2171,41 @@ private class StoreFileScanner extends HAbstractScanner { @SuppressWarnings("hiding") private MapFile.Reader[] readers; - + StoreFileScanner(long timestamp, Text[] targetCols, Text firstRow) throws IOException { super(timestamp, targetCols); try { - this.readers = new MapFile.Reader[storefiles.size()]; - - // Most recent map file should be first - int i = readers.length - 1; - for(HStoreFile curHSF: storefiles.values()) { - readers[i--] = curHSF.getReader(fs, bloomFilter); + synchronized (storefiles) { + this.readers = new MapFile.Reader[storefiles.size()]; + + // Most recent map file should be first + int i = readers.length - 1; + for(HStoreFile curHSF: storefiles.values()) { + readers[i--] = curHSF.getReader(fs, bloomFilter); + } } - + this.keys = new HStoreKey[readers.length]; this.vals = new byte[readers.length][]; - + // Advance the readers to the first pos. 
- for(i = 0; i < readers.length; i++) { + for(int i = 0; i < readers.length; i++) { keys[i] = new HStoreKey(); - + if(firstRow.getLength() != 0) { if(findFirstRow(i, firstRow)) { continue; } } - + while(getNext(i)) { if(columnMatch(i)) { break; } } } - + } catch (Exception ex) { close(); IOException e = new IOException("HStoreScanner failed construction"); @@ -2192,7 +2226,7 @@ boolean findFirstRow(int i, Text firstRow) throws IOException { ImmutableBytesWritable ibw = new ImmutableBytesWritable(); HStoreKey firstKey - = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw); + = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw); if (firstKey == null) { // Didn't find it. Close the scanner and return TRUE closeSubScanner(i); @@ -2204,7 +2238,7 @@ keys[i].setVersion(firstKey.getTimestamp()); return columnMatch(i); } - + /** * Get the next value from the specified reader. * @@ -2228,7 +2262,7 @@ } return result; } - + /** Close down the indicated reader. */ @Override void closeSubScanner(int i) { @@ -2240,7 +2274,7 @@ LOG.error(storeName + " closing sub-scanner", e); } } - + } finally { readers[i] = null; keys[i] = null; @@ -2261,7 +2295,7 @@ } } } - + } finally { scannerClosed = true; } @@ -2284,7 +2318,7 @@ @SuppressWarnings("unchecked") HStoreScanner(Text[] targetCols, Text firstRow, long timestamp, RowFilterInterface filter) throws IOException { - + this.dataFilter = filter; if (null != dataFilter) { dataFilter.reset(); @@ -2296,7 +2330,7 @@ try { scanners[0] = memcache.getScanner(timestamp, targetCols, firstRow); scanners[1] = new StoreFileScanner(timestamp, targetCols, firstRow); - + for (int i = 0; i < scanners.length; i++) { if (scanners[i].isWildcardScanner()) { this.wildcardMatch = true; @@ -2314,10 +2348,10 @@ } throw e; } - + // Advance to the first key in each scanner. // All results will match the required column-set and scanTime. - + for (int i = 0; i < scanners.length; i++) { keys[i] = new HStoreKey(); resultSets[i] = new TreeMap(); @@ -2342,7 +2376,7 @@ /** {@inheritDoc} */ public boolean next(HStoreKey key, SortedMap results) - throws IOException { + throws IOException { // Filtered flag is set by filters. If a cell has been 'filtered out' // -- i.e. it is not to be returned to the caller -- the flag is 'true'. @@ -2355,14 +2389,14 @@ for (int i = 0; i < this.keys.length; i++) { if (scanners[i] != null && (chosenRow == null || - (keys[i].getRow().compareTo(chosenRow) < 0) || - ((keys[i].getRow().compareTo(chosenRow) == 0) && - (keys[i].getTimestamp() > chosenTimestamp)))) { + (keys[i].getRow().compareTo(chosenRow) < 0) || + ((keys[i].getRow().compareTo(chosenRow) == 0) && + (keys[i].getTimestamp() > chosenTimestamp)))) { chosenRow = new Text(keys[i].getRow()); chosenTimestamp = keys[i].getTimestamp(); } } - + // Filter whole row by row key? filtered = dataFilter != null? dataFilter.filter(chosenRow) : false; @@ -2406,7 +2440,7 @@ // values with older ones. So now we only insert // a result if the map does not contain the key. HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT, - key.getTimestamp()); + key.getTimestamp()); for (Map.Entry e : resultSets[i].entrySet()) { hsk.setColumn(e.getKey()); if (HLogEdit.isDeleted(e.getValue())) { @@ -2422,7 +2456,7 @@ if (dataFilter != null) { // Filter whole row by column data? 
filtered = - dataFilter.filter(chosenRow, e.getKey(), e.getValue()); + dataFilter.filter(chosenRow, e.getKey(), e.getValue()); if (filtered) { results.clear(); break; @@ -2438,7 +2472,7 @@ } } } - + for (int i = 0; i < scanners.length; i++) { // If the current scanner is non-null AND has a lower-or-equal // row label, then its timestamp is bad. We need to advance it. @@ -2452,7 +2486,7 @@ } moreToFollow = chosenTimestamp >= 0; - + if (dataFilter != null) { if (moreToFollow) { dataFilter.rowProcessed(filtered, chosenRow); @@ -2461,19 +2495,19 @@ moreToFollow = false; } } - + if (results.size() <= 0 && !filtered) { // There were no results found for this row. Marked it as // 'filtered'-out otherwise we will not move on to the next row. filtered = true; } } - + // If we got no results, then there is no more to follow. if (results == null || results.size() <= 0) { moreToFollow = false; } - + // Make sure scanners closed if no more results if (!moreToFollow) { for (int i = 0; i < scanners.length; i++) { @@ -2482,11 +2516,11 @@ } } } - + return moreToFollow; } - + /** Shut down a single scanner */ void closeScanner(int i) { try { @@ -2505,11 +2539,11 @@ /** {@inheritDoc} */ public void close() { try { - for(int i = 0; i < scanners.length; i++) { - if(scanners[i] != null) { - closeScanner(i); + for(int i = 0; i < scanners.length; i++) { + if(scanners[i] != null) { + closeScanner(i); + } } - } } finally { synchronized (activeScanners) { int numberOfScanners = activeScanners.decrementAndGet(); @@ -2531,7 +2565,7 @@ /** {@inheritDoc} */ public Iterator>> iterator() { throw new UnsupportedOperationException("Unimplemented serverside. " + - "next(HStoreKey, StortedMap(...) is more efficient"); + "next(HStoreKey, StortedMap(...) is more efficient"); } } } Index: src/java/org/apache/hadoop/hbase/HMerge.java =================================================================== --- src/java/org/apache/hadoop/hbase/HMerge.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HMerge.java (working copy) @@ -138,17 +138,16 @@ long currentSize = 0; HRegion nextRegion = null; long nextSize = 0; - Text midKey = new Text(); for (int i = 0; i < info.length - 1; i++) { if (currentRegion == null) { currentRegion = new HRegion(tabledir, hlog, fs, conf, info[i], null, null); - currentSize = currentRegion.largestHStore(midKey).getAggregate(); + currentSize = currentRegion.getLargestHStoreSize(); } nextRegion = new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null); - nextSize = nextRegion.largestHStore(midKey).getAggregate(); + nextSize = nextRegion.getLargestHStoreSize(); if ((currentSize + nextSize) <= (maxFilesize / 2)) { // We merge two adjacent regions if their total size is less than @@ -348,7 +347,6 @@ oldRegion2 }; for(int r = 0; r < regionsToDelete.length; r++) { - long lockid = Math.abs(rand.nextLong()); BatchUpdate b = new BatchUpdate(regionsToDelete[r]); b.delete(COL_REGIONINFO); b.delete(COL_SERVER); Index: src/java/org/apache/hadoop/hbase/CacheFlushListener.java =================================================================== --- src/java/org/apache/hadoop/hbase/CacheFlushListener.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/CacheFlushListener.java (working copy) @@ -1,36 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase; - -/** - * Implementors of this interface want to be notified when an HRegion - * determines that a cache flush is needed. A CacheFlushListener (or null) - * must be passed to the HRegion constructor. - */ -public interface CacheFlushListener { - - /** - * Tell the listener the cache needs to be flushed. - * - * @param region the HRegion requesting the cache flush - */ - void flushRequested(HRegion region); -} Index: src/java/org/apache/hadoop/hbase/HStoreKey.java =================================================================== --- src/java/org/apache/hadoop/hbase/HStoreKey.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HStoreKey.java (working copy) @@ -37,6 +37,7 @@ private Text row; private Text column; private long timestamp; + private transient Text family; /** Default constructor used in conjunction with Writable interface */ @@ -89,6 +90,7 @@ this.row = new Text(row); this.column = new Text(column); this.timestamp = timestamp; + this.family = null; } /** @return Approximate size in bytes of this key. */ @@ -120,8 +122,9 @@ * * @param newcol new column key value */ - public void setColumn(Text newcol) { + public synchronized void setColumn(Text newcol) { this.column.set(newcol); + this.family = null; } /** @@ -154,6 +157,17 @@ return column; } + /** + * @return the column family name (minus the trailing colon) + * @throws InvalidColumnNameException + */ + public synchronized Text getFamily() throws InvalidColumnNameException { + if (family == null) { + family = extractFamily(column); + } + return family; + } + /** @return value of timestamp */ public long getTimestamp() { return timestamp; @@ -198,8 +212,7 @@ public boolean matchesRowFamily(HStoreKey that) throws InvalidColumnNameException { return this.row.compareTo(that.row) == 0 && - extractFamily(this.column). - compareTo(extractFamily(that.getColumn())) == 0; + this.getFamily().equals(that.getFamily()); } /** {@inheritDoc} */ @@ -225,6 +238,7 @@ // Comparable + /** {@inheritDoc} */ public int compareTo(Object o) { HStoreKey other = (HStoreKey)o; int result = this.row.compareTo(other.row); @@ -275,7 +289,7 @@ * the result by calling {@link TextSequence#toText()}. * @throws InvalidColumnNameException */ - public static TextSequence extractFamily(final Text col) + public static Text extractFamily(final Text col) throws InvalidColumnNameException { return extractFamily(col, false); } @@ -284,40 +298,41 @@ * Extracts the column family name from a column * For example, returns 'info' if the specified column was 'info:server' * @param col name of column - * @return column famile as a TextSequence based on the passed - * col. If col is reused, make a new Text of - * the result by calling {@link TextSequence#toText()}. + * @param withColon + * @return column family as a Text based on the passed col. 
* @throws InvalidColumnNameException */ - public static TextSequence extractFamily(final Text col, - final boolean withColon) + public static Text extractFamily(final Text col, final boolean withColon) throws InvalidColumnNameException { int offset = getColonOffset(col); // Include ':' in copy? - offset += (withColon)? 1: 0; + offset += (withColon)? 1 : 0; if (offset == col.getLength()) { - return new TextSequence(col); + return col; } - return new TextSequence(col, 0, offset); + Text family = new Text(); + family.set(col.getBytes(), 0, offset); + return family; } /** - * Extracts the column qualifier, the portion that follows the colon (':') - * family/qualifier separator. + * Extracts the column member, the portion that follows the colon (':') + * family:member separator. * For example, returns 'server' if the specified column was 'info:server' * @param col name of column - * @return column qualifier as a TextSequence based on the passed - * col. If col is reused, make a new Text of - * the result by calling {@link TextSequence#toText()}. + * @return column qualifier as a Text based on the passed col. * @throws InvalidColumnNameException */ - public static TextSequence extractQualifier(final Text col) + public static Text extractMember(final Text col) throws InvalidColumnNameException { - int offset = getColonOffset(col); - if (offset + 1 == col.getLength()) { + int offset = getColonOffset(col) + 1; + if (offset == col.getLength()) { return null; } - return new TextSequence(col, offset + 1); + + Text member = new Text(); + member.set(col.getBytes(), offset, col.getLength() - offset); + return member; } private static int getColonOffset(final Text col) @@ -333,7 +348,7 @@ } if(offset < 0) { throw new InvalidColumnNameException(col + " is missing the colon " + - "family/qualifier separator"); + "family:member separator"); } return offset; } Index: src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -72,7 +72,8 @@ * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. 
*/ -public class HRegionServer implements HConstants, HRegionInterface, Runnable { +public class HRegionServer implements HConstants, HRegionInterface, +HStoreListener, Runnable { static final Log LOG = LogFactory.getLog(HRegionServer.class); // Set when a report to the master comes back with a message asking us to @@ -165,11 +166,13 @@ } /** Queue entry passed to flusher, compactor and splitter threads */ - class QueueEntry implements Delayed { + static class QueueEntry implements Delayed { + private final HStore store; private final HRegion region; private long expirationTime; - QueueEntry(HRegion region, long expirationTime) { + QueueEntry(HStore store, HRegion region, long expirationTime) { + this.store = store; this.region = region; this.expirationTime = expirationTime; } @@ -184,7 +187,7 @@ /** {@inheritDoc} */ @Override public int hashCode() { - return this.region.getRegionInfo().hashCode(); + return this.store.getFamily().hashCode(); } /** {@inheritDoc} */ @@ -207,6 +210,11 @@ } return value; } + + /** @return the store */ + public HStore getStore() { + return store; + } /** @return the region */ public HRegion getRegion() { @@ -236,6 +244,8 @@ private final BlockingQueue compactionQueue = new LinkedBlockingQueue(); + + private volatile QueueEntry compactionInProgress; /** constructor */ public CompactSplitThread() { @@ -243,6 +253,7 @@ this.frequency = conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", 20 * 1000); + this.compactionInProgress = null; } /** {@inheritDoc} */ @@ -255,8 +266,20 @@ if (e == null) { continue; } - e.getRegion().compactIfNeeded(); - split(e.getRegion()); + HRegion r = e.getRegion(); + HStore storeToCompact = e.getStore(); + Text midKey = r.compactStore(e.getStore()); + if (midKey != null) { + // Store wants region to split + // compact all the other stores so split can happen + for (HStore s: r.getStores()) { + if (s.storeName.equals(storeToCompact.storeName)) { + continue; + } + r.compactStore(s); + } + split(r, midKey); + } } catch (InterruptedException ex) { continue; } catch (IOException ex) { @@ -280,19 +303,28 @@ } /** - * @param e QueueEntry for region to be compacted + * @param s HStore to be compacted + * @param r HRegion the store belongs to + * @return true if a compaction was queued */ - public void compactionRequested(QueueEntry e) { - compactionQueue.add(e); + public boolean compactionRequested(final HStore s, final HRegion r) { + QueueEntry e = new QueueEntry(s, r, System.currentTimeMillis()); + synchronized (compactionQueue) { + if (compactionInProgress != null && compactionInProgress.equals(e)) { + return false; + } + if (compactionQueue.contains(e)) { + return false; + } + compactionQueue.add(e); + } + return true; } - void compactionRequested(final HRegion r) { - compactionRequested(new QueueEntry(r, System.currentTimeMillis())); - } - - private void split(final HRegion region) throws IOException { + private void split(final HRegion region, final Text midKey) + throws IOException { final HRegionInfo oldRegionInfo = region.getRegionInfo(); - final HRegion[] newRegions = region.splitRegion(this); + final HRegion[] newRegions = region.splitRegion(this, midKey); if (newRegions == null) { // Didn't need to be split return; @@ -323,6 +355,8 @@ oldRegionInfo.setSplit(true); BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName()); update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo)); + update.delete(COL_SERVER); + update.delete(COL_STARTCODE); update.put(COL_SPLITA, 
Writables.getBytes(newRegions[0].getRegionInfo())); update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo())); t.commit(update); @@ -333,11 +367,6 @@ update.put(COL_REGIONINFO, Writables.getBytes( newRegions[i].getRegionInfo())); t.commit(update); - -/* long lockid = t.startUpdate(newRegions[i].getRegionName()); - t.put(lockid, COL_REGIONINFO, Writables.getBytes( - newRegions[i].getRegionInfo())); - t.commit(lockid);*/ } // Now tell the master about the new regions @@ -393,17 +422,19 @@ final Integer cacheFlusherLock = new Integer(0); /** Flush cache upon request */ - class Flusher extends Thread implements CacheFlushListener { + class Flusher extends Thread { private final DelayQueue flushQueue = new DelayQueue(); private final long optionalFlushPeriod; + private volatile QueueEntry flushInProgress; /** constructor */ public Flusher() { super(); this.optionalFlushPeriod = conf.getLong( "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); + this.flushInProgress = null; } @@ -417,24 +448,27 @@ if (e == null) { continue; } + synchronized (flushQueue) { + flushInProgress = e; + } synchronized(cacheFlusherLock) { // Don't interrupt while we're working - if (e.getRegion().flushcache()) { - compactSplitThread.compactionRequested(e); - } - + e.getRegion().flushcache(e.getStore()); e.setExpirationTime(System.currentTimeMillis() + optionalFlushPeriod); flushQueue.add(e); } - // Now insure that all the active regions are in the queue + // Now insure that all the active stores are in the queue Set regions = getRegionsToCheck(); for (HRegion r: regions) { - e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); - synchronized (flushQueue) { - if (!flushQueue.contains(e)) { - flushQueue.add(e); + for (HStore s: r.getStores()) { + e = new QueueEntry(s, r, + s.getLastFlushTime() + optionalFlushPeriod); + synchronized (flushQueue) { + if (!flushQueue.contains(e)) { + flushQueue.add(e); + } } } } @@ -487,15 +521,23 @@ LOG.info(getName() + " exiting"); } - /** {@inheritDoc} */ - public void flushRequested(HRegion region) { - QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); + /** + * @param store + * @param region + * @return true if the entry was updated, false if already in progress + */ + public boolean flushRequested(HStore store, HRegion region) { + QueueEntry e = new QueueEntry(store, region, System.currentTimeMillis()); synchronized (flushQueue) { + if (flushInProgress != null && flushInProgress.equals(e)) { + return false; + } if (flushQueue.contains(e)) { flushQueue.remove(e); } flushQueue.add(e); } + return true; } } @@ -1030,6 +1072,16 @@ return result; } + /** {@inheritDoc} */ + public boolean flushRequested(HStore store, HRegion region) { + return cacheFlusher.flushRequested(store, region); + } + + /** {@inheritDoc} */ + public boolean compactionRequested(HStore store, HRegion region) { + return compactSplitThread.compactionRequested(store, region); + } + /** Add to the outbound message buffer */ private void reportOpen(HRegionInfo region) { outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, region)); @@ -1159,10 +1211,10 @@ HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc().getName() ), - this.log, this.fs, conf, regionInfo, null, this.cacheFlusher + this.log, this.fs, conf, regionInfo, null, this ); // Startup a compaction early if one is needed. 
- this.compactSplitThread.compactionRequested(region); + region.compactStores(); } catch (IOException e) { LOG.error("error opening region " + regionInfo.getRegionName(), e); @@ -1570,11 +1622,6 @@ return this.requestCount; } - /** @return reference to CacheFlushListener */ - public CacheFlushListener getCacheFlushListener() { - return this.cacheFlusher; - } - /** * Protected utility method for safely obtaining an HRegion handle. * @param regionName Name of online {@link HRegion} to return Index: src/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/java/org/apache/hadoop/hbase/HConstants.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -32,7 +32,7 @@ static final String VERSION_FILE_NAME = "hbase.version"; /** version of file system */ - static final String FILE_SYSTEM_VERSION = "0.1"; + static final String FILE_SYSTEM_VERSION = "2"; // Configuration parameters Index: src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/java/org/apache/hadoop/hbase/HLog.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -35,6 +35,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -50,24 +51,25 @@ * underlying file is being rolled. * *

- * A single HLog is used by several HRegions simultaneously. + * A single HLog is used by several HRegion/HStores simultaneously. * *

- * Each HRegion is identified by a unique long int. HRegions do - * not need to declare themselves before using the HLog; they simply include - * their HRegion-id in the append or - * completeCacheFlush calls. + * Each HStore is identified by a unique string consisting of the encoded + * HRegion name and by the HStore family name. + * HStores do not need to declare themselves before using the HLog; they simply + * include their HStore.storeName in the append + * or completeCacheFlush calls. * *

* An HLog consists of multiple on-disk files, which have a chronological order. * As data is flushed to other (better) on-disk structures, the log becomes - * obsolete. We can destroy all the log messages for a given HRegion-id up to - * the most-recent CACHEFLUSH message from that HRegion. + * obsolete. We can destroy all the log messages for a given HStore up to + * the most-recent CACHEFLUSH message from that HStore. * *

* It's only practical to delete entire files. Thus, we delete an entire on-disk * file F when all of the messages in F have a log-sequence-id that's older - * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has + * (smaller) than the most-recent CACHEFLUSH message for every HStore that has * a message in F. * *

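In code terms, the rule in the previous paragraph reduces to a minimum over the per-store sequence numbers. A rough sketch using the two maps the patched HLog keeps (lastSeqWritten maps a store name to its oldest unflushed sequence id, outputfiles maps a log file's final sequence id to its path); the snippet is illustrative, not an excerpt:

    // A log file is deletable once every store's oldest unflushed edit is
    // newer than the file's final sequence id.
    if (!lastSeqWritten.isEmpty()) {
      long oldestOutstanding = Collections.min(lastSeqWritten.values()).longValue();
      // headMap is exclusive of its bound, hence the +1
      Set<Long> deletableFinalIds =
          outputfiles.headMap(Long.valueOf(oldestOutstanding + 1L)).keySet();
      // delete the files keyed by deletableFinalIds
    }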
@@ -77,7 +79,7 @@ * separate reentrant lock is used. * *

- * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in + * TODO: Vuk Ercegovac also pointed out that keeping HBase edit logs in * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The * 'atomic' write to the log is meant to serve as insurance against abnormal * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's @@ -109,7 +111,7 @@ Collections.synchronizedSortedMap(new TreeMap()); /* - * Map of region to last sequence/edit id. + * Map of store to last sequence/edit id. */ final Map lastSeqWritten = new ConcurrentHashMap(); @@ -238,7 +240,7 @@ if (this.outputfiles.size() > 0) { if (this.lastSeqWritten.size() <= 0) { LOG.debug("Last sequence written is empty. Deleting all old hlogs"); - // If so, then no new writes have come in since all regions were + // If so, then no new writes have come in since all stores were // flushed (and removed from the lastSeqWritten map). Means can // remove all but currently open log file. for (Map.Entry e : this.outputfiles.entrySet()) { @@ -251,17 +253,17 @@ Long oldestOutstandingSeqNum = Collections.min(this.lastSeqWritten.values()); // Get the set of all log files whose final ID is older than or - // equal to the oldest pending region operation + // equal to the oldest pending HStore operation TreeSet sequenceNumbers = new TreeSet(this.outputfiles.headMap( (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet()); // Now remove old log files (if any) if (LOG.isDebugEnabled()) { - // Find region associated with oldest key -- helps debugging. - Text oldestRegion = null; + // Find store associated with oldest key -- helps debugging. + Text oldestStore = null; for (Map.Entry e: this.lastSeqWritten.entrySet()) { if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) { - oldestRegion = e.getKey(); + oldestStore = e.getKey(); break; } } @@ -269,7 +271,7 @@ LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " + "using oldest outstanding seqnum of " + - oldestOutstandingSeqNum + " from region " + oldestRegion); + oldestOutstandingSeqNum + " from store " + oldestStore); } } if (sequenceNumbers.size() > 0) { @@ -331,7 +333,7 @@ } /** - * Append a set of edits to the log. Log edits are keyed by regionName, + * Append a set of edits to the log. Log edits are keyed by storeName, * rowname, and log-sequence-id. * * Later, if we sort by these keys, we obtain all the relevant edits for a @@ -347,38 +349,32 @@ * synchronized prevents appends during the completion of a cache flush or for * the duration of a log roll. * - * @param regionName + * @param storeName * @param tableName - * @param row - * @param columns - * @param timestamp + * @param key + * @param value * @throws IOException */ - void append(Text regionName, Text tableName, - TreeMap edits) throws IOException { + void append(Text storeName, Text tableName, HStoreKey key, byte[] value) + throws IOException { if (closed) { throw new IOException("Cannot append; log is closed"); } synchronized (updateLock) { - long seqNum[] = obtainSeqNum(edits.size()); + long seqNum = obtainSeqNum(); // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region. When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush + // write for each HStore. When the cache is flushed, the entry for the + // store being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. 
- if (!this.lastSeqWritten.containsKey(regionName)) { - this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0])); + if (!this.lastSeqWritten.containsKey(storeName)) { + this.lastSeqWritten.put(storeName, Long.valueOf(seqNum)); } - int counter = 0; - for (Map.Entry es : edits.entrySet()) { - HStoreKey key = es.getKey(); - HLogKey logKey = - new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]); - HLogEdit logEdit = - new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp()); - this.writer.append(logKey, logEdit); - this.numEntries++; - } + HLogKey logKey = new HLogKey(storeName, tableName, key.getRow(), seqNum); + HLogEdit logEdit = + new HLogEdit(key.getColumn(), value, key.getTimestamp()); + this.writer.append(logKey, logEdit); + this.numEntries++; } if (this.numEntries > this.maxlogentries) { if (listener != null) { @@ -409,22 +405,6 @@ } /** - * Obtain a specified number of sequence numbers - * - * @param num number of sequence numbers to obtain - * @return array of sequence numbers - */ - private long[] obtainSeqNum(int num) { - long[] results = new long[num]; - synchronized (this.sequenceLock) { - for (int i = 0; i < num; i++) { - results[i] = this.logSeqNum++; - } - } - return results; - } - - /** * By acquiring a log sequence ID, we can allow log messages to continue while * we flush the cache. * @@ -446,12 +426,12 @@ * * Protected by cacheFlushLock * - * @param regionName + * @param storeName * @param tableName * @param logSeqId * @throws IOException */ - void completeCacheFlush(final Text regionName, final Text tableName, + void completeCacheFlush(final Text storeName, final Text tableName, final long logSeqId) throws IOException { try { @@ -459,13 +439,14 @@ return; } synchronized (updateLock) { - this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), + this.writer.append( + new HLogKey(storeName, tableName, HLog.METAROW, logSeqId), new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(), System.currentTimeMillis())); this.numEntries++; - Long seq = this.lastSeqWritten.get(regionName); + Long seq = this.lastSeqWritten.get(storeName); if (seq != null && logSeqId >= seq.longValue()) { - this.lastSeqWritten.remove(regionName); + this.lastSeqWritten.remove(storeName); } } } finally { @@ -496,7 +477,7 @@ */ public static void splitLog(Path rootDir, Path srcDir, FileSystem fs, Configuration conf) throws IOException { - Path logfiles[] = fs.listPaths(new Path[] { srcDir }); + FileStatus logfiles[] = fs.listStatus(srcDir); LOG.info("splitting " + logfiles.length + " log(s) in " + srcDir.toString()); Map logWriters = @@ -508,25 +489,28 @@ logfiles[i]); } // Check for empty file. 
- if (fs.getFileStatus(logfiles[i]).getLen() <= 0) { + if (logfiles[i].getLen() <= 0) { LOG.info("Skipping " + logfiles[i].toString() + " because zero length"); continue; } HLogKey key = new HLogKey(); HLogEdit val = new HLogEdit(); - SequenceFile.Reader in = new SequenceFile.Reader(fs, logfiles[i], conf); + SequenceFile.Reader in = + new SequenceFile.Reader(fs, logfiles[i].getPath(), conf); try { int count = 0; for (; in.next(key, val); count++) { Text tableName = key.getTablename(); - Text regionName = key.getRegionName(); + Text storeName = key.getStoreName(); + Text regionName = new Text(); + regionName.set(storeName.getBytes(), 0, storeName.find("/")); SequenceFile.Writer w = logWriters.get(regionName); if (w == null) { Path logfile = new Path( HRegion.getRegionDir( HTableDescriptor.getTableDir(rootDir, tableName), - HRegionInfo.encodeRegionName(regionName) + regionName.toString() ), HREGION_OLDLOGFILE_NAME ); Index: src/java/org/apache/hadoop/hbase/HStoreListener.java =================================================================== --- src/java/org/apache/hadoop/hbase/HStoreListener.java (revision 0) +++ src/java/org/apache/hadoop/hbase/HStoreListener.java (revision 0) @@ -0,0 +1,47 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +/** + * Implementors of this interface want to be notified when an HRegion + * determines that a cache flush is needed. A CacheFlushListener (or null) + * must be passed to the HRegion constructor. + */ +public interface HStoreListener { + + /** + * Tell the listener the cache needs to be flushed. + * + * @param store the HStore requesting the cache flush + * @param region the HRegion the HStore belongs to + * @return true if a flush request was queued + */ + boolean flushRequested(HStore store, HRegion region); + + /** + * Tell the listener that the HStore needs compacting. + * + * @param store the HStore requesting the compaction. + * @param region the HRegion that the HStore belongs to + * @return true if a compaction was queued + */ + boolean compactionRequested(HStore store, HRegion region); +} Index: src/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/HMaster.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -160,6 +160,7 @@ /** Name of master server */ public static final String MASTER = "master"; + /** @return InfoServer object */ public InfoServer getInfoServer() { return infoServer; } @@ -910,9 +911,12 @@ if(! 
fs.exists(rootdir)) { fs.mkdirs(rootdir); FSUtils.setVersion(fs, rootdir); - } else if (!FSUtils.checkVersion(fs, rootdir)) { - throw new IOException("File system needs upgrade. Run " + - "the '${HBASE_HOME}/bin/hbase migrate' script"); + } else { + String fsversion = FSUtils.checkVersion(fs, rootdir); + if (fsversion == null || fsversion.compareTo(FILE_SYSTEM_VERSION) != 0) { + throw new IOException("File system needs upgrade. Run " + + "the '${HBASE_HOME}/bin/hbase migrate' script"); + } } if (!fs.exists(rootRegionDir)) { Index: src/java/org/apache/hadoop/hbase/HAbstractScanner.java =================================================================== --- src/java/org/apache/hadoop/hbase/HAbstractScanner.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HAbstractScanner.java (working copy) @@ -69,13 +69,13 @@ private Text col; ColumnMatcher(final Text col) throws IOException { - Text qualifier = HStoreKey.extractQualifier(col); + Text member = HStoreKey.extractMember(col); try { - if(qualifier == null || qualifier.getLength() == 0) { + if(member == null || member.getLength() == 0) { this.matchType = MATCH_TYPE.FAMILY_ONLY; - this.family = HStoreKey.extractFamily(col).toText(); + this.family = HStoreKey.extractFamily(col); this.wildCardmatch = true; - } else if(isRegexPattern.matcher(qualifier.toString()).matches()) { + } else if(isRegexPattern.matcher(member.toString()).matches()) { this.matchType = MATCH_TYPE.REGEX; this.columnMatcher = Pattern.compile(col.toString()); this.wildCardmatch = true; @@ -127,7 +127,7 @@ this.multipleMatchers = false; this.okCols = new TreeMap>(); for(int i = 0; i < targetCols.length; i++) { - Text family = HStoreKey.extractFamily(targetCols[i]).toText(); + Text family = HStoreKey.extractFamily(targetCols[i]); Vector matchers = okCols.get(family); if(matchers == null) { matchers = new Vector(); Index: src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/HRegion.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -84,7 +86,7 @@ * regionName is a unique identifier for this HRegion. (startKey, endKey] * defines the keyspace for this HRegion. 
*/ -public class HRegion implements HConstants { +public class HRegion implements HConstants, HStoreListener { static final String SPLITDIR = "splits"; static final String MERGEDIR = "merges"; static final Random rand = new Random(); @@ -198,6 +200,8 @@ new ConcurrentHashMap>(); final AtomicLong memcacheSize = new AtomicLong(0); + final AtomicInteger cacheFlushesRequested = new AtomicInteger(0); + final int blockingFlushCount; final Path basedir; final HLog log; @@ -222,15 +226,11 @@ volatile WriteState writestate = new WriteState(); - final int memcacheFlushSize; - private volatile long lastFlushTime; - final CacheFlushListener flushListener; - final int blockingMemcacheSize; + private final HStoreListener listener; protected final long threadWakeFrequency; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Integer updateLock = new Integer(0); private final Integer splitLock = new Integer(0); - private final long desiredMaxFileSize; private final long minSequenceId; final AtomicInteger activeScannerCount = new AtomicInteger(0); @@ -255,12 +255,12 @@ * @param regionInfo - HRegionInfo that describes the region * @param initialFiles If there are initial files (implying that the HRegion * is new), then read them from the supplied path. - * @param listener an object that implements CacheFlushListener or null + * @param listener an object that implements HStoreListener or null * * @throws IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener) + HRegionInfo regionInfo, Path initialFiles, HStoreListener listener) throws IOException { this.basedir = basedir; @@ -286,8 +286,8 @@ for(HColumnDescriptor c : this.regionInfo.getTableDesc().families().values()) { - HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs, - oldLogFile, this.conf); + HStore store = + new HStore(this.basedir, this, c, this.fs, oldLogFile, this.conf); stores.put(c.getFamilyName(), store); @@ -302,6 +302,10 @@ " is " + this.minSequenceId); } + // Don't block if there is only one store. + int nStores = stores.size(); + this.blockingFlushCount = nStores == 1 ? 2 : nStores - 1; + // Get rid of any splits or merges that were lost in-progress Path splits = new Path(regiondir, SPLITDIR); if (fs.exists(splits)) { @@ -312,20 +316,10 @@ fs.delete(merges); } - // By default, we flush the cache when 64M. - this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", - 1024*1024*64); - this.flushListener = listener; - this.blockingMemcacheSize = this.memcacheFlushSize * - conf.getInt("hbase.hregion.memcache.block.multiplier", 1); + this.listener = listener; - // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. - this.desiredMaxFileSize = - conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); - // HRegion is ready to go! 
this.writestate.compacting = false; - this.lastFlushTime = System.currentTimeMillis(); LOG.info("region " + this.regionInfo.getRegionName() + " available"); } @@ -444,7 +438,10 @@ // Don't flush the cache if we are aborting if (!abort) { - internalFlushcache(snapshotMemcaches()); + for (HStore s: stores.values()) { + s.snapshotMemcache(); + internalFlushcache(s); + } } List result = new ArrayList(); @@ -516,11 +513,12 @@ return this.fs; } - /** @return the last time the region was flushed */ - public long getLastFlushTime() { - return this.lastFlushTime; + /** + * @return collection of stores that belong to this region + */ + public Collection getStores() { + return Collections.unmodifiableCollection(this.stores.values()); } - ////////////////////////////////////////////////////////////////////////////// // HRegion maintenance. // @@ -529,32 +527,17 @@ ////////////////////////////////////////////////////////////////////////////// /** - * @return returns size of largest HStore. Also returns whether store is - * splitable or not (Its not splitable if region has a store that has a - * reference store file). + * @return returns size of largest HStore. */ - HStore.HStoreSize largestHStore(Text midkey) { - HStore.HStoreSize biggest = null; - boolean splitable = true; - for(HStore h: stores.values()) { - HStore.HStoreSize size = h.size(midkey); - // If we came across a reference down in the store, then propagate - // fact that region is not splitable. - if (splitable) { - splitable = size.splitable; + long getLargestHStoreSize() { + long size = 0; + for (HStore store: stores.values()) { + long storeSize = store.getSize(); + if (storeSize > size) { + size = storeSize; } - if (biggest == null) { - biggest = size; - continue; - } - if(size.getAggregate() > biggest.getAggregate()) { // Largest so far - biggest = size; - } } - if (biggest != null) { - biggest.setSplitable(splitable); - } - return biggest; + return size; } /* @@ -563,16 +546,17 @@ * but instead create new 'reference' store files that read off the top and * bottom ranges of parent store files. * @param listener May be null. + * @param midKey key on which to split region * @return two brand-new (and open) HRegions or null if a split is not needed * @throws IOException */ - HRegion[] splitRegion(final RegionUnavailableListener listener) - throws IOException { + HRegion[] splitRegion(final RegionUnavailableListener listener, + final Text midKey) throws IOException { synchronized (splitLock) { - Text midKey = new Text(); - if (closed.get() || !needsSplit(midKey)) { + if (closed.get()) { return null; } + LOG.info("Starting split of region " + getRegionName()); Path splits = new Path(this.regiondir, SPLITDIR); if(!this.fs.exists(splits)) { this.fs.mkdirs(splits); @@ -643,71 +627,6 @@ } /* - * Iterates through all the HStores and finds the one with the largest - * MapFile size. If the size is greater than the (currently hard-coded) - * threshold, returns true indicating that the region should be split. The - * midKey for the largest MapFile is returned through the midKey parameter. - * It is possible for us to rule the region non-splitable even in excess of - * configured size. This happens if region contains a reference file. If - * a reference file, the region can not be split. - * - * Note that there is no need to do locking in this method because it calls - * largestHStore which does the necessary locking. - * - * @param midKey midKey of the largest MapFile - * @return true if the region should be split. midKey is set by this method. 
- * Check it for a midKey value on return. - */ - boolean needsSplit(Text midKey) { - HStore.HStoreSize biggest = largestHStore(midKey); - if (biggest == null || midKey.getLength() == 0 || - (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) { - return false; - } - boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize); - if (split) { - if (!biggest.isSplitable()) { - LOG.warn("Region " + getRegionName().toString() + - " is NOT splitable though its aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - split = false; - } else { - LOG.info("Splitting " + getRegionName().toString() + - " because largest aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - } - } - return split; - } - - /** - * Only do a compaction if it is necessary - * - * @return - * @throws IOException - */ - boolean compactIfNeeded() throws IOException { - boolean needsCompaction = false; - for (HStore store: stores.values()) { - if (store.needsCompaction()) { - needsCompaction = true; - if (LOG.isDebugEnabled()) { - LOG.debug(store.toString() + " needs compaction"); - } - break; - } - } - if (!needsCompaction) { - return false; - } - return compactStores(); - } - - /* * @param dir * @return compaction directory for the passed in dir */ @@ -734,25 +653,46 @@ } } + /** {@inheritDoc} */ + public boolean compactionRequested(HStore store, + @SuppressWarnings("unused") HRegion region) { + if (listener == null) { + return false; + } + return listener.compactionRequested(store, this); + } + /** - * Compact all the stores. This should be called periodically to make sure - * the stores are kept manageable. + * Called after region is opened to compact the HStores if necessary + */ + void compactStores() { + if (listener != null) { + synchronized (stores) { + for (HStore store: stores.values()) { + listener.compactionRequested(store, this); + } + } + } + } + + /** + * Compact the specified HStore * *
<p>
This operation could block for a long time, so don't call it from a * time-sensitive thread. - * - * @return Returns TRUE if the compaction has completed. FALSE, if the - * compaction was not carried out, because the HRegion is busy doing - * something else storage-intensive (like flushing the cache). The caller - * should check back later. * * Note that no locking is necessary at this level because compaction only * conflicts with a region split, and that cannot happen because the region * server does them sequentially and not in parallel. + * + * @param store the HStore to compact + * @return mid key if split is needed + * @throws IOException */ - boolean compactStores() throws IOException { + Text compactStore(HStore store) throws IOException { + Text midKey = null; if (this.closed.get()) { - return false; + return midKey; } try { synchronized (writestate) { @@ -760,27 +700,21 @@ writestate.compacting = true; } else { LOG.info("NOT compacting region " + - this.regionInfo.getRegionName().toString() + ": compacting=" + + getRegionName() + " store " + store.storeName + ": compacting=" + writestate.compacting + ", writesEnabled=" + writestate.writesEnabled); - return false; + return midKey; } } long startTime = System.currentTimeMillis(); - LOG.info("starting compaction on region " + - this.regionInfo.getRegionName().toString()); - boolean status = true; + LOG.info("starting compaction on region " + getRegionName() + " store " + + store.storeName); doRegionCompactionPrep(); - for (HStore store : stores.values()) { - if(!store.compact()) { - status = false; - } - } + midKey = store.compact(); doRegionCompactionCleanup(); - LOG.info("compaction completed on region " + - this.regionInfo.getRegionName().toString() + ". Took " + + LOG.info("compaction completed on region " + getRegionName() + " store " + + store.storeName + ". Took " + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - return status; } finally { synchronized (writestate) { @@ -788,8 +722,18 @@ writestate.notifyAll(); } } + return midKey; } + /** {@inheritDoc} */ + public boolean flushRequested(HStore store, + @SuppressWarnings("unused") HRegion region) { + if (listener == null) { + return false; + } + cacheFlushesRequested.incrementAndGet(); + return listener.flushRequested(store, this); + } /** * Flush the cache. * @@ -803,16 +747,16 @@ * *
<p>
This method may block for some time, so it should not be called from a * time-sensitive thread. + * + * @param s store to be flushed * - * @return true if cache was flushed - * * @throws IOException * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - boolean flushcache() throws IOException { + void flushcache(HStore s) throws IOException { if (this.closed.get()) { - return false; + return; } synchronized (writestate) { if (!writestate.flushing && writestate.writesEnabled) { @@ -824,17 +768,17 @@ writestate.flushing + ", writesEnabled=" + writestate.writesEnabled); } - return false; + return; } } try { lock.readLock().lock(); // Prevent splits and closes try { - long startTime = -1; - synchronized (updateLock) {// Stop updates while we snapshot the memcaches - startTime = snapshotMemcaches(); + synchronized (updateLock) { + // Stop updates while we snapshot the memcache + s.snapshotMemcache(); } - return internalFlushcache(startTime); + internalFlushcache(s); } finally { lock.readLock().unlock(); } @@ -846,32 +790,6 @@ } } - /* - * It is assumed that updates are blocked for the duration of this method - */ - private long snapshotMemcaches() { - if (this.memcacheSize.get() == 0) { - return -1; - } - long startTime = System.currentTimeMillis(); - - if(LOG.isDebugEnabled()) { - LOG.debug("Started memcache flush for region " + - this.regionInfo.getRegionName() + ". Size " + - StringUtils.humanReadableInt(this.memcacheSize.get())); - } - - // We reset the aggregate memcache size here so that subsequent updates - // will add to the unflushed size - - this.memcacheSize.set(0L); - - for (HStore hstore: stores.values()) { - hstore.snapshotMemcache(); - } - return startTime; - } - /** * Flushing the cache is a little tricky. We have a lot of updates in the * HMemcache, all of which have also been written to the log. We need to @@ -898,18 +816,17 @@ * *
<p>
This method may block for some time. * - * @param startTime the time the cache was snapshotted or -1 if a flush is - * not needed + * @param store the HStore to be flushed * - * @return true if the cache was flushed - * * @throws IOException * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - private boolean internalFlushcache(long startTime) throws IOException { - if (startTime == -1) { - return false; + private void internalFlushcache(HStore store) throws IOException { + long startTime = System.currentTimeMillis(); + if(LOG.isDebugEnabled()) { + LOG.debug("Started memcache flush for region " + + this.regionInfo.getRegionName() + " store " + store.toString()); } // We pass the log to the HMemcache, so we can lock down both @@ -934,9 +851,12 @@ // A. Flush memcache to all the HStores. // Keep running vector of all store files that includes both old and the // just-made new flush store file. - - for (HStore hstore: stores.values()) { - hstore.flushCache(sequenceId); + + long bytesFlushed = store.flushCache(sequenceId); + memcacheSize.set(memcacheSize.get() - bytesFlushed); + if (memcacheSize.get() < 0) { + LOG.warn("memcacheSize < 0. Resetting to zero"); + memcacheSize.set(0L); } } catch (IOException e) { // An exception here means that the snapshot was not persisted. // Currently, only a server restart will do this. this.log.abortCacheFlush(); throw new DroppedSnapshotException(e.getMessage()); + } finally { + cacheFlushesRequested.decrementAndGet(); } - // If we get to here, the HStores have been written. If we get an + // If we get to here, the HStore has been written. If we get an // error in completeCacheFlush it will release the lock it is holding // B. Write a FLUSHCACHE-COMPLETE message to the log. - // This tells future readers that the HStores were emitted correctly, - // and that all updates to the log for this regionName that have lower + // This tells future readers that the HStore was emitted correctly, - // and that all updates to the log for this store that have lower + // and that all updates to the log for this store that have lower // log-sequence-ids can be safely ignored. - this.log.completeCacheFlush(this.regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), sequenceId); + this.log.completeCacheFlush(store.storeName, getTableDesc().getName(), + sequenceId); // D. Finally notify anyone waiting on memcache to clear: // e.g. checkResources(). - synchronized (this) { - notifyAll(); + synchronized (cacheFlushesRequested) { + if (cacheFlushesRequested.get() < this.blockingFlushCount) { + cacheFlushesRequested.notifyAll(); + } } if (LOG.isDebugEnabled()) { - LOG.debug("Finished memcache flush for region " + - this.regionInfo.getRegionName() + " in " + - (System.currentTimeMillis() - startTime) + "ms, sequenceid=" + - sequenceId); + LOG.debug("Finished memcache flush for store " + store.storeName + + " in " + (System.currentTimeMillis() - startTime) + + "ms, sequenceid=" + sequenceId); } - return true; } ////////////////////////////////////////////////////////////////////////////// @@ -1279,29 +1201,28 @@ * * For now, just checks memcache saturation. * - * Here we synchronize on HRegion, a broad scoped lock. Its appropriate + * Here we synchronize on cacheFlushesRequested. It's appropriate * given we're figuring in here whether this region is able to take on * writes. This is only method with a synchronize (at time of writing), - * this and the synchronize on 'this' inside in internalFlushCache to send - * the notify.
+ * and the synchronize inside internalFlushCache to send the notify. */ - private synchronized void checkResources() { + private void checkResources() { boolean blocked = false; - - while (this.memcacheSize.get() >= this.blockingMemcacheSize) { - if (!blocked) { - LOG.info("Blocking updates for '" + Thread.currentThread().getName() + - "': Memcache size " + - StringUtils.humanReadableInt(this.memcacheSize.get()) + - " is >= than blocking " + - StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size"); - } - blocked = true; - try { - wait(threadWakeFrequency); - } catch (InterruptedException e) { - // continue; + synchronized (cacheFlushesRequested) { + while (this.cacheFlushesRequested.get() >= this.blockingFlushCount) { + if (!blocked) { + LOG.info("Blocking updates for '" + Thread.currentThread().getName() + + "': cache flushes requested " + cacheFlushesRequested.get() + + " is >= max flush request count " + this.blockingFlushCount); + } + + blocked = true; + try { + cacheFlushesRequested.wait(threadWakeFrequency); + } catch (InterruptedException e) { + // continue; + } } } if (blocked) { @@ -1451,21 +1372,13 @@ return; } synchronized (updateLock) { // prevent a cache flush - this.log.append(regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), updatesByColumn); - - long size = 0; for (Map.Entry e: updatesByColumn.entrySet()) { HStoreKey key = e.getKey(); byte[] val = e.getValue(); - size = this.memcacheSize.addAndGet(key.getSize() + - (val == null ? 0 : val.length)); - stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val); + HStore store = stores.get(key.getFamily()); + this.log.append(store.storeName, getTableDesc().getName(), key, val); + memcacheSize.addAndGet(store.add(key, val)); } - if (this.flushListener != null && size > this.memcacheFlushSize) { - // Request a cache flush - this.flushListener.flushRequested(this); - } } } @@ -1809,7 +1722,7 @@ * @param r HRegion to add to meta * * @throws IOException - * @see {@link #removeRegionFromMETA(HRegion, HRegion)} + * @see #removeRegionFromMETA(HRegionInterface, Text, Text) */ public static void addRegionToMETA(HRegion meta, HRegion r) throws IOException { @@ -1835,14 +1748,13 @@ * * @param srvr META server to be updated * @param metaRegionName Meta region name - * @param regionNmae HRegion to remove from meta + * @param regionName name of the region to remove from meta * * @throws IOException - * @see {@link #addRegionToMETA(HRegion, HRegion)} + * @see #addRegionToMETA(HRegion, HRegion) */ public static void removeRegionFromMETA(final HRegionInterface srvr, - final Text metaRegionName, final Text regionName) - throws IOException { + final Text metaRegionName, final Text regionName) throws IOException { srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP); } @@ -1853,7 +1765,7 @@ * @param info HRegion to update in meta * * @throws IOException - * @see {@link #addRegionToMETA(HRegion, HRegion)} + * @see #addRegionToMETA(HRegion, HRegion) */ public static void offlineRegionInMETA(final HRegionInterface srvr, final Text metaRegionName, final HRegionInfo info) Index: src/java/org/apache/hadoop/hbase/util/Migrate.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/Migrate.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/util/Migrate.java (working copy) @@ -25,6 +25,8 @@ import java.io.InputStreamReader; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import 
java.util.HashMap; import java.util.HashSet; @@ -64,7 +66,7 @@ /** * Perform a file system upgrade to convert older file layouts to that - * supported by HADOOP-2478 + * supported by HADOOP-2478, and then to the form supported by HBASE-69 */ public class Migrate extends Configured implements Tool { static final Log LOG = LogFactory.getLog(Migrate.class); @@ -96,10 +98,11 @@ options.put("prompt", ACTION.PROMPT); } + private FileSystem fs = null; + private Path rootdir = null; private boolean readOnly = false; private boolean migrationNeeded = false; private boolean newRootRegion = false; - private ACTION logFiles = ACTION.IGNORE; private ACTION otherFiles = ACTION.IGNORE; private BufferedReader reader = null; @@ -127,7 +130,7 @@ } try { - FileSystem fs = FileSystem.get(conf); // get DFS handle + fs = FileSystem.get(conf); // get DFS handle LOG.info("Verifying that file system is available..."); if (!FSUtils.isFileSystemAvailable(fs)) { @@ -148,8 +151,7 @@ LOG.info("Starting upgrade" + (readOnly ? " check" : "")); - Path rootdir = - fs.makeQualified(new Path(this.conf.get(HConstants.HBASE_DIR))); + rootdir = fs.makeQualified(new Path(this.conf.get(HConstants.HBASE_DIR))); if (!fs.exists(rootdir)) { throw new FileNotFoundException("HBase root directory " + @@ -158,40 +160,28 @@ // See if there is a file system version file - if (FSUtils.checkVersion(fs, rootdir)) { + String version = FSUtils.checkVersion(fs, rootdir); + if (version != null && + version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0) { LOG.info("No upgrade necessary."); return 0; } - // check to see if new root region dir exists + // Get contents of root directory + + FileStatus[] rootFiles = getRootDirFiles(); - checkNewRootRegionDirExists(fs, rootdir); - - // check for "extra" files and for old upgradable regions - - extraFiles(fs, rootdir); - - if (!newRootRegion) { - // find root region - - Path rootRegion = new Path(rootdir, - OLD_PREFIX + HRegionInfo.rootRegionInfo.getEncodedName()); - - if (!fs.exists(rootRegion)) { - throw new IOException("Cannot find root region " + - rootRegion.toString()); - } else if (readOnly) { - migrationNeeded = true; - } else { - migrateRegionDir(fs, rootdir, HConstants.ROOT_TABLE_NAME, rootRegion); - scanRootRegion(fs, rootdir); - - // scan for left over regions - - extraRegions(fs, rootdir); - } + if (version == null) { + migrateFromNoVersion(rootFiles); + migrateToV2(rootFiles); + } else if (version.compareTo("0.1") == 0) { + migrateToV2(rootFiles); + } else if (version.compareTo("2") == 0) { + // Nothing to do (yet) + } else { + throw new IOException("Unrecognized version: " + version); } - + if (!readOnly) { // set file system version LOG.info("Setting file system version."); @@ -207,21 +197,81 @@ } } - private void checkNewRootRegionDirExists(FileSystem fs, Path rootdir) - throws IOException { + private void migrateFromNoVersion(FileStatus[] rootFiles) throws IOException { + // check to see if new root region dir exists + + checkNewRootRegionDirExists(); + + // check for unrecovered region server log files + + checkForUnrecoveredLogFiles(rootFiles); + + // check for "extra" files and for old upgradable regions + + extraFiles(rootFiles); + + if (!newRootRegion) { + // find root region + + Path rootRegion = new Path(rootdir, + OLD_PREFIX + HRegionInfo.rootRegionInfo.getEncodedName()); + + if (!fs.exists(rootRegion)) { + throw new IOException("Cannot find root region " + + rootRegion.toString()); + } else if (readOnly) { + migrationNeeded = true; + } else { + 
migrateRegionDir(HConstants.ROOT_TABLE_NAME, rootRegion); + scanRootRegion(); + + // scan for left over regions + + extraRegions(); + } + } + } + + private void migrateToV2(FileStatus[] rootFiles) throws IOException { + checkForUnrecoveredLogFiles(rootFiles); + } + + private FileStatus[] getRootDirFiles() throws IOException { + FileStatus[] stats = fs.listStatus(rootdir); + if (stats == null || stats.length == 0) { + throw new IOException("No files found under root directory " + + rootdir.toString()); + } + return stats; + } + + private void checkNewRootRegionDirExists() throws IOException { Path rootRegionDir = HRegion.getRegionDir(rootdir, HRegionInfo.rootRegionInfo); newRootRegion = fs.exists(rootRegionDir); migrationNeeded = !newRootRegion; } + + private void checkForUnrecoveredLogFiles(FileStatus[] rootFiles) + throws IOException { + List unrecoveredLogs = new ArrayList(); + for (int i = 0; i < rootFiles.length; i++) { + String name = rootFiles[i].getPath().getName(); + if (name.startsWith("log_")) { + unrecoveredLogs.add(name); + } + } + if (unrecoveredLogs.size() != 0) { + throw new IOException("There are " + unrecoveredLogs.size() + + " unrecovered region server logs. Please uninstall this version of " + + "HBase, re-install the previous version, start your cluster and " + + "shut it down cleanly, so that all region server logs are recovered" + + " and deleted."); + } + } // Check for files that should not be there or should be migrated - private void extraFiles(FileSystem fs, Path rootdir) throws IOException { - FileStatus[] stats = fs.listStatus(rootdir); - if (stats == null || stats.length == 0) { - throw new IOException("No files found under root directory " + - rootdir.toString()); - } + private void extraFiles(FileStatus[] stats) throws IOException { for (int i = 0; i < stats.length; i++) { String name = stats[i].getPath().getName(); if (name.startsWith(OLD_PREFIX)) { @@ -234,31 +284,27 @@ } catch (NumberFormatException e) { extraFile(otherFiles, "Old region format can not be upgraded: " + - name, fs, stats[i].getPath()); + name, stats[i].getPath()); } } else { // Since the new root region directory exists, we assume that this // directory is not necessary - extraFile(otherFiles, "Old region directory found: " + name, fs, + extraFile(otherFiles, "Old region directory found: " + name, stats[i].getPath()); } } else { // File name does not start with "hregion_" - if (name.startsWith("log_")) { - String message = "Unrecovered region server log file " + name + - " this file can be recovered by the master when it starts."; - extraFile(logFiles, message, fs, stats[i].getPath()); - } else if (!newRootRegion) { + if (!newRootRegion) { // new root region directory does not exist. 
This is an extra file String message = "Unrecognized file " + name; - extraFile(otherFiles, message, fs, stats[i].getPath()); + extraFile(otherFiles, message, stats[i].getPath()); } } } } - private void extraFile(ACTION action, String message, FileSystem fs, - Path p) throws IOException { + private void extraFile(ACTION action, String message, Path p) + throws IOException { if (action == ACTION.ABORT) { throw new IOException(message + " aborting"); @@ -277,8 +323,8 @@ } } - private void migrateRegionDir(FileSystem fs, Path rootdir, Text tableName, - Path oldPath) throws IOException { + private void migrateRegionDir(Text tableName, Path oldPath) + throws IOException { // Create directory where table will live @@ -323,7 +369,7 @@ } } - private void scanRootRegion(FileSystem fs, Path rootdir) throws IOException { + private void scanRootRegion() throws IOException { HLog log = new HLog(fs, new Path(rootdir, HConstants.HREGION_LOGDIR_NAME), conf, null); @@ -354,12 +400,12 @@ // First move the meta region to where it should be and rename // subdirectories as necessary - migrateRegionDir(fs, rootdir, HConstants.META_TABLE_NAME, + migrateRegionDir(HConstants.META_TABLE_NAME, new Path(rootdir, OLD_PREFIX + info.getEncodedName())); // Now scan and process the meta table - scanMetaRegion(fs, rootdir, log, info); + scanMetaRegion(log, info); } } finally { @@ -375,8 +421,7 @@ } } - private void scanMetaRegion(FileSystem fs, Path rootdir, HLog log, - HRegionInfo info) throws IOException { + private void scanMetaRegion(HLog log, HRegionInfo info) throws IOException { HRegion metaRegion = new HRegion( new Path(rootdir, info.getTableDesc().getName().toString()), log, fs, @@ -402,7 +447,7 @@ // Move the region to where it should be and rename // subdirectories as necessary - migrateRegionDir(fs, rootdir, region.getTableDesc().getName(), + migrateRegionDir(region.getTableDesc().getName(), new Path(rootdir, OLD_PREFIX + region.getEncodedName())); results.clear(); @@ -417,7 +462,7 @@ } } - private void extraRegions(FileSystem fs, Path rootdir) throws IOException { + private void extraRegions() throws IOException { FileStatus[] stats = fs.listStatus(rootdir); if (stats == null || stats.length == 0) { throw new IOException("No files found under root directory " + @@ -436,7 +481,7 @@ message = "Region not in meta table and no other regions reference it " + name; } - extraFile(otherFiles, message, fs, stats[i].getPath()); + extraFile(otherFiles, message, stats[i].getPath()); } } } @@ -444,18 +489,11 @@ @SuppressWarnings("static-access") private int parseArgs(String[] args) { Options opts = new Options(); - Option logFiles = OptionBuilder.withArgName(ACTIONS) - .hasArg() - .withDescription( - "disposition of unrecovered region server logs: {abort|ignore|delete|prompt}") - .create("logfiles"); - Option extraFiles = OptionBuilder.withArgName(ACTIONS) .hasArg() .withDescription("disposition of 'extra' files: {abort|ignore|delete|prompt}") .create("extrafiles"); - opts.addOption(logFiles); opts.addOption(extraFiles); GenericOptionsParser parser = @@ -474,21 +512,12 @@ } if (readOnly) { - this.logFiles = ACTION.IGNORE; this.otherFiles = ACTION.IGNORE; } else { CommandLine commandLine = parser.getCommandLine(); ACTION action = null; - if (commandLine.hasOption("logfiles")) { - action = options.get(commandLine.getOptionValue("logfiles")); - if (action == null) { - usage(); - return -1; - } - this.logFiles = action; - } if (commandLine.hasOption("extrafiles")) { action = 
options.get(commandLine.getOptionValue("extrafiles")); if (action == null) { @@ -506,9 +535,6 @@ System.err.println(" check perform upgrade checks only."); System.err.println(" upgrade perform upgrade checks and modify hbase.\n"); System.err.println(" Options are:"); - System.err.println(" -logfiles={abort|ignore|delete|prompt}"); - System.err.println(" action to take when unrecovered region"); - System.err.println(" server log files are found.\n"); System.err.println(" -extrafiles={abort|ignore|delete|prompt}"); System.err.println(" action to take if \"extra\" files are found.\n"); System.err.println(" -conf specify an application configuration file"); Index: src/java/org/apache/hadoop/hbase/util/FSUtils.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 620703) +++ src/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy) @@ -81,20 +81,19 @@ * * @param fs * @param rootdir - * @return true if the current file system is the correct version + * @return null if no version file exists, version string otherwise. * @throws IOException */ - public static boolean checkVersion(FileSystem fs, Path rootdir) throws IOException { + public static String checkVersion(FileSystem fs, Path rootdir) throws IOException { Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME); - boolean versionOk = false; + String version = null; if (fs.exists(versionFile)) { FSDataInputStream s = fs.open(new Path(rootdir, HConstants.VERSION_FILE_NAME)); - String version = DataInputStream.readUTF(s); + version = DataInputStream.readUTF(s); s.close(); - versionOk = version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0; } - return versionOk; + return version; } /**
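To show how the revised FSUtils.checkVersion(FileSystem, Path) contract is meant to be consumed (null when no version file exists, otherwise the stored version string), here is a minimal caller sketch modeled on the Migrate changes above; fs and rootdir are assumed to be an open FileSystem handle and the qualified HBase root path:

    // Dispatch on the on-disk file system version string.
    String version = FSUtils.checkVersion(fs, rootdir);
    if (version == null) {
      // No version file: pre-HADOOP-2478 layout, run the full migration.
    } else if (version.equals(HConstants.FILE_SYSTEM_VERSION)) {
      // Already at the current version: nothing to do.
    } else {
      // Some earlier version such as "0.1": apply only the remaining
      // migration steps (unrecognized strings are rejected by Migrate).
    }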