Index: src/test/org/apache/hadoop/hbase/MultiRegionTable.java =================================================================== --- src/test/org/apache/hadoop/hbase/MultiRegionTable.java (revision 632429) +++ src/test/org/apache/hadoop/hbase/MultiRegionTable.java (working copy) @@ -330,7 +330,7 @@ * @throws IOException */ protected static void compact(final MiniHBaseCluster cluster, - final HRegionInfo r) throws IOException { + final HRegionInfo r) { if (r == null) { LOG.debug("Passed region is null"); return; Index: src/test/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseTestCase.java (revision 632429) +++ src/test/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -161,7 +161,7 @@ throws IOException { return new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(), closedRegion.getFilesystem(), closedRegion.getConf(), - closedRegion.getRegionInfo(), null, null); + closedRegion.getRegionInfo(), null, null, null); } /** @@ -509,54 +509,88 @@ */ public static class HTableIncommon implements Incommon { final HTable table; + private BatchUpdate batch; + + private void checkBatch() { + if (batch == null) { + throw new IllegalStateException("No batch update in progress."); + } + } + /** * @param table */ public HTableIncommon(final HTable table) { super(); this.table = table; + this.batch = null; } /** {@inheritDoc} */ - public void abort(long lockid) { - this.table.abort(lockid); + public void abort(@SuppressWarnings("unused") long lockid) { + if (this.batch != null) { + this.batch = null; + } } /** {@inheritDoc} */ - public void commit(long lockid) throws IOException { - this.table.commit(lockid); + public void commit(@SuppressWarnings("unused") long lockid) + throws IOException { + checkBatch(); + this.table.commit(batch); + this.batch = null; } + /** {@inheritDoc} */ - public void commit(long lockid, final long ts) throws IOException { - this.table.commit(lockid, ts); + public void commit(@SuppressWarnings("unused") long lockid, final long ts) + throws IOException { + checkBatch(); + this.batch.setTimestamp(ts); + this.table.commit(batch); + this.batch = null; } + /** {@inheritDoc} */ - public void put(long lockid, Text column, byte[] val) { - this.table.put(lockid, column, val); + public void put(@SuppressWarnings("unused") long lockid, Text column, + byte[] val) { + checkBatch(); + this.batch.put(column, val); } + /** {@inheritDoc} */ - public void delete(long lockid, Text column) { - this.table.delete(lockid, column); + public void delete(@SuppressWarnings("unused") long lockid, Text column) { + checkBatch(); + this.batch.delete(column); } + /** {@inheritDoc} */ public void deleteAll(Text row, Text column, long ts) throws IOException { this.table.deleteAll(row, column, ts); } + /** {@inheritDoc} */ public long startUpdate(Text row) { - return this.table.startUpdate(row); + if (this.batch != null) { + throw new IllegalStateException("Batch update already in progress."); + } + this.batch = new BatchUpdate(row); + return 0L; } + /** {@inheritDoc} */ public HScannerInterface getScanner(Text [] columns, Text firstRow, long ts) throws IOException { return this.table.obtainScanner(columns, firstRow, ts, null); } + /** {@inheritDoc} */ public byte[] get(Text row, Text column) throws IOException { return this.table.get(row, column); } + /** {@inheritDoc} */ public byte[][] get(Text row, Text column, int versions) throws IOException { return this.table.get(row, column, 
versions); } + /** {@inheritDoc} */ public byte[][] get(Text row, Text column, long ts, int versions) throws IOException { Index: src/test/org/apache/hadoop/hbase/TestHBaseCluster.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestHBaseCluster.java (revision 632429) +++ src/test/org/apache/hadoop/hbase/TestHBaseCluster.java (working copy) @@ -104,7 +104,7 @@ (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING)); table.commit(b); } - System.out.println("Write " + NUM_VALS + " rows. Elapsed time: " + System.err.println("Write " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // Read them back in @@ -134,7 +134,7 @@ teststr.compareTo(bodystr) == 0); } - System.out.println("Read " + NUM_VALS + " rows. Elapsed time: " + System.err.println("Read " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); } @@ -175,7 +175,7 @@ anchorFetched++; } else { - System.out.println(col); + System.err.println(col); } } curVals.clear(); @@ -184,7 +184,7 @@ assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched); assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched); - System.out.println("Scanned " + NUM_VALS + System.err.println("Scanned " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); Index: src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java =================================================================== --- src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (revision 632429) +++ src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (working copy) @@ -100,6 +100,10 @@ // Always compact if there is more than one store file. 
conf.setInt("hbase.hstore.compactionThreshold", 2); + // Make the lease timeout and check interval longer that for other tests + conf.setInt("hbase.master.lease.period", 10 * 1000); + conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); + desc = new HTableDescriptor(TABLE_NAME); desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); @@ -142,13 +146,18 @@ /** {@inheritDoc} */ @Override public void tearDown() throws Exception { - super.tearDown(); + try { + super.tearDown(); - if (hCluster != null) { - hCluster.shutdown(); + if (hCluster != null) { + hCluster.shutdown(); + } + + StaticTestEnvironment.shutdownDfs(dfsCluster); + } finally { + // Delete any files in local file system + deleteLocal(new File(conf.get("hadoop.tmp.dir"))); } - - StaticTestEnvironment.shutdownDfs(dfsCluster); } /** @@ -169,8 +178,9 @@ // set configuration parameter for index build conf.set("hbase.index.conf", createIndexConfContent()); + JobConf jobConf = null; try { - JobConf jobConf = new JobConf(conf, TestTableIndex.class); + jobConf = new JobConf(conf, TestTableIndex.class); jobConf.setJobName("index column contents"); jobConf.setNumMapTasks(2); // number of indexes to partition into @@ -188,6 +198,9 @@ JobClient.runJob(jobConf); } finally { + if (jobConf != null) { + conf.set("hadoop.tmp.dir", jobConf.get("hadoop.tmp.dir")); + } mrCluster.shutdown(); } @@ -320,6 +333,21 @@ scanner.close(); } } + + private void deleteLocal(File f) { + if (f.exists()) { + if (f.isDirectory()) { + File[] children = f.listFiles(); + if (children != null && children.length > 0) { + for (int i = 0; i < children.length; i++) { + deleteLocal(children[i]); + } + } + } + f.delete(); + } + } + /** * @param args unused */ Index: src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java (revision 0) +++ src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java (revision 0) @@ -0,0 +1,86 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.UnsupportedEncodingException; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.io.Text; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HStoreKey; + +/** + * + */ +public class TestMemcache extends TestCase { + private static final int FIRST_ROW = 1; + private static final int NUM_VALS = 1000; + private static final Text CONTENTS_BASIC = new Text("contents:basic"); + private static final String CONTENTSTR = "contentstr"; + private static final String ANCHORNUM = "anchor:anchornum-"; + private static final String ANCHORSTR = "anchorstr"; + + /** + * @throws UnsupportedEncodingException + */ + public void testMemcache() throws UnsupportedEncodingException { + HStore.Memcache cache = new HStore.Memcache(); + for (int k = FIRST_ROW; k <= NUM_VALS; k++) { + Text row = new Text("row_" + k); + HStoreKey key = + new HStoreKey(row, CONTENTS_BASIC, System.currentTimeMillis()); + cache.add(key, (CONTENTSTR + k).getBytes(HConstants.UTF8_ENCODING)); + + key = + new HStoreKey(row, new Text(ANCHORNUM + k), System.currentTimeMillis()); + cache.add(key, (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING)); + } + + // Read them back + + for (int k = FIRST_ROW; k <= NUM_VALS; k++) { + List results; + Text row = new Text("row_" + k); + HStoreKey key = new HStoreKey(row, CONTENTS_BASIC, Long.MAX_VALUE); + results = cache.get(key, 1); + assertNotNull("no data for " + key.toString(), results); + assertEquals(1, results.size()); + String bodystr = new String(results.get(0), HConstants.UTF8_ENCODING); + String teststr = CONTENTSTR + k; + assertTrue("Incorrect value for key: (" + key.toString() + + "), expected: '" + teststr + "' got: '" + + bodystr + "'", teststr.compareTo(bodystr) == 0); + + key = new HStoreKey(row, new Text(ANCHORNUM + k), Long.MAX_VALUE); + results = cache.get(key, 1); + assertNotNull("no data for " + key.toString(), results); + assertEquals(1, results.size()); + bodystr = new String(results.get(0), HConstants.UTF8_ENCODING); + teststr = ANCHORSTR + k; + assertTrue("Incorrect value for key: (" + key.toString() + + "), expected: '" + teststr + "' got: '" + bodystr + "'", + teststr.compareTo(bodystr) == 0); + } + } +} Index: src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java (revision 632429) +++ src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.StaticTestEnvironment; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.io.BatchUpdate; /** * Test log deletion as logs are rolled. @@ -65,8 +66,8 @@ // Force a region split after every 768KB conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); - // We roll the log after every 256 writes - conf.setInt("hbase.regionserver.maxlogentries", 256); + // We roll the log after every 32 writes + conf.setInt("hbase.regionserver.maxlogentries", 32); // For less frequently updated regions flush after every 2 flushes conf.setInt("hbase.hregion.memcache.optionalflushcount", 2); @@ -102,11 +103,11 @@ @Override public void setUp() throws Exception { try { - super.setUp(); dfs = new MiniDFSCluster(conf, 2, true, (String[]) null); // Set the hbase.rootdir to be the home directory in mini dfs. 
this.conf.set(HConstants.HBASE_DIR, this.dfs.getFileSystem().getHomeDirectory().toString()); + super.setUp(); } catch (Exception e) { StaticTestEnvironment.shutdownDfs(dfs); LOG.fatal("error during setUp: ", e); @@ -141,7 +142,7 @@ this.log = server.getLog(); // When the META table can be opened, the region servers are running - HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); + new HTable(conf, HConstants.META_TABLE_NAME); // Create the test table and open it HTableDescriptor desc = new HTableDescriptor(tableName); @@ -150,14 +151,14 @@ admin.createTable(desc); HTable table = new HTable(conf, new Text(tableName)); - for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls - long lockid = - table.startUpdate(new Text("row" + String.format("%1$04d", i))); - table.put(lockid, HConstants.COLUMN_FAMILY, value); - table.commit(lockid); + for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls + BatchUpdate b = + new BatchUpdate(new Text("row" + String.format("%1$04d", i))); + b.put(HConstants.COLUMN_FAMILY, value); + table.commit(b); - if (i % 256 == 0) { - // After every 256 writes sleep to let the log roller run + if (i % 32 == 0) { + // After every 32 writes sleep to let the log roller run try { Thread.sleep(2000); @@ -193,7 +194,7 @@ int count = log.getNumLogFiles(); LOG.info("after flushing all regions and rolling logs there are " + log.getNumLogFiles() + " log files"); - assertTrue(count <= 2); + assertTrue(("actual count: " + count), count <= 2); } catch (Exception e) { LOG.fatal("unexpected exception", e); throw e; Index: src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java (revision 632429) +++ src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java (working copy) @@ -95,9 +95,15 @@ private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); region.flushcache(); - Text midkey = new Text(); - assertTrue(region.needsSplit(midkey)); - HRegion [] regions = split(region); + Text midkey = null; + for (HStore s: region.stores.values()) { + Text k = region.compactStore(s); + if (midkey == null && k != null) { + midkey = k; + } + } + assertNotNull(midkey); + HRegion [] regions = split(region, midkey); try { // Need to open the regions. // TODO: Add an 'open' to HRegion... don't do open by constructing @@ -113,17 +119,9 @@ assertScan(regions[0], COLFAMILY_NAME3, new Text(START_KEY)); assertScan(regions[1], COLFAMILY_NAME3, midkey); // Now prove can't split regions that have references. - Text[] midkeys = new Text[regions.length]; for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); - // Even after above splits, still needs split but after splits its - // unsplitable because biggest store file is reference. References - // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); // Add so much data to this region, we create a store file that is > - // than - // one of our unsplitable references. - // it will. + // than one of our unsplitable references. it will. for (int j = 0; j < 2; j++) { addContent(regions[i], COLFAMILY_NAME3); } @@ -132,30 +130,29 @@ regions[i].flushcache(); } - // Assert that even if one store file is larger than a reference, the - // region is still deemed unsplitable (Can't split region if references - // presen). 
- for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); - // Even after above splits, still needs split but after splits its - // unsplitable because biggest store file is reference. References - // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); - } - + Text[] midkeys = new Text[regions.length]; // To make regions splitable force compaction. for (int i = 0; i < regions.length; i++) { - regions[i].compactStores(); + midkeys[i] = null; + for (HStore s: regions[i].stores.values()) { + Text k = regions[i].compactStore(s); + if (midkeys[i] == null && k != null) { + midkeys[i] = k; + } + } } TreeMap sortedMap = new TreeMap(); // Split these two daughter regions so then I'll have 4 regions. Will // split because added data above. for (int i = 0; i < regions.length; i++) { - HRegion[] rs = split(regions[i]); - for (int j = 0; j < rs.length; j++) { - sortedMap.put(rs[j].getRegionName().toString(), - openClosedRegion(rs[j])); + HRegion[] rs = null; + if (midkeys[i] != null) { + rs = split(regions[i], midkeys[i]); + for (int j = 0; j < rs.length; j++) { + sortedMap.put(rs[j].getRegionName().toString(), + openClosedRegion(rs[j])); + } } } LOG.info("Made 4 regions"); @@ -226,12 +223,11 @@ } } - private HRegion [] split(final HRegion r) throws IOException { - Text midKey = new Text(); - assertTrue(r.needsSplit(midKey)); + private HRegion [] split(final HRegion r, final Text midKey) + throws IOException { // Assert can get mid key from passed region. assertGet(r, COLFAMILY_NAME3, midKey); - HRegion [] regions = r.splitRegion(null); + HRegion [] regions = r.splitRegion(null, midKey); assertEquals(regions.length, 2); return regions; } Index: src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 632429) +++ src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -19,7 +19,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -57,7 +56,7 @@ */ public void testHRegion() throws IOException { try { - setup(); + init(); locks(); badPuts(); basic(); @@ -65,14 +64,10 @@ batchWrite(); splitAndMerge(); read(); - cleanup(); + } catch (IOException e) { + e.printStackTrace(); + throw e; } finally { - if (r != null) { - r.close(); - } - if (log != null) { - log.closeAndDelete(); - } StaticTestEnvironment.shutdownDfs(cluster); } } @@ -96,22 +91,34 @@ HRegionIncommon region = null; private static int numInserted = 0; - - // Create directories, start mini cluster, etc. - private void setup() throws IOException { + /** {@inheritDoc} */ + @Override + public void setUp() throws Exception { + if (!StaticTestEnvironment.debugging) { + conf.setLong("hbase.hregion.max.filesize", 65536); + } cluster = new MiniDFSCluster(conf, 2, true, (String[])null); + fs = cluster.getFileSystem(); + // Set the hbase.rootdir to be the home directory in mini dfs. this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem().getHomeDirectory().toString()); + + super.setUp(); + } + // Create directories, start mini cluster, etc. 
+ + private void init() throws IOException { desc = new HTableDescriptor("test"); desc.addFamily(new HColumnDescriptor("contents:")); desc.addFamily(new HColumnDescriptor("anchor:")); r = createNewHRegion(desc, null, null); log = r.getLog(); region = new HRegionIncommon(r); + System.err.println("setup completed."); } // Test basic functionality. Writes to contents:basic and anchor:anchornum-* @@ -129,7 +136,7 @@ (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING)); region.commit(writeid, System.currentTimeMillis()); } - System.out.println("Write " + NUM_VALS + " rows. Elapsed time: " + System.err.println("Write " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // Flush cache @@ -138,7 +145,7 @@ region.flushcache(); - System.out.println("Cache flush elapsed time: " + System.err.println("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // Read them back in @@ -165,8 +172,10 @@ bodystr, teststr); } - System.out.println("Read " + NUM_VALS + " rows. Elapsed time: " + System.err.println("Read " + NUM_VALS + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); + + System.err.println("basic completed."); } private void badPuts() { @@ -198,6 +207,7 @@ } } assertTrue("Bad family", exceptionThrown); + System.err.println("badPuts completed."); } /** @@ -253,6 +263,7 @@ } } } + System.err.println("locks completed."); } // Test scanners. Writes contents:firstcol and anchor:secondcol @@ -283,7 +294,7 @@ numInserted += 2; } - System.out.println("Write " + (vals1.length / 2) + " elapsed time: " + System.err.println("Write " + (vals1.length / 2) + " elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // 2. Scan from cache @@ -321,7 +332,7 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + (vals1.length / 2) + System.err.println("Scanned " + (vals1.length / 2) + " rows from cache. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -331,7 +342,7 @@ region.flushcache(); - System.out.println("Cache flush elapsed time: " + System.err.println("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // 4. Scan from disk @@ -368,7 +379,7 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + (vals1.length / 2) + System.err.println("Scanned " + (vals1.length / 2) + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -386,7 +397,7 @@ numInserted += 2; } - System.out.println("Write " + (vals1.length / 2) + " rows. Elapsed time: " + System.err.println("Write " + (vals1.length / 2) + " rows. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // 6. Scan from cache and disk @@ -423,7 +434,7 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + vals1.length + System.err.println("Scanned " + vals1.length + " rows from cache and disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -433,7 +444,7 @@ region.flushcache(); - System.out.println("Cache flush elapsed time: " + System.err.println("Cache flush elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); // 8. 
Scan from disk @@ -468,7 +479,7 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + vals1.length + System.err.println("Scanned " + vals1.length + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -506,9 +517,11 @@ } assertEquals("Should have fetched " + (numInserted / 2) + " values, but fetched " + numFetched, (numInserted / 2), numFetched); - System.out.println("Scanned " + (numFetched / 2) + System.err.println("Scanned " + (numFetched / 2) + " rows from disk with specified start point. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); + + System.err.println("scan completed."); } // Do a large number of writes. Disabled if not debugging because it takes a @@ -517,6 +530,7 @@ private void batchWrite() throws IOException { if(! StaticTestEnvironment.debugging) { + System.err.println("batchWrite completed."); return; } @@ -542,7 +556,7 @@ buf1.toString().getBytes(HConstants.UTF8_ENCODING)); region.commit(writeid, System.currentTimeMillis()); if (k > 0 && k % (N_ROWS / 100) == 0) { - System.out.println("Flushing write #" + k); + System.err.println("Flushing write #" + k); long flushStart = System.currentTimeMillis(); region.flushcache(); @@ -550,51 +564,60 @@ totalFlush += (flushEnd - flushStart); if (k % (N_ROWS / 10) == 0) { - System.out.print("Rolling log..."); + System.err.print("Rolling log..."); long logStart = System.currentTimeMillis(); log.rollWriter(); long logEnd = System.currentTimeMillis(); totalLog += (logEnd - logStart); - System.out.println(" elapsed time: " + ((logEnd - logStart) / 1000.0)); + System.err.println(" elapsed time: " + ((logEnd - logStart) / 1000.0)); } } } long startCompact = System.currentTimeMillis(); - if(r.compactIfNeeded()) { - totalCompact = System.currentTimeMillis() - startCompact; - System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); - - } else { - System.out.println("No compaction required."); - } + r.compactStores(); + totalCompact = System.currentTimeMillis() - startCompact; + System.err.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); long endTime = System.currentTimeMillis(); long totalElapsed = (endTime - startTime); - System.out.println(); - System.out.println("Batch-write complete."); - System.out.println("Wrote " + N_ROWS + " rows, each of ~" + valsize + " bytes"); - System.out.println("Total flush-time: " + (totalFlush / 1000.0)); - System.out.println("Total compact-time: " + (totalCompact / 1000.0)); - System.out.println("Total log-time: " + (totalLog / 1000.0)); - System.out.println("Total time elapsed: " + (totalElapsed / 1000.0)); - System.out.println("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0))); - System.out.println("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)); - System.out.println("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0))); - System.out.println(); + System.err.println(); + System.err.println("Batch-write complete."); + System.err.println("Wrote " + N_ROWS + " rows, each of ~" + valsize + " bytes"); + System.err.println("Total flush-time: " + (totalFlush / 1000.0)); + System.err.println("Total compact-time: " + (totalCompact / 1000.0)); + System.err.println("Total log-time: " + (totalLog / 1000.0)); + System.err.println("Total time elapsed: " + (totalElapsed / 
1000.0)); + System.err.println("Total time, rows/second: " + (N_ROWS / (totalElapsed / 1000.0))); + System.err.println("Adjusted time (not including flush, compact, or log): " + ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0)); + System.err.println("Adjusted time, rows/second: " + (N_ROWS / ((totalElapsed - totalFlush - totalCompact - totalLog) / 1000.0))); + System.err.println(); + System.err.println("batchWrite completed."); } // NOTE: This test depends on testBatchWrite succeeding private void splitAndMerge() throws IOException { Path oldRegionPath = r.getRegionDir(); + Text midKey = null; + for (HStore s: r.stores.values()) { + midKey = r.compactStore(s); + if (midKey != null) { + break; + } + } + assertNotNull(midKey); long startTime = System.currentTimeMillis(); - HRegion subregions[] = r.splitRegion(this); + HRegion subregions[] = r.splitRegion(this, midKey); if (subregions != null) { - System.out.println("Split region elapsed time: " + System.err.println("Split region elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); assertEquals("Number of subregions", subregions.length, 2); + for (int i = 0; i < subregions.length; i++) { + subregions[i] = openClosedRegion(subregions[i]); + } + // Now merge it back together Path oldRegion1 = subregions[0].getRegionDir(); @@ -602,12 +625,13 @@ startTime = System.currentTimeMillis(); r = HRegion.closeAndMerge(subregions[0], subregions[1]); region = new HRegionIncommon(r); - System.out.println("Merge regions elapsed time: " + System.err.println("Merge regions elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); fs.delete(oldRegion1); fs.delete(oldRegion2); fs.delete(oldRegionPath); } + System.err.println("splitAndMerge completed."); } /** @@ -668,7 +692,7 @@ anchorFetched++; } else { - System.out.println("UNEXPECTED COLUMN " + col); + System.err.println("UNEXPECTED COLUMN " + col); } } curVals.clear(); @@ -677,7 +701,7 @@ assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched); assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched); - System.out.println("Scanned " + NUM_VALS + System.err.println("Scanned " + NUM_VALS + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -720,7 +744,7 @@ } assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched); - System.out.println("Scanned " + (numFetched / 2) + System.err.println("Scanned " + (numFetched / 2) + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -753,7 +777,7 @@ } assertEquals("Inserted " + N_ROWS + " values, but fetched " + numFetched, N_ROWS, numFetched); - System.out.println("Scanned " + N_ROWS + System.err.println("Scanned " + N_ROWS + " rows from disk. Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); @@ -785,37 +809,14 @@ } assertEquals("Inserted " + (NUM_VALS + numInserted/2) + " values, but fetched " + fetched, (NUM_VALS + numInserted/2), fetched); - System.out.println("Scanned " + fetched + System.err.println("Scanned " + fetched + " rows from disk. 
Elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); } finally { s.close(); } + System.err.println("read completed."); } - private static void deleteFile(File f) { - if(f.isDirectory()) { - File[] children = f.listFiles(); - for(int i = 0; i < children.length; i++) { - deleteFile(children[i]); - } - } - f.delete(); - } - - private void cleanup() { - try { - r.close(); - r = null; - log.closeAndDelete(); - log = null; - } catch (IOException e) { - e.printStackTrace(); - } - - // Delete all the DFS files - - deleteFile(new File(System.getProperty("test.build.data"), "dfs")); - } } Index: src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java (revision 632429) +++ src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java (working copy) @@ -92,7 +92,6 @@ */ public void testCompaction() throws Exception { createStoreFile(r); - assertFalse(r.compactIfNeeded()); for (int i = 0; i < COMPACTION_THRESHOLD; i++) { createStoreFile(r); } @@ -103,36 +102,21 @@ addContent(new HRegionIncommon(r), COLUMN_FAMILY); byte [][] bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); // Assert that I can get > 5 versions (Should be at least 5 in there). - assertTrue(bytes.length >= 5); - // Try to run compaction concurrent with a thread flush just to see that - // we can. - final HRegion region = this.r; - Thread t1 = new Thread() { - @Override - public void run() { - try { - region.flushcache(); - } catch (IOException e) { - e.printStackTrace(); - } + assertTrue(("actual value: " + bytes.length), bytes.length >= 5); + try { + r.flushcache(); + } catch (IOException e) { + e.printStackTrace(); + throw e; + } + try { + for (HStore s: r.stores.values()) { + r.compactStore(s); } - }; - Thread t2 = new Thread() { - @Override - public void run() { - try { - assertTrue(region.compactIfNeeded()); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - t1.setDaemon(true); - t1.start(); - t2.setDaemon(true); - t2.start(); - t1.join(); - t2.join(); + } catch (IOException e) { + e.printStackTrace(); + throw e; + } // Now assert that there are 4 versions of a record only: thats the // 3 versions that should be in the compacted store and then the one more // we added when we flushed. But could be 3 only if the flush happened @@ -149,7 +133,8 @@ // machines and even on hudson. On said machine, its reporting in the // LOG line above that there are 3 items in row so it should pass the // below test. - assertTrue(bytes.length == 3 || bytes.length == 4); + assertTrue(("actual value: " + bytes.length), + bytes.length == 3 || bytes.length == 4); // Now add deletes to memcache and then flush it. That will put us over // the compaction threshold of 3 store files. Compacting these store files @@ -168,7 +153,15 @@ // compacted store and the flush above when we added deletes. Add more // content to be certain. createSmallerStoreFile(this.r); - assertTrue(this.r.compactIfNeeded()); + try { + r.flushcache(); + for (HStore s: r.stores.values()) { + r.compactStore(s); + } + } catch (IOException e) { + e.printStackTrace(); + throw e; + } // Assert that the first row is still deleted. 
bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); assertNull(bytes); Index: src/java/org/apache/hadoop/hbase/HMerge.java =================================================================== --- src/java/org/apache/hadoop/hbase/HMerge.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/HMerge.java (working copy) @@ -144,17 +144,16 @@ long currentSize = 0; HRegion nextRegion = null; long nextSize = 0; - Text midKey = new Text(); for (int i = 0; i < info.length - 1; i++) { if (currentRegion == null) { currentRegion = - new HRegion(tabledir, hlog, fs, conf, info[i], null, null); - currentSize = currentRegion.largestHStore(midKey).getAggregate(); + new HRegion(tabledir, hlog, fs, conf, info[i], null, null, null); + currentSize = currentRegion.getLargestHStoreSize(); } nextRegion = - new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null); + new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null, null); - nextSize = nextRegion.largestHStore(midKey).getAggregate(); + nextSize = nextRegion.getLargestHStoreSize(); if ((currentSize + nextSize) <= (maxFilesize / 2)) { // We merge two adjacent regions if their total size is less than @@ -308,7 +307,7 @@ // Scan root region to find all the meta regions root = new HRegion(rootTableDir, hlog, fs, conf, - HRegionInfo.rootRegionInfo, null, null); + HRegionInfo.rootRegionInfo, null, null, null); HScannerInterface rootScanner = root.getScanner(COL_REGIONINFO_ARRAY, new Text(), System.currentTimeMillis(), null); Index: src/java/org/apache/hadoop/hbase/regionserver/HStoreSize.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HStoreSize.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/regionserver/HStoreSize.java (working copy) @@ -1,33 +0,0 @@ - -package org.apache.hadoop.hbase.regionserver; - -/* - * Data structure to hold result of a look at store file sizes. - */ -public class HStoreSize { - final long aggregate; - final long largest; - boolean splitable; - - HStoreSize(final long a, final long l, final boolean s) { - this.aggregate = a; - this.largest = l; - this.splitable = s; - } - - public long getAggregate() { - return this.aggregate; - } - - public long getLargest() { - return this.largest; - } - - public boolean isSplitable() { - return this.splitable; - } - - public void setSplitable(final boolean s) { - this.splitable = s; - } -} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/regionserver/CompactionListener.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/CompactionListener.java (revision 0) +++ src/java/org/apache/hadoop/hbase/regionserver/CompactionListener.java (revision 0) @@ -0,0 +1,34 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +/** + * + */ +public interface CompactionListener { + /** + * HStore needs compacting + * + * @param store store to be compacted + * @param region HRegion the store belongs to. + */ + public void compactionRequested(HStore store, HRegion region); +} Index: src/java/org/apache/hadoop/hbase/regionserver/HStore.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy) @@ -228,7 +228,7 @@ * @return the key that matches row exactly, or the one that * immediately preceeds it. */ - public Text getRowKeyAtOrBefore(final Text row, long timestamp) { + public Text getRowKeyAtOrBefore(final Text row, final long timestamp) { this.lock.readLock().lock(); Text key_memcache = null; @@ -284,8 +284,8 @@ // keep seeking so long as we're in the same row, and the timstamp // isn't as small as we'd like, and there are more cells to check - while (found_key.getRow().equals(key) - && found_key.getTimestamp() > timestamp && key_iterator.hasNext()) { + while (found_key.getRow().equals(key) && + found_key.getTimestamp() > timestamp && key_iterator.hasNext()) { found_key = key_iterator.next(); } @@ -295,7 +295,6 @@ if (found_key.getTimestamp() <= timestamp) { // we didn't find a key that matched by timestamp, so we have to // return null; -/* LOG.debug("Went searching for " + key + ", found " + found_key.getRow());*/ return found_key.getRow(); } } @@ -477,22 +476,22 @@ // Generate list of iterators HStoreKey firstKey = new HStoreKey(firstRow); - if (firstRow != null && firstRow.getLength() != 0) { - keyIterator = - backingMap.tailMap(firstKey).keySet().iterator(); + if (firstRow != null && firstRow.getLength() != 0) { + keyIterator = + backingMap.tailMap(firstKey).keySet().iterator(); - } else { - keyIterator = backingMap.keySet().iterator(); - } + } else { + keyIterator = backingMap.keySet().iterator(); + } - while (getNext(0)) { - if (!findFirstRow(0, firstRow)) { - continue; - } - if (columnMatch(0)) { - break; - } + while (getNext(0)) { + if (!findFirstRow(0, firstRow)) { + continue; } + if (columnMatch(0)) { + break; + } + } } catch (RuntimeException ex) { LOG.error("error initializing Memcache scanner: ", ex); close(); @@ -590,15 +589,16 @@ private final HBaseConfiguration conf; private final Path filterDir; final Filter bloomFilter; - private final Path compactionDir; - private final Integer compactLock = new Integer(0); + final long desiredMaxFileSize; + private volatile long storeSize; + private final Integer flushLock = new Integer(0); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final AtomicInteger activeScanners = new AtomicInteger(0); - final String storeName; + final Text storeName; /* * Sorted Map of readers keyed by sequence id (Most recent should be last in @@ -614,8 +614,17 @@ private final SortedMap readers = new TreeMap(); + // The most-recent log-seq-ID that's present. 
The most-recent such ID means + // we can ignore all log messages up to and including that ID (because they're + // already reflected in the TreeMaps). private volatile long maxSeqId; + + private final Path compactionDir; + private final Integer compactLock = new Integer(0); private final int compactionThreshold; + private final CompactionListener listener; + private volatile boolean compactionRequested = false; + private final ReentrantReadWriteLock newScannerLock = new ReentrantReadWriteLock(); @@ -649,22 +658,34 @@ * @param fs file system object * @param reconstructionLog existing log file to apply if any * @param conf configuration object + * @param listener compaction request listener * @throws IOException */ HStore(Path basedir, HRegionInfo info, HColumnDescriptor family, - FileSystem fs, Path reconstructionLog, HBaseConfiguration conf) - throws IOException { + FileSystem fs, Path reconstructionLog, HBaseConfiguration conf, + CompactionListener listener) throws IOException { this.basedir = basedir; this.info = info; this.family = family; this.fs = fs; this.conf = conf; + this.listener = listener; this.compactionDir = HRegion.getCompactionDir(basedir); this.storeName = - this.info.getEncodedName() + "/" + this.family.getFamilyName(); + new Text(this.info.getEncodedName() + "/" + this.family.getFamilyName()); + // By default, we compact if an HStore has more than + // MIN_COMMITS_FOR_COMPACTION map files + this.compactionThreshold = + conf.getInt("hbase.hstore.compactionThreshold", 3); + + // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. + this.desiredMaxFileSize = + conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); + this.storeSize = 0L; + if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) { this.compression = SequenceFile.CompressionType.BLOCK; } else if (family.getCompression() == @@ -700,29 +721,18 @@ if(LOG.isDebugEnabled()) { LOG.debug("starting " + storeName + ((reconstructionLog == null || !fs.exists(reconstructionLog)) ? - " (no reconstruction log)" : - " with reconstruction log: " + reconstructionLog.toString())); + " (no reconstruction log)" : + " with reconstruction log: " + reconstructionLog.toString())); } // Go through the 'mapdir' and 'infodir' together, make sure that all // MapFiles are in a reliable state. Every entry in 'mapdir' must have a // corresponding one in 'loginfodir'. Without a corresponding log info // file, the entry in 'mapdir' must be deleted. - List hstoreFiles = loadHStoreFiles(infodir, mapdir); - for(HStoreFile hsf: hstoreFiles) { - this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf); - } + // loadHStoreFiles also computes the max sequence id + this.maxSeqId = -1L; + this.storefiles.putAll(loadHStoreFiles(infodir, mapdir)); - // Now go through all the HSTORE_LOGINFOFILEs and figure out the - // most-recent log-seq-ID that's present. The most-recent such ID means we - // can ignore all log messages up to and including that ID (because they're - // already reflected in the TreeMaps). - // - // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That - // means it was built prior to the previous run of HStore, and so it cannot - // contain any updates also contained in the log. 
- - this.maxSeqId = getMaxSequenceId(hstoreFiles); if (LOG.isDebugEnabled()) { LOG.debug("maximum sequence id for hstore " + storeName + " is " + this.maxSeqId); @@ -730,16 +740,6 @@ doReconstructionLog(reconstructionLog, maxSeqId); - // By default, we compact if an HStore has more than - // MIN_COMMITS_FOR_COMPACTION map files - this.compactionThreshold = - conf.getInt("hbase.hstore.compactionThreshold", 3); - - // We used to compact in here before bringing the store online. Instead - // get it online quick even if it needs compactions so we can start - // taking updates as soon as possible (Once online, can take updates even - // during a compaction). - // Move maxSeqId on by one. Why here? And not in HRegion? this.maxSeqId += 1; @@ -756,28 +756,13 @@ first = false; } else { this.readers.put(e.getKey(), - e.getValue().getReader(this.fs, this.bloomFilter)); + e.getValue().getReader(this.fs, this.bloomFilter)); } } } - - /* - * @param hstoreFiles - * @return Maximum sequence number found or -1. - * @throws IOException - */ - private long getMaxSequenceId(final List hstoreFiles) - throws IOException { - long maxSeqID = -1; - for (HStoreFile hsf : hstoreFiles) { - long seqid = hsf.loadInfo(fs); - if (seqid > 0) { - if (seqid > maxSeqID) { - maxSeqID = seqid; - } - } - } - return maxSeqID; + + HColumnDescriptor getFamily() { + return this.family; } long getMaxSequenceId() { @@ -818,7 +803,7 @@ } if (skippedEdits > 0 && LOG.isDebugEnabled()) { LOG.debug("Skipped " + skippedEdits + - " edits because sequence id <= " + maxSeqID); + " edits because sequence id <= " + maxSeqID); } // Check this edit is for me. Also, guard against writing // METACOLUMN info such as HBASE::CACHEFLUSH entries @@ -828,8 +813,7 @@ || !HStoreKey.extractFamily(column).equals(family.getFamilyName())) { if (LOG.isDebugEnabled()) { LOG.debug("Passing on edit " + key.getRegionName() + ", " + - column.toString() + ": " + - new String(val.getVal(), UTF8_ENCODING) + + column + ": " + new String(val.getVal(), UTF8_ENCODING) + ", my region: " + info.getRegionName() + ", my column: " + family.getFamilyName()); } @@ -864,7 +848,7 @@ * @param mapdir qualified path for map file directory * @throws IOException */ - private List loadHStoreFiles(Path infodir, Path mapdir) + private SortedMap loadHStoreFiles(Path infodir, Path mapdir) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("infodir: " + infodir.toString() + " mapdir: " + @@ -873,7 +857,7 @@ // Look first at info files. If a reference, these contain info we need // to create the HStoreFile. FileStatus infofiles[] = fs.listStatus(infodir); - ArrayList results = new ArrayList(infofiles.length); + SortedMap results = new TreeMap(); ArrayList mapfiles = new ArrayList(infofiles.length); for (int i = 0; i < infofiles.length; i++) { Path p = infofiles[i].getPath(); @@ -897,6 +881,20 @@ } curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), family.getFamilyName(), fid, reference); + + storeSize += curfile.length(); + long storeSeqId = -1; + try { + storeSeqId = curfile.loadInfo(fs); + if (storeSeqId > this.maxSeqId) { + this.maxSeqId = storeSeqId; + } + } catch (IOException e) { + // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. + // That means it was built prior to the previous run of HStore, and so + // it cannot contain any updates also contained in the log. + } + Path mapfile = curfile.getMapFilePath(); if (!fs.exists(mapfile)) { fs.delete(curfile.getInfoFilePath()); @@ -908,7 +906,7 @@ // TODO: Confirm referent exists. 
// Found map and sympathetic info file. Add this hstorefile to result. - results.add(curfile); + results.put(storeSeqId, curfile); // Keep list of sympathetic data mapfiles for cleaning info dir in next // section. Make sure path is fully qualified for compare. mapfiles.add(mapfile); @@ -1103,10 +1101,10 @@ synchronized(flushLock) { // A. Write the Maps out to the disk HStoreFile flushedFile = new HStoreFile(conf, fs, basedir, - info.getEncodedName(), family.getFamilyName(), -1L, null); + info.getEncodedName(), family.getFamilyName(), -1L, null); String name = flushedFile.toString(); MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, - this.bloomFilter); + this.bloomFilter); // Here we tried picking up an existing HStoreFile from disk and // interlacing the memcache flush compacting as we go. The notion was @@ -1120,18 +1118,23 @@ // Related, looks like 'merging compactions' in BigTable paper interlaces // a memcache flush. We don't. int entries = 0; + long cacheSize = 0; try { for (Map.Entry es: cache.entrySet()) { HStoreKey curkey = es.getKey(); + byte[] bytes = es.getValue(); TextSequence f = HStoreKey.extractFamily(curkey.getColumn()); if (f.equals(this.family.getFamilyName())) { entries++; - out.append(curkey, new ImmutableBytesWritable(es.getValue())); + out.append(curkey, new ImmutableBytesWritable(bytes)); + cacheSize += curkey.getSize() + (bytes != null ? bytes.length : 0); } } } finally { out.close(); } + long newStoreSize = flushedFile.length(); + storeSize += newStoreSize; // B. Write out the log sequence number that corresponds to this output // MapFile. The MapFile is current up to and including the log seq num. @@ -1152,15 +1155,25 @@ this.storefiles.put(flushid, flushedFile); if(LOG.isDebugEnabled()) { LOG.debug("Added " + name + " with " + entries + - " entries, sequence id " + logCacheFlushId + ", and size " + - StringUtils.humanReadableInt(flushedFile.length()) + " for " + + " entries, sequence id " + logCacheFlushId + ", data size " + + StringUtils.humanReadableInt(cacheSize) + ", file size " + + StringUtils.humanReadableInt(newStoreSize) + " for " + this.storeName); } } finally { this.lock.writeLock().unlock(); } - return; } + if (!compactionRequested && + (storefiles.size() > compactionThreshold || + storeSize > this.desiredMaxFileSize)) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Requesting compaction for store " + storeName); + } + compactionRequested = true; + listener.compactionRequested(this, null); + } } ////////////////////////////////////////////////////////////////////////////// @@ -1168,28 +1181,6 @@ ////////////////////////////////////////////////////////////////////////////// /** - * @return True if this store needs compaction. - */ - boolean needsCompaction() { - return this.storefiles != null && - (this.storefiles.size() >= this.compactionThreshold || hasReferences()); - } - - /* - * @return True if this store has references. - */ - private boolean hasReferences() { - if (this.storefiles != null) { - for (HStoreFile hsf: this.storefiles.values()) { - if (hsf.isReference()) { - return true; - } - } - } - return false; - } - - /** * Compact the back-HStores. This method may take some time, so the calling * thread must be able to block for long periods. * @@ -1206,55 +1197,76 @@ * can be lengthy and we want to allow cache-flushes during this period. 
* @throws IOException * - * @return true if compaction completed successfully + * @return mid key if a split is needed, null otherwise */ - boolean compact() throws IOException { - synchronized (compactLock) { - if (LOG.isDebugEnabled()) { - LOG.debug("started compaction of " + storefiles.size() + - " files using " + compactionDir.toString() + " for " + - this.storeName); - } + Text compact() throws IOException { + try { + synchronized (compactLock) { + List filesToCompact = null; + long maxId = -1; + synchronized (storefiles) { + if (storefiles.size() < 1 || + (storefiles.size() == 1 && + !storefiles.get(storefiles.firstKey()).isReference())) { + if (LOG.isDebugEnabled()) { + LOG.debug("nothing to compact for " + this.storeName + " because" + + (storefiles.size() < 1 ? " no store files to compact" : + " only one store file and it is not a reference")); + } + return null; + } + if (LOG.isDebugEnabled()) { + LOG.debug("started compaction of " + storefiles.size() + + " files using " + compactionDir.toString() + " for " + + this.storeName); + } - // Storefiles are keyed by sequence id. The oldest file comes first. - // We need to return out of here a List that has the newest file first. - List filesToCompact = - new ArrayList(this.storefiles.values()); - Collections.reverse(filesToCompact); - if (filesToCompact.size() < 1 || - (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) { - if (LOG.isDebugEnabled()) { - LOG.debug("nothing to compact for " + this.storeName); + // Storefiles are keyed by sequence id. The oldest file comes first. + // We need to return out of here a List that has the newest file first. + filesToCompact = new ArrayList(this.storefiles.values()); + Collections.reverse(filesToCompact); + + // The max-sequenceID in any of the to-be-compacted TreeMaps is the + // last key of storefiles. + + maxId = this.storefiles.lastKey(); } - return false; - } - if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { - LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); - return false; - } + if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { + LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); + return null; + } - // Step through them, writing to the brand-new MapFile - HStoreFile compactedOutputFile = new HStoreFile(conf, fs, - this.compactionDir, info.getEncodedName(), family.getFamilyName(), - -1L, null); - MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, - this.compression, this.bloomFilter); - try { - compactHStoreFiles(compactedOut, filesToCompact); - } finally { - compactedOut.close(); - } + // Step through them, writing to the brand-new MapFile + HStoreFile compactedOutputFile = new HStoreFile(conf, fs, + this.compactionDir, info.getEncodedName(), family.getFamilyName(), + -1L, null); + MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, + this.compression, this.bloomFilter); + try { + compactHStoreFiles(compactedOut, filesToCompact); + } finally { + compactedOut.close(); + } - // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. - // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps. - long maxId = getMaxSequenceId(filesToCompact); - compactedOutputFile.writeInfo(fs, maxId); + // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. + compactedOutputFile.writeInfo(fs, maxId); - // Move the compaction into place. 
+ completeCompaction(filesToCompact, compactedOutputFile); + + if (LOG.isDebugEnabled()) { + LOG.debug("Completed compaction of " + this.storeName + + " store size is " + StringUtils.humanReadableInt(storeSize)); + } + } + } finally { + compactionRequested = false; + if (storeSize > this.desiredMaxFileSize) { + return checkSplit(); + } } + return null; } /* @@ -1284,8 +1296,8 @@ // exception message so output a message here where we know the // culprit. LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() + - (hsf.isReference() ? " " + hsf.getReference().toString() : "") + - " for " + this.storeName); + (hsf.isReference() ? " " + hsf.getReference().toString() : "") + + " for " + this.storeName); closeCompactionReaders(rdrs); throw e; } @@ -1502,11 +1514,11 @@ *
    * 1) Wait for active scanners to exit
    * 2) Acquiring the write-lock
-   * 3) Figuring out what MapFiles are going to be replaced
-   * 4) Moving the new compacted MapFile into place
-   * 5) Unloading all the replaced MapFiles.
-   * 6) Deleting all the old MapFile files.
-   * 7) Loading the new TreeMap.
+   * 3) Moving the new compacted MapFile into place
+   * 4) Unloading and closing all the replaced MapFiles.
+   * 5) Deleting all the replaced MapFile files.
+   * 6) Loading the new TreeMap.
+   * 7) Computing the new store size
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    * 
@@ -1554,46 +1566,53 @@ // 4. and 5. Unload all the replaced MapFiles, close and delete. - List toDelete = new ArrayList(); - for (Map.Entry e: this.storefiles.entrySet()) { - if (!compactedFiles.contains(e.getValue())) { - continue; + synchronized (storefiles) { + List toDelete = new ArrayList(); + for (Map.Entry e: this.storefiles.entrySet()) { + if (!compactedFiles.contains(e.getValue())) { + continue; + } + Long key = e.getKey(); + MapFile.Reader reader = this.readers.remove(key); + if (reader != null) { + reader.close(); + } + toDelete.add(key); } - Long key = e.getKey(); - MapFile.Reader reader = this.readers.remove(key); - if (reader != null) { - reader.close(); - } - toDelete.add(key); - } - try { - for (Long key: toDelete) { - HStoreFile hsf = this.storefiles.remove(key); - hsf.delete(); + try { + for (Long key: toDelete) { + HStoreFile hsf = this.storefiles.remove(key); + hsf.delete(); + } + + // 6. Loading the new TreeMap. + Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); + this.readers.put(orderVal, + // Use a block cache (if configured) for this reader since + // it is the only one. + finalCompactedFile.getReader(this.fs, this.bloomFilter, + family.isBlockCacheEnabled())); + this.storefiles.put(orderVal, finalCompactedFile); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.error("Failed replacing compacted files for " + this.storeName + + ". Compacted file is " + finalCompactedFile.toString() + + ". Files replaced are " + compactedFiles.toString() + + " some of which may have been already removed", e); } - - // 6. Loading the new TreeMap. - Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); - this.readers.put(orderVal, - // Use a block cache (if configured) for this reader since - // it is the only one. - finalCompactedFile.getReader(this.fs, this.bloomFilter, - family.isBlockCacheEnabled())); - this.storefiles.put(orderVal, finalCompactedFile); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files for " + this.storeName + - ". Compacted file is " + finalCompactedFile.toString() + - ". Files replaced are " + compactedFiles.toString() + - " some of which may have been already removed", e); + // 7. Compute new store size + storeSize = 0L; + for (HStoreFile hsf: storefiles.values()) { + storeSize += hsf.length(); + } } } finally { - // 7. Releasing the write-lock + // 8. Releasing the write-lock this.lock.writeLock().unlock(); } } finally { - // 8. Allow new scanners to proceed. + // 9. Allow new scanners to proceed. newScannerLock.writeLock().unlock(); } } @@ -1732,7 +1751,7 @@ } } return results.size() == 0 ? - null : ImmutableBytesWritable.toArray(results); + null : ImmutableBytesWritable.toArray(results); } finally { this.lock.readLock().unlock(); } @@ -1788,7 +1807,7 @@ do{ // if the row matches, we might want this one. - if(rowMatches(origin, readkey)){ + if (rowMatches(origin, readkey)) { // if the cell matches, then we definitely want this key. if (cellMatches(origin, readkey)) { // store the key if it isn't deleted or superceeded by what's @@ -1807,11 +1826,11 @@ // timestamps, so move to the next key continue; } - } else{ + } else { // the row doesn't match, so we've gone too far. 
break; } - }while(map.next(readkey, readval)); // advance to the next key + } while (map.next(readkey, readval)); // advance to the next key } } @@ -1879,8 +1898,7 @@ * and timestamp */ private Text rowAtOrBeforeFromMapFile(MapFile.Reader map, Text row, - long timestamp) - throws IOException { + long timestamp) throws IOException { HStoreKey searchKey = new HStoreKey(row, timestamp); Text previousRow = null; ImmutableBytesWritable readval = new ImmutableBytesWritable(); @@ -1980,71 +1998,74 @@ } /** - * Gets size for the store. + * Determines if HStore can be split * - * @param midKey Gets set to the middle key of the largest splitable store - * file or its set to empty if largest is not splitable. - * @return Sizes for the store and the passed midKey is - * set to midKey of largest splitable. Otherwise, its set to empty - * to indicate we couldn't find a midkey to split on + * @return midKey if store can be split, null otherwise */ - HStoreSize size(Text midKey) { - long maxSize = 0L; - long aggregateSize = 0L; - // Not splitable if we find a reference store file present in the store. - boolean splitable = true; + Text checkSplit() { if (this.storefiles.size() <= 0) { - return new HStoreSize(0, 0, splitable); + return null; } - this.lock.readLock().lock(); try { + // Not splitable if we find a reference store file present in the store. + boolean splitable = true; + long maxSize = 0L; Long mapIndex = Long.valueOf(0L); // Iterate through all the MapFiles - for (Map.Entry e: storefiles.entrySet()) { - HStoreFile curHSF = e.getValue(); - long size = curHSF.length(); - aggregateSize += size; - if (maxSize == 0L || size > maxSize) { - // This is the largest one so far - maxSize = size; - mapIndex = e.getKey(); + synchronized (storefiles) { + for (Map.Entry e: storefiles.entrySet()) { + HStoreFile curHSF = e.getValue(); + long size = curHSF.length(); + if (size > maxSize) { + // This is the largest one so far + maxSize = size; + mapIndex = e.getKey(); + } + if (splitable) { + splitable = !curHSF.isReference(); + } } - if (splitable) { - splitable = !curHSF.isReference(); - } } - if (splitable) { - MapFile.Reader r = this.readers.get(mapIndex); - // seek back to the beginning of mapfile - r.reset(); - // get the first and last keys - HStoreKey firstKey = new HStoreKey(); - HStoreKey lastKey = new HStoreKey(); - Writable value = new ImmutableBytesWritable(); - r.next(firstKey, value); - r.finalKey(lastKey); - // get the midkey - HStoreKey mk = (HStoreKey)r.midKey(); - if (mk != null) { - // if the midkey is the same as the first and last keys, then we cannot - // (ever) split this region. - if (mk.getRow().equals(firstKey.getRow()) && - mk.getRow().equals(lastKey.getRow())) { - return new HStoreSize(aggregateSize, maxSize, false); - } - // Otherwise, set midKey - midKey.set(mk.getRow()); + if (!splitable) { + return null; + } + MapFile.Reader r = this.readers.get(mapIndex); + + // seek back to the beginning of mapfile + r.reset(); + + // get the first and last keys + HStoreKey firstKey = new HStoreKey(); + HStoreKey lastKey = new HStoreKey(); + Writable value = new ImmutableBytesWritable(); + r.next(firstKey, value); + r.finalKey(lastKey); + + // get the midkey + HStoreKey mk = (HStoreKey)r.midKey(); + if (mk != null) { + // if the midkey is the same as the first and last keys, then we cannot + // (ever) split this region. 
+ if (mk.getRow().equals(firstKey.getRow()) && + mk.getRow().equals(lastKey.getRow())) { + return null; } + return mk.getRow(); } } catch(IOException e) { LOG.warn("Failed getting store size for " + this.storeName, e); } finally { this.lock.readLock().unlock(); } - return new HStoreSize(aggregateSize, maxSize, splitable); + return null; } + /** @return aggregate size of HStore */ + public long getSize() { + return storeSize; + } + ////////////////////////////////////////////////////////////////////////////// // File administration ////////////////////////////////////////////////////////////////////////////// @@ -2073,7 +2094,7 @@ /** {@inheritDoc} */ @Override public String toString() { - return this.storeName; + return this.storeName.toString(); } /* @@ -2119,19 +2140,21 @@ throws IOException { super(timestamp, targetCols); try { - this.readers = new MapFile.Reader[storefiles.size()]; - - // Most recent map file should be first - int i = readers.length - 1; - for(HStoreFile curHSF: storefiles.values()) { - readers[i--] = curHSF.getReader(fs, bloomFilter); + synchronized (storefiles) { + this.readers = new MapFile.Reader[storefiles.size()]; + + // Most recent map file should be first + int i = readers.length - 1; + for(HStoreFile curHSF: storefiles.values()) { + readers[i--] = curHSF.getReader(fs, bloomFilter); + } } this.keys = new HStoreKey[readers.length]; this.vals = new byte[readers.length][]; // Advance the readers to the first pos. - for(i = 0; i < readers.length; i++) { + for(int i = 0; i < readers.length; i++) { keys[i] = new HStoreKey(); if(firstRow.getLength() != 0) { @@ -2166,8 +2189,8 @@ @Override boolean findFirstRow(int i, Text firstRow) throws IOException { ImmutableBytesWritable ibw = new ImmutableBytesWritable(); - HStoreKey firstKey - = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw); + HStoreKey firstKey = + (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw); if (firstKey == null) { // Didn't find it. 
Close the scanner and return TRUE closeSubScanner(i); @@ -2330,9 +2353,9 @@ for (int i = 0; i < this.keys.length; i++) { if (scanners[i] != null && (chosenRow == null || - (keys[i].getRow().compareTo(chosenRow) < 0) || - ((keys[i].getRow().compareTo(chosenRow) == 0) && - (keys[i].getTimestamp() > chosenTimestamp)))) { + (keys[i].getRow().compareTo(chosenRow) < 0) || + ((keys[i].getRow().compareTo(chosenRow) == 0) && + (keys[i].getTimestamp() > chosenTimestamp)))) { chosenRow = new Text(keys[i].getRow()); chosenTimestamp = keys[i].getTimestamp(); } @@ -2480,11 +2503,11 @@ /** {@inheritDoc} */ public void close() { try { - for(int i = 0; i < scanners.length; i++) { - if(scanners[i] != null) { - closeScanner(i); + for(int i = 0; i < scanners.length; i++) { + if(scanners[i] != null) { + closeScanner(i); + } } - } } finally { synchronized (activeScanners) { int numberOfScanners = activeScanners.decrementAndGet(); Index: src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java (working copy) @@ -19,17 +19,12 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.Delayed; - /** Queue entry passed to flusher, compactor and splitter threads */ -class QueueEntry implements Delayed { +class QueueEntry { private final HRegion region; - private long expirationTime; - QueueEntry(HRegion region, long expirationTime) { + QueueEntry(HRegion region) { this.region = region; - this.expirationTime = expirationTime; } /** {@inheritDoc} */ @@ -45,34 +40,8 @@ return this.region.getRegionInfo().hashCode(); } - /** {@inheritDoc} */ - public long getDelay(TimeUnit unit) { - return unit.convert(this.expirationTime - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - } - - /** {@inheritDoc} */ - public int compareTo(Delayed o) { - long delta = this.getDelay(TimeUnit.MILLISECONDS) - - o.getDelay(TimeUnit.MILLISECONDS); - - int value = 0; - if (delta > 0) { - value = 1; - - } else if (delta < 0) { - value = -1; - } - return value; - } - /** @return the region */ public HRegion getRegion() { return region; } - - /** @param expirationTime the expirationTime to set */ - public void setExpirationTime(long expirationTime) { - this.expirationTime = expirationTime; - } } \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/regionserver/Flusher.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/Flusher.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/regionserver/Flusher.java (working copy) @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.concurrent.DelayQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.Set; -import java.util.Iterator; import java.util.ConcurrentModificationException; import org.apache.commons.logging.Log; @@ -35,15 +35,15 @@ /** Flush cache upon request */ class Flusher extends Thread implements CacheFlushListener { static final Log LOG = LogFactory.getLog(Flusher.class); - private final DelayQueue flushQueue = - new DelayQueue(); + private final BlockingQueue flushQueue = + new LinkedBlockingQueue(); private final long optionalFlushPeriod; private final 
HRegionServer server; private final HBaseConfiguration conf; private final Integer lock = new Integer(0); - /** constructor */ + /** @param server */ public Flusher(final HRegionServer server) { super(); this.server = server; @@ -59,39 +59,23 @@ QueueEntry e = null; try { e = flushQueue.poll(server.threadWakeFrequency, TimeUnit.MILLISECONDS); - if (e == null) { - continue; - } - synchronized(lock) { // Don't interrupt while we're working - if (e.getRegion().flushcache()) { - server.compactionRequested(e); + if (e != null) { + synchronized(lock) { // Don't interrupt while we're working + e.getRegion().flushcache(); } - - e.setExpirationTime(System.currentTimeMillis() + - optionalFlushPeriod); - flushQueue.add(e); } - // Now ensure that all the active regions are in the queue + // Queue up regions for optional flush if they need it Set regions = server.getRegionsToCheck(); - for (HRegion r: regions) { - e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); - synchronized (flushQueue) { - if (!flushQueue.contains(e)) { + long now = System.currentTimeMillis(); + synchronized (flushQueue) { + for (HRegion r: regions) { + if (now - optionalFlushPeriod > r.getLastFlushTime()) { + e = new QueueEntry(r); flushQueue.add(e); } } } - - // Now make sure that the queue only contains active regions - synchronized (flushQueue) { - for (Iterator i = flushQueue.iterator(); i.hasNext(); ) { - e = i.next(); - if (!regions.contains(e.getRegion())) { - i.remove(); - } - } - } } catch (InterruptedException ex) { continue; } catch (ConcurrentModificationException ex) { @@ -128,13 +112,7 @@ /** {@inheritDoc} */ public void flushRequested(HRegion region) { - QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); - synchronized (flushQueue) { - if (flushQueue.contains(e)) { - flushQueue.remove(e); - } - flushQueue.add(e); - } + flushQueue.add(new QueueEntry(region)); } /** Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -26,7 +26,6 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; -import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -38,8 +37,6 @@ import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Delayed; -import java.util.concurrent.DelayQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,7 +56,6 @@ import org.apache.hadoop.hbase.util.InfoServer; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.io.HbaseMapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -79,13 +75,11 @@ import org.apache.hadoop.hbase.HScannerInterface; import org.apache.hadoop.hbase.LeaseListener; import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.UnknownScannerException; import 
org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.ipc.HRegionInterface; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; /** @@ -790,10 +784,11 @@ HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc().getName() ), - this.log, this.fs, conf, regionInfo, null, this.cacheFlusher + this.log, this.fs, conf, regionInfo, null, this.cacheFlusher, + this.compactSplitThread ); // Startup a compaction early if one is needed. - this.compactSplitThread.compactionRequested(region); + region.compactStores(); } catch (IOException e) { LOG.error("error opening region " + regionInfo.getRegionName(), e); @@ -1190,15 +1185,12 @@ } /** @return the info server */ - /** - * Get the InfoServer this HRegionServer has put up. - */ public InfoServer getInfoServer() { return infoServer; } /** - * Check if a stop has been requested. + * @return true if a stop has been requested. */ public boolean isStopRequested() { return stopRequested.get(); @@ -1209,10 +1201,6 @@ return lock.writeLock(); } - void compactionRequested(QueueEntry e) { - compactSplitThread.compactionRequested(e); - } - /** * @return Immutable list of this servers regions. */ Index: src/java/org/apache/hadoop/hbase/regionserver/HLog.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HLog.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/regionserver/HLog.java (working copy) @@ -304,7 +304,7 @@ * file-number. */ Path computeFilename(final long fn) { - return new Path(dir, HLOG_DATFILE + new Long(fn).toString()); + return new Path(dir, HLOG_DATFILE + fn); } /** Index: src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy) @@ -40,21 +40,42 @@ * Compact region on request and then run split if appropriate */ class CompactSplitThread extends Thread -implements RegionUnavailableListener, HConstants { +implements RegionUnavailableListener, CompactionListener, HConstants { static final Log LOG = LogFactory.getLog(CompactSplitThread.class); + + private class CompactQueueEntry extends QueueEntry { + private final HStore store; + CompactQueueEntry(HStore store, HRegion region) { + super(region); + this.store = store; + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + int result = super.hashCode(); + result ^= store.getFamily().hashCode(); + return result; + } + + HStore getStore() { + return this.store; + } + } + private HTable root = null; private HTable meta = null; - private long startTime; + private volatile long startTime; private final long frequency; - private HRegionServer server; - private HBaseConfiguration conf; + private final HRegionServer server; + private final HBaseConfiguration conf; - private final BlockingQueue compactionQueue = - new LinkedBlockingQueue(); + private final BlockingQueue compactionQueue = + new LinkedBlockingQueue(); - /** constructor */ + /** @param server */ public CompactSplitThread(HRegionServer server) { super(); this.server = server; @@ -68,19 +89,26 @@ @Override public void run() { while (!server.isStopRequested()) { - QueueEntry e = null; + CompactQueueEntry e = null; try { e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); - if (e == null) { - continue; + if (e 
!= null) { + HStore store = e.getStore(); + HRegion region = e.getRegion(); + Text midKey = region.compactStore(store); + if (midKey != null) { + split(region, midKey); + } } - e.getRegion().compactIfNeeded(); - split(e.getRegion()); } catch (InterruptedException ex) { continue; } catch (IOException ex) { LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + (e != null ? + (" for region " + e.getRegion().getRegionName() + " store " + + e.getStore().storeName) + : "" + ), RemoteExceptionHandler.checkIOException(ex)); if (!server.checkFileSystem()) { break; @@ -88,7 +116,11 @@ } catch (Exception ex) { LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + (e != null ? + (" for region " + e.getRegion().getRegionName() + " store " + + e.getStore().storeName) + : "" + ), ex); if (!server.checkFileSystem()) { break; @@ -99,19 +131,17 @@ } /** - * @param e QueueEntry for region to be compacted + * @param s HStore to compact + * @param r HRegion store belongs to */ - public void compactionRequested(QueueEntry e) { - compactionQueue.add(e); + public void compactionRequested(HStore s, HRegion r) { + compactionQueue.add(new CompactQueueEntry(s, r)); } - void compactionRequested(final HRegion r) { - compactionRequested(new QueueEntry(r, System.currentTimeMillis())); - } - - private void split(final HRegion region) throws IOException { + private void split(final HRegion region, final Text midKey) + throws IOException { final HRegionInfo oldRegionInfo = region.getRegionInfo(); - final HRegion[] newRegions = region.splitRegion(this); + final HRegion[] newRegions = region.splitRegion(this, midKey); if (newRegions == null) { // Didn't need to be split return; Index: src/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -47,6 +47,7 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hbase.regionserver.CompactionListener; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; @@ -95,7 +96,7 @@ * regionName is a unique identifier for this HRegion. (startKey, endKey] * defines the keyspace for this HRegion. */ -public class HRegion implements HConstants { +public class HRegion implements HConstants, CompactionListener { static final String SPLITDIR = "splits"; static final String MERGEDIR = "merges"; static final Random rand = new Random(); @@ -173,7 +174,7 @@ // Done // Construction moves the merge files into place under region. HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, - newRegionDir, null); + newRegionDir, null, null); // Get rid of merges directory @@ -246,7 +247,7 @@ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Integer updateLock = new Integer(0); private final Integer splitLock = new Integer(0); - private final long desiredMaxFileSize; + private final CompactionListener compactionListener; private final long minSequenceId; final AtomicInteger activeScannerCount = new AtomicInteger(0); @@ -257,6 +258,8 @@ /** * HRegion constructor. * + * @param basedir qualified path of directory where region should be located, + * usually the table directory. 
* @param log The HLog is the outbound log for any updates to the HRegion * (There's a single HLog for all the HRegions on a single HRegionServer.) * The log file is a logfile from the previous execution that's @@ -264,19 +267,20 @@ * appropriate log info for this HRegion. If there is a previous log file * (implying that the HRegion has been written-to before), then read it from * the supplied path. - * @param basedir qualified path of directory where region should be located, - * usually the table directory. * @param fs is the filesystem. * @param conf is global configuration settings. * @param regionInfo - HRegionInfo that describes the region * @param initialFiles If there are initial files (implying that the HRegion * is new), then read them from the supplied path. - * @param listener an object that implements CacheFlushListener or null + * @param flushListener an object that implements CacheFlushListener or null + * @param compactionListener an object that implements CompactionListener + * or null * * @throws IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener) + HRegionInfo regionInfo, Path initialFiles, + CacheFlushListener flushListener, CompactionListener compactionListener) throws IOException { this.basedir = basedir; @@ -284,6 +288,8 @@ this.fs = fs; this.conf = conf; this.regionInfo = regionInfo; + this.flushListener = flushListener; + this.compactionListener = compactionListener; this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.regiondir = new Path(basedir, this.regionInfo.getEncodedName()); Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME); @@ -303,7 +309,7 @@ this.regionInfo.getTableDesc().families().values()) { HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs, - oldLogFile, this.conf); + oldLogFile, this.conf, this); stores.put(c.getFamilyName(), store); @@ -331,19 +337,23 @@ // By default, we flush the cache when 64M. this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", 1024*1024*64); - this.flushListener = listener; + this.blockingMemcacheSize = this.memcacheFlushSize * conf.getInt("hbase.hregion.memcache.block.multiplier", 1); - // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. - this.desiredMaxFileSize = - conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); - // HRegion is ready to go! this.writestate.compacting = false; this.lastFlushTime = System.currentTimeMillis(); LOG.info("region " + this.regionInfo.getRegionName() + " available"); } + + /** {@inheritDoc} */ + public void compactionRequested(HStore store, + @SuppressWarnings("unused") HRegion region) { + if (this.compactionListener != null) { + this.compactionListener.compactionRequested(store, this); + } + } /** * @return Updates to this region need to have a sequence id that is >= to @@ -408,7 +418,7 @@ // region. writestate.writesEnabled = false; LOG.debug("compactions and cache flushes disabled for region " + - regionName); + regionName); while (writestate.compacting || writestate.flushing) { LOG.debug("waiting for" + (writestate.compacting ? " compaction" : "") + @@ -544,34 +554,16 @@ // upkeep. ////////////////////////////////////////////////////////////////////////////// - /** - * @param midkey - * @return returns size of largest HStore. Also returns whether store is - * splitable or not (Its not splitable if region has a store that has a - * reference store file). 
- */ - public HStoreSize largestHStore(Text midkey) { - HStoreSize biggest = null; - boolean splitable = true; + /** @return returns size of largest HStore. */ + public long getLargestHStoreSize() { + long size = 0; for (HStore h: stores.values()) { - HStoreSize size = h.size(midkey); - // If we came across a reference down in the store, then propagate - // fact that region is not splitable. - if (splitable) { - splitable = size.splitable; + long storeSize = h.getSize(); + if (storeSize > size) { + size = storeSize; } - if (biggest == null) { - biggest = size; - continue; - } - if(size.getAggregate() > biggest.getAggregate()) { // Largest so far - biggest = size; - } } - if (biggest != null) { - biggest.setSplitable(splitable); - } - return biggest; + return size; } /* @@ -580,21 +572,17 @@ * but instead create new 'reference' store files that read off the top and * bottom ranges of parent store files. * @param listener May be null. + * @param midKey key on which to split region * @return two brand-new (and open) HRegions or null if a split is not needed * @throws IOException */ - HRegion[] splitRegion(final RegionUnavailableListener listener) - throws IOException { + HRegion[] splitRegion(final RegionUnavailableListener listener, + final Text midKey) throws IOException { synchronized (splitLock) { - Text midKey = new Text(); - if (closed.get() || !needsSplit(midKey)) { + if (closed.get()) { return null; } - Path splits = new Path(this.regiondir, SPLITDIR); - if(!this.fs.exists(splits)) { - this.fs.mkdirs(splits); - } - // Make copies just in case and add start/end key checking: hbase-428. + // Add start/end key checking: hbase-428. Text startKey = new Text(this.regionInfo.getStartKey()); Text endKey = new Text(this.regionInfo.getEndKey()); if (startKey.equals(midKey)) { @@ -605,6 +593,11 @@ LOG.debug("Endkey and midkey are same, not splitting"); return null; } + LOG.info("Starting split of region " + getRegionName()); + Path splits = new Path(this.regiondir, SPLITDIR); + if(!this.fs.exists(splits)) { + this.fs.mkdirs(splits); + } HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(), startKey, midKey); Path dirA = new Path(splits, regionAInfo.getEncodedName()); @@ -654,10 +647,10 @@ // Opening the region copies the splits files from the splits directory // under each region. HRegion regionA = - new HRegion(basedir, log, fs, conf, regionAInfo, dirA, null); + new HRegion(basedir, log, fs, conf, regionAInfo, dirA, null, null); regionA.close(); HRegion regionB = - new HRegion(basedir, log, fs, conf, regionBInfo, dirB, null); + new HRegion(basedir, log, fs, conf, regionBInfo, dirB, null, null); regionB.close(); // Cleanup @@ -671,71 +664,6 @@ } /* - * Iterates through all the HStores and finds the one with the largest - * MapFile size. If the size is greater than the (currently hard-coded) - * threshold, returns true indicating that the region should be split. The - * midKey for the largest MapFile is returned through the midKey parameter. - * It is possible for us to rule the region non-splitable even in excess of - * configured size. This happens if region contains a reference file. If - * a reference file, the region can not be split. - * - * Note that there is no need to do locking in this method because it calls - * largestHStore which does the necessary locking. - * - * @param midKey midKey of the largest MapFile - * @return true if the region should be split. midKey is set by this method. - * Check it for a midKey value on return. 
- */ - boolean needsSplit(Text midKey) { - HStoreSize biggest = largestHStore(midKey); - if (biggest == null || midKey.getLength() == 0 || - (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) { - return false; - } - boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize); - if (split) { - if (!biggest.isSplitable()) { - LOG.warn("Region " + getRegionName().toString() + - " is NOT splitable though its aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - split = false; - } else { - LOG.info("Splitting " + getRegionName().toString() + - " because largest aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - } - } - return split; - } - - /** - * Only do a compaction if it is necessary - * - * @return whether or not there was a compaction - * @throws IOException - */ - public boolean compactIfNeeded() throws IOException { - boolean needsCompaction = false; - for (HStore store: stores.values()) { - if (store.needsCompaction()) { - needsCompaction = true; - if (LOG.isDebugEnabled()) { - LOG.debug(store.toString() + " needs compaction"); - } - break; - } - } - if (!needsCompaction) { - return false; - } - return compactStores(); - } - - /* * @param dir * @return compaction directory for the passed in dir */ @@ -761,28 +689,36 @@ this.fs.delete(this.regionCompactionDir); } } + + /** + * Called after region is opened to compact the HStores if necessary. + */ + public void compactStores() { + if (compactionListener != null) { + for (HStore store: stores.values()) { + this.compactionListener.compactionRequested(store, this); + } + } + } /** - * Compact all the stores. This should be called periodically to make sure - * the stores are kept manageable. + * Compact the specified HStore. * *

This operation could block for a long time, so don't call it from a * time-sensitive thread. * - * @return Returns TRUE if the compaction has completed. FALSE, if the - * compaction was not carried out, because the HRegion is busy doing - * something else storage-intensive (like flushing the cache). The caller - * should check back later. - * * Note that no locking is necessary at this level because compaction only * conflicts with a region split, and that cannot happen because the region * server does them sequentially and not in parallel. * + * @param store the HStore to compact + * @return mid key if split is needed * @throws IOException */ - public boolean compactStores() throws IOException { + Text compactStore(HStore store) throws IOException { + Text midKey = null; if (this.closed.get()) { - return false; + return midKey; } try { synchronized (writestate) { @@ -790,27 +726,21 @@ writestate.compacting = true; } else { LOG.info("NOT compacting region " + - this.regionInfo.getRegionName().toString() + ": compacting=" + + getRegionName() + " store " + store.storeName + ": compacting=" + writestate.compacting + ", writesEnabled=" + writestate.writesEnabled); - return false; + return midKey; } } long startTime = System.currentTimeMillis(); - LOG.info("starting compaction on region " + - this.regionInfo.getRegionName().toString()); - boolean status = true; + LOG.info("starting compaction on region " + getRegionName() + " store " + + store.storeName); doRegionCompactionPrep(); - for (HStore store : stores.values()) { - if(!store.compact()) { - status = false; - } - } + midKey = store.compact(); doRegionCompactionCleanup(); - LOG.info("compaction completed on region " + - this.regionInfo.getRegionName().toString() + ". Took " + + LOG.info("compaction completed on region " + getRegionName() + " store " + + store.storeName + ". Took " + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - return status; } finally { synchronized (writestate) { @@ -818,6 +748,7 @@ writestate.notifyAll(); } } + return midKey; } /** @@ -986,11 +917,15 @@ this.log.completeCacheFlush(this.regionInfo.getRegionName(), regionInfo.getTableDesc().getName(), sequenceId); - // D. Finally notify anyone waiting on memcache to clear: + // D. record latest flush time + this.lastFlushTime = System.currentTimeMillis(); + + // E. Finally notify anyone waiting on memcache to clear: // e.g. checkResources(). synchronized (this) { notifyAll(); } + if (LOG.isDebugEnabled()) { LOG.debug("Finished memcache flush for region " + this.regionInfo.getRegionName() + " in " + @@ -1254,8 +1189,8 @@ Text row = b.getRow(); long lockid = obtainRowLock(row); - long commitTime = - (b.getTimestamp() == LATEST_TIMESTAMP) ? System.currentTimeMillis() : b.getTimestamp(); + long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ? 
+ System.currentTimeMillis() : b.getTimestamp(); try { List deletes = null; @@ -1828,7 +1763,7 @@ fs.mkdirs(regionDir); return new HRegion(tableDir, new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null), - fs, conf, info, null, null); + fs, conf, info, null, null, null); } /** @@ -1870,8 +1805,7 @@ * @throws IOException */ public static void removeRegionFromMETA(final HRegionInterface srvr, - final Text metaRegionName, final Text regionName) - throws IOException { + final Text metaRegionName, final Text regionName) throws IOException { srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP); } @@ -1884,8 +1818,7 @@ * @throws IOException */ public static void offlineRegionInMETA(final HRegionInterface srvr, - final Text metaRegionName, final HRegionInfo info) - throws IOException { + final Text metaRegionName, final HRegionInfo info) throws IOException { BatchUpdate b = new BatchUpdate(info.getRegionName()); info.setOffline(true); b.put(COL_REGIONINFO, Writables.getBytes(info)); @@ -1906,8 +1839,7 @@ * @return True if deleted. */ public static boolean deleteRegion(FileSystem fs, Path rootdir, - HRegionInfo info) - throws IOException { + HRegionInfo info) throws IOException { Path p = HRegion.getRegionDir(rootdir, info); if (LOG.isDebugEnabled()) { LOG.debug("DELETING region " + p.toString()); Index: src/java/org/apache/hadoop/hbase/util/Migrate.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/Migrate.java (revision 632429) +++ src/java/org/apache/hadoop/hbase/util/Migrate.java (working copy) @@ -383,7 +383,7 @@ HRegion rootRegion = new HRegion( new Path(rootdir, HConstants.ROOT_TABLE_NAME.toString()), log, fs, conf, - HRegionInfo.rootRegionInfo, null, null); + HRegionInfo.rootRegionInfo, null, null, null); try { HScannerInterface rootScanner = rootRegion.getScanner( @@ -430,7 +430,7 @@ HRegion metaRegion = new HRegion( new Path(rootdir, info.getTableDesc().getName().toString()), log, fs, - conf, info, null, null); + conf, info, null, null, null); try { HScannerInterface metaScanner = metaRegion.getScanner(