Index: src/contrib/hbase/conf/hbase-default.xml =================================================================== --- src/contrib/hbase/conf/hbase-default.xml (revision 617638) +++ src/contrib/hbase/conf/hbase-default.xml (working copy) @@ -144,16 +144,6 @@ - hbase.regionserver.optionalcacheflushinterval - 1800000 - - Amount of time to wait since the last time a region was flushed before - invoking an optional cache flush (An optional cache flush is a - flush even though memcache is not at the memcache.flush.size). - Default: 30 minutes (in miliseconds) - - - hbase.hregion.memcache.flush.size 67108864 Index: src/contrib/hbase/src/test/hbase-site.xml =================================================================== --- src/contrib/hbase/src/test/hbase-site.xml (revision 617638) +++ src/contrib/hbase/src/test/hbase-site.xml (working copy) @@ -111,14 +111,6 @@ - hbase.regionserver.optionalcacheflushinterval - 10000 - - Amount of time to wait since the last time a region was flushed before - invoking an optional cache flush. Default 60,000. - - - hbase.rootdir /hbase location of HBase instance in dfs Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet2.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet2.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet2.java (working copy) @@ -77,7 +77,7 @@ assertColumnsPresent(region, actualStartRow); assertColumnsPresent(region, actualStopRow); // Force a flush so store files come into play. - region.flushcache(); + flushcache(region); // Assert I got all out. assertColumnsPresent(region, actualStartRow); assertColumnsPresent(region, actualStopRow); @@ -140,7 +140,10 @@ } } - /** For HADOOP-2443 */ + /** + * For HADOOP-2443 + * @throws IOException + */ public void testGetClosestRowBefore() throws IOException{ HRegion region = null; @@ -148,7 +151,6 @@ try { HTableDescriptor htd = createTableDescriptor(getName()); - HRegionInfo hri = new HRegionInfo(htd, null, null); region = createNewHRegion(htd, null, null); region_incommon = new HRegionIncommon(region); @@ -190,7 +192,7 @@ assertEquals(new String(results.get(COLUMNS[0])), "t40 bytes"); // force a flush - region.flushcache(); + flushcache(region); // try finding "015" results = region.getClosestRowBefore(t15, HConstants.LATEST_TIMESTAMP); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMergeTable.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMergeTable.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMergeTable.java (working copy) @@ -32,7 +32,7 @@ */ public void testMergeTable() throws IOException { assertNotNull(dfsCluster); - MiniHBaseCluster hCluster = new MiniHBaseCluster(conf, 1, dfsCluster, true); + MiniHBaseCluster hCluster = new MiniHBaseCluster(conf, 1, dfsCluster); try { HMerge.merge(conf, dfsCluster.getFileSystem(), desc.getName()); } finally { Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (working copy) @@ -143,7 +143,6 @@ r.flushcache(); } } - region.compactIfNeeded(); region.close(); region.getLog().closeAndDelete(); 
region.getRegionInfo().setOffline(true); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -99,6 +99,10 @@ super.setUp(); localfs = (conf.get("fs.default.name", "file:///").compareTo("file::///") == 0); + + if (localfs) { + LOG.warn("default file system is local fs"); + } try { this.fs = FileSystem.get(conf); @@ -310,6 +314,44 @@ } } + protected Text needsSplit(final HRegion r) { + long maxSize = 0; + HStore largestHStore = null; + for (HStore store: r.getStores()) { + long size = store.getSize(); + if (size > maxSize) { + maxSize = size; + largestHStore = store; + } + } + if (largestHStore != null) { + return largestHStore.checkSplit(); + } + return null; + } + + protected static void flushcache(HRegion r) throws IOException { + for (HStore store: r.getStores()) { + try { + r.flushcache(store); + } catch (IOException e) { + LOG.error("Error flushing cache for HStore: " + store.storeName, e); + throw e; + } + } + } + + protected static void compactStores(HRegion r) throws IOException { + for (HStore store: r.getStores()) { + try { + r.compactStore(store); + } catch (IOException e) { + LOG.error("Error compacting HStore: " + store.storeName, e); + throw e; + } + } + } + /** * Implementors can flushcache. */ @@ -502,7 +544,7 @@ } /** {@inheritDoc} */ public void flushcache() throws IOException { - this.region.flushcache(); + HBaseTestCase.flushcache(this.region); } } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java (working copy) @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase; import java.io.IOException; -import java.util.TreeMap; import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.fs.Path; @@ -68,12 +67,12 @@ for (int ii = 0; ii < 3; ii++) { for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { - TreeMap edit = new TreeMap(); + Text storeName = + new Text(Integer.toString(i) + "/" + Integer.toString(j)); Text column = new Text(Integer.toString(j)); - edit.put( - new HStoreKey(rowName, column, System.currentTimeMillis()), - column.getBytes()); - log.append(new Text(Integer.toString(i)), tableName, edit); + HStoreKey key = + new HStoreKey(rowName, column, System.currentTimeMillis()); + log.append(storeName, tableName, key, column.getBytes()); } } log.rollWriter(); @@ -92,7 +91,9 @@ */ public void testAppend() throws IOException { final int COL_COUNT = 10; - final Text regionName = new Text("regionname"); + Text[] storeNames = new Text[COL_COUNT]; + final String regionName = + HRegionInfo.encodeRegionName(new Text("regionname")); final Text tableName = new Text("tablename"); final Text row = new Text("row"); Reader reader = null; @@ -101,14 +102,18 @@ // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... 
long timestamp = System.currentTimeMillis(); - TreeMap cols = new TreeMap(); for (int i = 0; i < COL_COUNT; i++) { - cols.put(new HStoreKey(row, new Text(Integer.toString(i)), timestamp), - new byte[] { (byte)(i + '0') }); + String column = Integer.toString(i); + storeNames[i] = new Text(regionName + "/" + column); + HStoreKey key = + new HStoreKey(row, new Text(column), timestamp); + byte[] val = new byte[] { (byte)(i + '0')}; + log.append(storeNames[i], tableName, key, val); } - log.append(regionName, tableName, cols); - long logSeqId = log.startCacheFlush(); - log.completeCacheFlush(regionName, tableName, logSeqId); + for (int i = 0; i < COL_COUNT; i++) { + long logSeqId = log.startCacheFlush(); + log.completeCacheFlush(storeNames[i], tableName, logSeqId); + } log.close(); Path filename = log.computeFilename(log.filenum - 1); log = null; @@ -118,15 +123,16 @@ HLogEdit val = new HLogEdit(); for (int i = 0; i < COL_COUNT; i++) { reader.next(key, val); - assertEquals(regionName, key.getRegionName()); + assertTrue(storeNames[i].compareTo(key.getStoreName()) == 0); assertEquals(tableName, key.getTablename()); assertEquals(row, key.getRow()); assertEquals((byte)(i + '0'), val.getVal()[0]); System.out.println(key + " " + val); } - while (reader.next(key, val)) { + for (int i = 0; i < COL_COUNT; i++) { + reader.next(key, val); // Assert only one more row... the meta flushed row. - assertEquals(regionName, key.getRegionName()); + assertTrue(storeNames[i].compareTo(key.getStoreName()) == 0); assertEquals(tableName, key.getTablename()); assertEquals(HLog.METAROW, key.getRow()); assertEquals(HLog.METACOLUMN, val.getColumn()); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -44,7 +44,7 @@ private boolean shutdownDFS; private Path parentdir; private LocalHBaseCluster hbaseCluster; - private boolean deleteOnExit = true; + private boolean deleteOnExit = false; /** * Starts a MiniHBaseCluster on top of a new MiniDFSCluster @@ -88,17 +88,15 @@ * @param conf * @param nRegionNodes * @param dfsCluster - * @param deleteOnExit * @throws IOException */ public MiniHBaseCluster(HBaseConfiguration conf, int nRegionNodes, - MiniDFSCluster dfsCluster, boolean deleteOnExit) throws IOException { + MiniDFSCluster dfsCluster) throws IOException { this.conf = conf; this.fs = dfsCluster.getFileSystem(); this.cluster = dfsCluster; this.shutdownDFS = false; - this.deleteOnExit = deleteOnExit; init(nRegionNodes); } @@ -118,7 +116,6 @@ throws IOException { this.conf = conf; - this.deleteOnExit = deleteOnExit; this.shutdownDFS = false; if (miniHdfsFilesystem) { try { @@ -130,6 +127,7 @@ throw e; } } else { + this.deleteOnExit = deleteOnExit; this.cluster = null; this.fs = FileSystem.get(conf); } @@ -262,7 +260,9 @@ for (LocalHBaseCluster.RegionServerThread t: this.hbaseCluster.getRegionServers()) { for(HRegion r: t.getRegionServer().onlineRegions.values() ) { - r.flushcache(); + for (HStore store: r.getStores()) { + r.flushcache(store); + } } } } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (revision 617638) +++ 
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (working copy) @@ -106,7 +106,7 @@ */ public void testTimestamps() throws IOException { final MiniHBaseCluster cluster = - new MiniHBaseCluster(this.conf, 1, this.cluster, true); + new MiniHBaseCluster(this.conf, 1, this.cluster); try { HTable t = createTable(); Incommon incommon = new HTableIncommon(t); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (working copy) @@ -83,13 +83,13 @@ // one regionserver. Of not, the split may already have happened by the // time we got here. If so, then the region found when we go searching // with EMPTY_START_ROW will be one of the unsplittable daughters. + HRegionServer server = cluster.getRegionThreads().get(0).getRegionServer(); HRegionInfo hri = null; HRegion r = null; for (int i = 0; i < 30; i++) { hri = t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo(); LOG.info("Region location: " + hri); - r = cluster.getRegionThreads().get(0).getRegionServer(). - onlineRegions.get(hri.getRegionName()); + r = server.onlineRegions.get(hri.getRegionName()); if (r != null) { break; } @@ -99,11 +99,23 @@ LOG.warn("Waiting on region to come online", e); } } + // Flush caches + flushcache(r); + + // Do compactions if necessary + compactStores(r); + + // Do splits + for (HStore store: r.getStores()) { + Text midkey = store.checkSplit(); + if (midkey != null && midkey.getLength() != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("requesting split on region " + r.getRegionName()); + } + server.splitRequested(r, midkey); + } + } - // Flush the cache - cluster.getRegionThreads().get(0).getRegionServer().getCacheFlushListener(). - flushRequested(r); - // Now, wait until split makes it into the meta table. int oldCount = count; for (int i = 0; i < retries; i++) { @@ -312,17 +324,18 @@ * @throws IOException */ protected static void compact(final MiniHBaseCluster cluster, - final HRegionInfo r) - throws IOException { + final HRegionInfo r) { if (r == null) { LOG.debug("Passed region is null"); return; } LOG.info("Starting compaction"); for (LocalHBaseCluster.RegionServerThread thread: - cluster.getRegionThreads()) { - SortedMap regions = thread.getRegionServer().onlineRegions; + cluster.getRegionThreads()) { + HRegionServer server = thread.getRegionServer(); + SortedMap regions = server.onlineRegions; + // Retry if ConcurrentModification... alternative of sync'ing is not // worth it for sake of unit test. for (int i = 0; i < 10; i++) { @@ -330,7 +343,9 @@ for (HRegion online: regions.values()) { if (online.getRegionName().toString(). 
equals(r.getRegionName().toString())) { - online.compactStores(); + for (HStore store: online.getStores()) { + server.compactionRequested(store, online); + } } } break; Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (working copy) @@ -118,7 +118,7 @@ } private void startAndWriteData() throws Exception { - cluster = new MiniHBaseCluster(conf, 1, dfs, true); + cluster = new MiniHBaseCluster(conf, 1, dfs); try { Thread.sleep(10 * 1000); // Wait for region server to start } catch (InterruptedException e) { @@ -181,7 +181,7 @@ List regions = new ArrayList(server.getOnlineRegions().values()); for (HRegion r: regions) { - r.flushcache(); + flushcache(r); } // Now roll the log Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (working copy) @@ -110,7 +110,7 @@ fs.mkdirs(dir); // Start up HBase cluster - hCluster = new MiniHBaseCluster(conf, 1, dfsCluster, true); + hCluster = new MiniHBaseCluster(conf, 1, dfsCluster); // Create a table. HBaseAdmin admin = new HBaseAdmin(conf); @@ -249,6 +249,7 @@ } } + @SuppressWarnings("deprecation") private void verify() throws IOException { // Force a cache flush for every online region to ensure that when the // scanner takes its snapshot, all the updates have made it into the cache. Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (working copy) @@ -1,441 +1,441 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.dfs.MiniDFSCluster; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseAdmin; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HScannerInterface; -import org.apache.hadoop.hbase.HStoreKey; -import org.apache.hadoop.hbase.HTable; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.MultiRegionTable; -import org.apache.hadoop.hbase.StaticTestEnvironment; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MiniMRCluster; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.OutputCollector; - -/** - * Test Map/Reduce job over HBase tables - */ -public class TestTableMapReduce extends MultiRegionTable { - @SuppressWarnings("hiding") - private static final Log LOG = - LogFactory.getLog(TestTableMapReduce.class.getName()); - - static final String SINGLE_REGION_TABLE_NAME = "srtest"; - static final String MULTI_REGION_TABLE_NAME = "mrtest"; - static final String INPUT_COLUMN = "contents:"; - static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN); - static final String OUTPUT_COLUMN = "text:"; - static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN); - - private static final Text[] columns = { - TEXT_INPUT_COLUMN, - TEXT_OUTPUT_COLUMN - }; - - private MiniDFSCluster dfsCluster = null; - private Path dir; - private MiniHBaseCluster hCluster = null; - - private static byte[][] values = null; - - static { - try { - values = new byte[][] { - "0123".getBytes(HConstants.UTF8_ENCODING), - "abcd".getBytes(HConstants.UTF8_ENCODING), - "wxyz".getBytes(HConstants.UTF8_ENCODING), - "6789".getBytes(HConstants.UTF8_ENCODING) - }; - } catch (UnsupportedEncodingException e) { - fail(); - } - } - - /** constructor */ - public TestTableMapReduce() { - super(); - - // Make sure the cache gets flushed so we trigger a compaction(s) and - // hence splits. - conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024); - - // Always compact if there is more than one store file. - conf.setInt("hbase.hstore.compactionThreshold", 2); - - // This size should make it so we always split using the addContent - // below. After adding all data, the first region is 1.3M - conf.setLong("hbase.hregion.max.filesize", 256 * 1024); - - // Make lease timeout longer, lease checks less frequent - conf.setInt("hbase.master.lease.period", 10 * 1000); - conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); - - // Set client pause to the original default - conf.setInt("hbase.client.pause", 10 * 1000); - } - - /** - * {@inheritDoc} - */ - @Override - public void setUp() throws Exception { - dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null); - - // Must call super.setup() after starting mini dfs cluster. Otherwise - // we get a local file system instead of hdfs - - super.setUp(); - try { - dir = new Path("/hbase"); - fs.mkdirs(dir); - // Start up HBase cluster - // Only one region server. 
MultiRegionServer manufacturing code below - // depends on there being one region server only. - hCluster = new MiniHBaseCluster(conf, 1, dfsCluster, true); - LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS)); - } catch (Exception e) { - StaticTestEnvironment.shutdownDfs(dfsCluster); - throw e; - } - } - - /** - * {@inheritDoc} - */ - @Override - public void tearDown() throws Exception { - super.tearDown(); - if(hCluster != null) { - hCluster.shutdown(); - } - StaticTestEnvironment.shutdownDfs(dfsCluster); - } - - /** - * Pass the given key and processed record reduce - */ - public static class ProcessContentsMapper extends TableMap { - /** - * Pass the key, and reversed value to reduce - * - * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter) - */ - @SuppressWarnings("unchecked") - @Override - public void map(HStoreKey key, MapWritable value, - OutputCollector output, - @SuppressWarnings("unused") Reporter reporter) throws IOException { - - Text tKey = key.getRow(); - - if(value.size() != 1) { - throw new IOException("There should only be one input column"); - } - - Text[] keys = value.keySet().toArray(new Text[value.size()]); - if(!keys[0].equals(TEXT_INPUT_COLUMN)) { - throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN - + " but got: " + keys[0]); - } - - // Get the original value and reverse it - - String originalValue = - new String(((ImmutableBytesWritable)value.get(keys[0])).get(), - HConstants.UTF8_ENCODING); - StringBuilder newValue = new StringBuilder(); - for(int i = originalValue.length() - 1; i >= 0; i--) { - newValue.append(originalValue.charAt(i)); - } - - // Now set the value to be collected - - MapWritable outval = new MapWritable(); - outval.put(TEXT_OUTPUT_COLUMN, new ImmutableBytesWritable( - newValue.toString().getBytes(HConstants.UTF8_ENCODING))); - - output.collect(tKey, outval); - } - } - - /** - * Test hbase mapreduce jobs against single region and multi-region tables. - * @throws IOException - */ - public void testTableMapReduce() throws IOException { - localTestSingleRegionTable(); - localTestMultiRegionTable(); - } - - /* - * Test against a single region. - * @throws IOException - */ - private void localTestSingleRegionTable() throws IOException { - HTableDescriptor desc = new HTableDescriptor(SINGLE_REGION_TABLE_NAME); - desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); - desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); - - // Create a table. 
- HBaseAdmin admin = new HBaseAdmin(this.conf); - admin.createTable(desc); - - // insert some data into the test table - HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME)); - - try { - for(int i = 0; i < values.length; i++) { - long lockid = table.startUpdate(new Text("row_" - + String.format("%1$05d", i))); - - try { - table.put(lockid, TEXT_INPUT_COLUMN, values[i]); - table.commit(lockid, System.currentTimeMillis()); - lockid = -1; - } finally { - if (lockid != -1) - table.abort(lockid); - } - } - - LOG.info("Print table contents before map/reduce for " + - SINGLE_REGION_TABLE_NAME); - scanTable(SINGLE_REGION_TABLE_NAME, true); - - @SuppressWarnings("deprecation") - MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); - - try { - JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); - jobConf.setJobName("process column contents"); - jobConf.setNumMapTasks(1); - jobConf.setNumReduceTasks(1); - - TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN, - ProcessContentsMapper.class, jobConf); - - TableReduce.initJob(SINGLE_REGION_TABLE_NAME, - IdentityTableReduce.class, jobConf); - LOG.info("Started " + SINGLE_REGION_TABLE_NAME); - JobClient.runJob(jobConf); - - LOG.info("Print table contents after map/reduce for " + - SINGLE_REGION_TABLE_NAME); - scanTable(SINGLE_REGION_TABLE_NAME, true); - - // verify map-reduce results - verify(SINGLE_REGION_TABLE_NAME); - } finally { - mrCluster.shutdown(); - } - } finally { - table.close(); - } - } - - /* - * Test against multiple regions. - * @throws IOException - */ - private void localTestMultiRegionTable() throws IOException { - HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); - desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); - desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); - - // Create a table. 
- HBaseAdmin admin = new HBaseAdmin(this.conf); - admin.createTable(desc); - - // Populate a table into multiple regions - makeMultiRegionTable(conf, hCluster, fs, MULTI_REGION_TABLE_NAME, - INPUT_COLUMN); - - // Verify table indeed has multiple regions - HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME)); - try { - Text[] startKeys = table.getStartKeys(); - assertTrue(startKeys.length > 1); - - @SuppressWarnings("deprecation") - MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); - - try { - JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); - jobConf.setJobName("process column contents"); - jobConf.setNumMapTasks(2); - jobConf.setNumReduceTasks(1); - - TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN, - ProcessContentsMapper.class, jobConf); - - TableReduce.initJob(MULTI_REGION_TABLE_NAME, - IdentityTableReduce.class, jobConf); - LOG.info("Started " + MULTI_REGION_TABLE_NAME); - JobClient.runJob(jobConf); - - // verify map-reduce results - verify(MULTI_REGION_TABLE_NAME); - } finally { - mrCluster.shutdown(); - } - } finally { - table.close(); - } - } - - private void scanTable(String tableName, boolean printValues) - throws IOException { - HTable table = new HTable(conf, new Text(tableName)); - - HScannerInterface scanner = - table.obtainScanner(columns, HConstants.EMPTY_START_ROW); - - try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - - while(scanner.next(key, results)) { - if (printValues) { - LOG.info("row: " + key.getRow()); - - for(Map.Entry e: results.entrySet()) { - LOG.info(" column: " + e.getKey() + " value: " - + new String(e.getValue(), HConstants.UTF8_ENCODING)); - } - } - } - - } finally { - scanner.close(); - } - } - - @SuppressWarnings("null") - private void verify(String tableName) throws IOException { - HTable table = new HTable(conf, new Text(tableName)); - boolean verified = false; - long pause = conf.getLong("hbase.client.pause", 5 * 1000); - int numRetries = conf.getInt("hbase.client.retries.number", 5); - for (int i = 0; i < numRetries; i++) { - try { - verifyAttempt(table); - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - LOG.debug("Verification attempt failed: " + e.getMessage()); - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - assertTrue(verified); - } - - /** - * Looks at every value of the mapreduce output and verifies that indeed - * the values have been reversed. - * @param table Table to scan. 
- * @throws IOException - * @throws NullPointerException if we failed to find a cell value - */ - private void verifyAttempt(final HTable table) throws IOException, NullPointerException { - HScannerInterface scanner = - table.obtainScanner(columns, HConstants.EMPTY_START_ROW); - try { - HStoreKey key = new HStoreKey(); - TreeMap results = new TreeMap(); - - while(scanner.next(key, results)) { - if (LOG.isDebugEnabled()) { - if (results.size() > 2 ) { - throw new IOException("Too many results, expected 2 got " + - results.size()); - } - } - byte[] firstValue = null; - byte[] secondValue = null; - int count = 0; - for(Map.Entry e: results.entrySet()) { - if (count == 0) { - firstValue = e.getValue(); - } - if (count == 1) { - secondValue = e.getValue(); - } - count++; - if (count == 2) { - break; - } - } - - String first = ""; - if (firstValue == null) { - throw new NullPointerException(key.getRow().toString() + - ": first value is null"); - } - first = new String(firstValue, HConstants.UTF8_ENCODING); - - String second = ""; - if (secondValue == null) { - throw new NullPointerException(key.getRow().toString() + - ": second value is null"); - } - byte[] secondReversed = new byte[secondValue.length]; - for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { - secondReversed[i] = secondValue[j]; - } - second = new String(secondReversed, HConstants.UTF8_ENCODING); - - if (first.compareTo(second) != 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("second key is not the reverse of first. row=" + - key.getRow() + ", first value=" + first + ", second value=" + - second); - } - fail(); - } - } - } finally { - scanner.close(); - } - } -} \ No newline at end of file +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseAdmin; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HScannerInterface; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.HTable; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.MultiRegionTable; +import org.apache.hadoop.hbase.StaticTestEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.OutputCollector; + +/** + * Test Map/Reduce job over HBase tables + */ +public class TestTableMapReduce extends MultiRegionTable { + @SuppressWarnings("hiding") + private static final Log LOG = + LogFactory.getLog(TestTableMapReduce.class.getName()); + + static final String SINGLE_REGION_TABLE_NAME = "srtest"; + static final String MULTI_REGION_TABLE_NAME = "mrtest"; + static final String INPUT_COLUMN = "contents:"; + static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN); + static final String OUTPUT_COLUMN = "text:"; + static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN); + + private static final Text[] columns = { + TEXT_INPUT_COLUMN, + TEXT_OUTPUT_COLUMN + }; + + private MiniDFSCluster dfsCluster = null; + private Path dir; + private MiniHBaseCluster hCluster = null; + + private static byte[][] values = null; + + static { + try { + values = new byte[][] { + "0123".getBytes(HConstants.UTF8_ENCODING), + "abcd".getBytes(HConstants.UTF8_ENCODING), + "wxyz".getBytes(HConstants.UTF8_ENCODING), + "6789".getBytes(HConstants.UTF8_ENCODING) + }; + } catch (UnsupportedEncodingException e) { + fail(); + } + } + + /** constructor */ + public TestTableMapReduce() { + super(); + + // Make sure the cache gets flushed so we trigger a compaction(s) and + // hence splits. + conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024); + + // Always compact if there is more than one store file. + conf.setInt("hbase.hstore.compactionThreshold", 2); + + // This size should make it so we always split using the addContent + // below. After adding all data, the first region is 1.3M + conf.setLong("hbase.hregion.max.filesize", 256 * 1024); + + // Make lease timeout longer, lease checks less frequent + conf.setInt("hbase.master.lease.period", 10 * 1000); + conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); + + // Set client pause to the original default + conf.setInt("hbase.client.pause", 10 * 1000); + } + + /** + * {@inheritDoc} + */ + @Override + public void setUp() throws Exception { + dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null); + + // Must call super.setup() after starting mini dfs cluster. Otherwise + // we get a local file system instead of hdfs + + super.setUp(); + try { + dir = new Path("/hbase"); + fs.mkdirs(dir); + // Start up HBase cluster + // Only one region server. 
MultiRegionServer manufacturing code below + // depends on there being one region server only. + hCluster = new MiniHBaseCluster(conf, 1, dfsCluster); + LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS)); + } catch (Exception e) { + StaticTestEnvironment.shutdownDfs(dfsCluster); + throw e; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void tearDown() throws Exception { + super.tearDown(); + if(hCluster != null) { + hCluster.shutdown(); + } + StaticTestEnvironment.shutdownDfs(dfsCluster); + } + + /** + * Pass the given key and processed record reduce + */ + public static class ProcessContentsMapper extends TableMap { + /** + * Pass the key, and reversed value to reduce + * + * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey,org.apache.hadoop.io.MapWritable,org.apache.hadoop.mapred.OutputCollector,org.apache.hadoop.mapred.Reporter) + */ + @SuppressWarnings("unchecked") + @Override + public void map(HStoreKey key, MapWritable value, + OutputCollector output, + @SuppressWarnings("unused") Reporter reporter) throws IOException { + + Text tKey = key.getRow(); + + if(value.size() != 1) { + throw new IOException("There should only be one input column"); + } + + Text[] keys = value.keySet().toArray(new Text[value.size()]); + if(!keys[0].equals(TEXT_INPUT_COLUMN)) { + throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN + + " but got: " + keys[0]); + } + + // Get the original value and reverse it + + String originalValue = + new String(((ImmutableBytesWritable)value.get(keys[0])).get(), + HConstants.UTF8_ENCODING); + StringBuilder newValue = new StringBuilder(); + for(int i = originalValue.length() - 1; i >= 0; i--) { + newValue.append(originalValue.charAt(i)); + } + + // Now set the value to be collected + + MapWritable outval = new MapWritable(); + outval.put(TEXT_OUTPUT_COLUMN, new ImmutableBytesWritable( + newValue.toString().getBytes(HConstants.UTF8_ENCODING))); + + output.collect(tKey, outval); + } + } + + /** + * Test hbase mapreduce jobs against single region and multi-region tables. + * @throws IOException + */ + public void testTableMapReduce() throws IOException { + localTestSingleRegionTable(); + localTestMultiRegionTable(); + } + + /* + * Test against a single region. + * @throws IOException + */ + private void localTestSingleRegionTable() throws IOException { + HTableDescriptor desc = new HTableDescriptor(SINGLE_REGION_TABLE_NAME); + desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); + desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); + + // Create a table. 
+ HBaseAdmin admin = new HBaseAdmin(this.conf); + admin.createTable(desc); + + // insert some data into the test table + HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME)); + + try { + for(int i = 0; i < values.length; i++) { + long lockid = table.startUpdate(new Text("row_" + + String.format("%1$05d", i))); + + try { + table.put(lockid, TEXT_INPUT_COLUMN, values[i]); + table.commit(lockid, System.currentTimeMillis()); + lockid = -1; + } finally { + if (lockid != -1) + table.abort(lockid); + } + } + + LOG.info("Print table contents before map/reduce for " + + SINGLE_REGION_TABLE_NAME); + scanTable(SINGLE_REGION_TABLE_NAME, true); + + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + + try { + JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumMapTasks(1); + jobConf.setNumReduceTasks(1); + + TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN, + ProcessContentsMapper.class, jobConf); + + TableReduce.initJob(SINGLE_REGION_TABLE_NAME, + IdentityTableReduce.class, jobConf); + LOG.info("Started " + SINGLE_REGION_TABLE_NAME); + JobClient.runJob(jobConf); + + LOG.info("Print table contents after map/reduce for " + + SINGLE_REGION_TABLE_NAME); + scanTable(SINGLE_REGION_TABLE_NAME, true); + + // verify map-reduce results + verify(SINGLE_REGION_TABLE_NAME); + } finally { + mrCluster.shutdown(); + } + } finally { + table.close(); + } + } + + /* + * Test against multiple regions. + * @throws IOException + */ + private void localTestMultiRegionTable() throws IOException { + HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); + desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); + desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); + + // Create a table. 
+ HBaseAdmin admin = new HBaseAdmin(this.conf); + admin.createTable(desc); + + // Populate a table into multiple regions + makeMultiRegionTable(conf, hCluster, fs, MULTI_REGION_TABLE_NAME, + INPUT_COLUMN); + + // Verify table indeed has multiple regions + HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME)); + try { + Text[] startKeys = table.getStartKeys(); + assertTrue(startKeys.length > 1); + + @SuppressWarnings("deprecation") + MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); + + try { + JobConf jobConf = new JobConf(conf, TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumMapTasks(2); + jobConf.setNumReduceTasks(1); + + TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN, + ProcessContentsMapper.class, jobConf); + + TableReduce.initJob(MULTI_REGION_TABLE_NAME, + IdentityTableReduce.class, jobConf); + LOG.info("Started " + MULTI_REGION_TABLE_NAME); + JobClient.runJob(jobConf); + + // verify map-reduce results + verify(MULTI_REGION_TABLE_NAME); + } finally { + mrCluster.shutdown(); + } + } finally { + table.close(); + } + } + + private void scanTable(String tableName, boolean printValues) + throws IOException { + HTable table = new HTable(conf, new Text(tableName)); + + HScannerInterface scanner = + table.obtainScanner(columns, HConstants.EMPTY_START_ROW); + + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + + while(scanner.next(key, results)) { + if (printValues) { + LOG.info("row: " + key.getRow()); + + for(Map.Entry e: results.entrySet()) { + LOG.info(" column: " + e.getKey() + " value: " + + new String(e.getValue(), HConstants.UTF8_ENCODING)); + } + } + } + + } finally { + scanner.close(); + } + } + + @SuppressWarnings("null") + private void verify(String tableName) throws IOException { + HTable table = new HTable(conf, new Text(tableName)); + boolean verified = false; + long pause = conf.getLong("hbase.client.pause", 5 * 1000); + int numRetries = conf.getInt("hbase.client.retries.number", 5); + for (int i = 0; i < numRetries; i++) { + try { + verifyAttempt(table); + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. + LOG.debug("Verification attempt failed: " + e.getMessage()); + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + assertTrue(verified); + } + + /** + * Looks at every value of the mapreduce output and verifies that indeed + * the values have been reversed. + * @param table Table to scan. 
+ * @throws IOException + * @throws NullPointerException if we failed to find a cell value + */ + private void verifyAttempt(final HTable table) throws IOException, NullPointerException { + HScannerInterface scanner = + table.obtainScanner(columns, HConstants.EMPTY_START_ROW); + try { + HStoreKey key = new HStoreKey(); + TreeMap results = new TreeMap(); + + while(scanner.next(key, results)) { + if (LOG.isDebugEnabled()) { + if (results.size() > 2 ) { + throw new IOException("Too many results, expected 2 got " + + results.size()); + } + } + byte[] firstValue = null; + byte[] secondValue = null; + int count = 0; + for(Map.Entry e: results.entrySet()) { + if (count == 0) { + firstValue = e.getValue(); + } + if (count == 1) { + secondValue = e.getValue(); + } + count++; + if (count == 2) { + break; + } + } + + String first = ""; + if (firstValue == null) { + throw new NullPointerException(key.getRow().toString() + + ": first value is null"); + } + first = new String(firstValue, HConstants.UTF8_ENCODING); + + String second = ""; + if (secondValue == null) { + throw new NullPointerException(key.getRow().toString() + + ": second value is null"); + } + byte[] secondReversed = new byte[secondValue.length]; + for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { + secondReversed[i] = secondValue[j]; + } + second = new String(secondReversed, HConstants.UTF8_ENCODING); + + if (first.compareTo(second) != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("second key is not the reverse of first. row=" + + key.getRow() + ", first value=" + first + ", second value=" + + second); + } + fail(); + } + } + } finally { + scanner.close(); + } + } +} Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (working copy) @@ -19,7 +19,6 @@ */ package org.apache.hadoop.hbase; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -27,7 +26,6 @@ import java.util.TreeMap; import org.apache.hadoop.dfs.MiniDFSCluster; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; @@ -51,7 +49,6 @@ */ public void testHRegion() throws IOException { try { - setup(); locks(); badPuts(); basic(); @@ -59,13 +56,12 @@ batchWrite(); splitAndMerge(); read(); - cleanup(); } finally { if (r != null) { r.close(); } if (log != null) { - log.closeAndDelete(); + log.close(); } StaticTestEnvironment.shutdownDfs(cluster); } @@ -92,19 +88,25 @@ private static int numInserted = 0; // Create directories, start mini cluster, etc. - - private void setup() throws IOException { - cluster = new MiniDFSCluster(conf, 2, true, (String[])null); - + /** constructor */ + public TestHRegion() { desc = new HTableDescriptor("test"); desc.addFamily(new HColumnDescriptor("contents:")); desc.addFamily(new HColumnDescriptor("anchor:")); + } + + /** {@inheritDoc} */ + @Override + public void setUp() throws Exception { + cluster = new MiniDFSCluster(conf, 2, true, (String[])null); + super.setUp(); + r = createNewHRegion(desc, null, null); log = r.getLog(); region = new HRegionIncommon(r); } - + // Test basic functionality. 
Writes to contents:basic and anchor:anchornum-* private void basic() throws IOException { @@ -551,13 +553,9 @@ } } long startCompact = System.currentTimeMillis(); - if(r.compactIfNeeded()) { - totalCompact = System.currentTimeMillis() - startCompact; - System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); - - } else { - System.out.println("No compaction required."); - } + compactStores(r); + totalCompact = System.currentTimeMillis() - startCompact; + System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0)); long endTime = System.currentTimeMillis(); long totalElapsed = (endTime - startTime); @@ -577,27 +575,29 @@ // NOTE: This test depends on testBatchWrite succeeding private void splitAndMerge() throws IOException { - Path oldRegionPath = r.getRegionDir(); long startTime = System.currentTimeMillis(); - HRegion subregions[] = r.splitRegion(this); + Text midkey = needsSplit(r); + HRegion subregions[] = null; + if (midkey != null) { + subregions = r.splitRegion(this, midkey); + } if (subregions != null) { System.out.println("Split region elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); assertEquals("Number of subregions", subregions.length, 2); + + for (int i = 0; i < subregions.length; i++) { + subregions[i] = openClosedRegion(subregions[i]); + } // Now merge it back together - Path oldRegion1 = subregions[0].getRegionDir(); - Path oldRegion2 = subregions[1].getRegionDir(); startTime = System.currentTimeMillis(); r = HRegion.closeAndMerge(subregions[0], subregions[1]); region = new HRegionIncommon(r); System.out.println("Merge regions elapsed time: " + ((System.currentTimeMillis() - startTime) / 1000.0)); - fs.delete(oldRegion1); - fs.delete(oldRegion2); - fs.delete(oldRegionPath); } } @@ -784,29 +784,4 @@ s.close(); } } - - private static void deleteFile(File f) { - if(f.isDirectory()) { - File[] children = f.listFiles(); - for(int i = 0; i < children.length; i++) { - deleteFile(children[i]); - } - } - f.delete(); - } - - private void cleanup() { - try { - r.close(); - r = null; - log.closeAndDelete(); - log = null; - } catch (IOException e) { - e.printStackTrace(); - } - - // Delete all the DFS files - - deleteFile(new File(System.getProperty("test.build.data"), "dfs")); - } } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (working copy) @@ -85,9 +85,9 @@ private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); - region.flushcache(); - Text midkey = new Text(); - assertTrue(region.needsSplit(midkey)); + flushcache(region); + Text midkey = needsSplit(region); + assertNotNull(midkey); HRegion [] regions = split(region); try { // Need to open the regions. @@ -110,7 +110,8 @@ // Even after above splits, still needs split but after splits its // unsplitable because biggest store file is reference. References // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); + midkeys[i] = needsSplit(regions[i]); + assertNull(midkeys[i]); // Add so much data to this region, we create a store file that is > // than // one of our unsplitable references. 
@@ -120,23 +121,23 @@ } addContent(regions[i], COLFAMILY_NAME2); addContent(regions[i], COLFAMILY_NAME1); - regions[i].flushcache(); + flushcache(regions[i]); } // Assert that even if one store file is larger than a reference, the // region is still deemed unsplitable (Can't split region if references // presen). for (int i = 0; i < regions.length; i++) { - midkeys[i] = new Text(); + midkeys[i] = needsSplit(regions[i]); // Even after above splits, still needs split but after splits its // unsplitable because biggest store file is reference. References // make the store unsplittable, until something bigger comes along. - assertFalse(regions[i].needsSplit(midkeys[i])); + assertNull(midkeys[i]); } // To make regions splitable force compaction. for (int i = 0; i < regions.length; i++) { - regions[i].compactStores(); + compactStores(regions[i]); } TreeMap sortedMap = new TreeMap(); @@ -218,11 +219,11 @@ } private HRegion [] split(final HRegion r) throws IOException { - Text midKey = new Text(); - assertTrue(r.needsSplit(midKey)); + Text midKey = needsSplit(r); + assertNotNull(midKey); // Assert can get mid key from passed region. assertGet(r, COLFAMILY_NAME3, midKey); - HRegion [] regions = r.splitRegion(null); + HRegion [] regions = r.splitRegion(null, midKey); assertEquals(regions.length, 2); return regions; } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (revision 617638) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (working copy) @@ -82,7 +82,6 @@ */ public void testCompaction() throws Exception { createStoreFile(r); - assertFalse(r.compactIfNeeded()); for (int i = 0; i < COMPACTION_THRESHOLD; i++) { createStoreFile(r); } @@ -94,35 +93,8 @@ byte [][] bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); // Assert that I can get > 5 versions (Should be at least 5 in there). assertTrue(bytes.length >= 5); - // Try to run compaction concurrent with a thread flush just to see that - // we can. - final HRegion region = this.r; - Thread t1 = new Thread() { - @Override - public void run() { - try { - region.flushcache(); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - Thread t2 = new Thread() { - @Override - public void run() { - try { - assertTrue(region.compactIfNeeded()); - } catch (IOException e) { - e.printStackTrace(); - } - } - }; - t1.setDaemon(true); - t1.start(); - t2.setDaemon(true); - t2.start(); - t1.join(); - t2.join(); + flushcache(r); + compactStores(r); // Now assert that there are 4 versions of a record only: thats the // 3 versions that should be in the compacted store and then the one more // we added when we flushed. But could be 3 only if the flush happened @@ -150,7 +122,8 @@ // verify that it is removed as we compact. // Assert all delted. assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); - this.r.flushcache(); + flushcache(r); + compactStores(r); assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/)); // Add a bit of data and flush it so we for sure have the compaction limit // for store files. Usually by this time we will have but if compaction @@ -158,7 +131,7 @@ // compacted store and the flush above when we added deletes. Add more // content to be certain. createSmallerStoreFile(this.r); - assertTrue(this.r.compactIfNeeded()); + compactStores(r); // Assert that the first row is still deleted. 
bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/); assertNull(bytes); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogKey.java (working copy) @@ -31,7 +31,7 @@ * also sorted. */ public class HLogKey implements WritableComparable { - Text regionName = new Text(); + Text storeName = new Text(); Text tablename = new Text(); Text row = new Text(); long logSeqNum = 0L; @@ -46,14 +46,14 @@ * We maintain the tablename mainly for debugging purposes. * A regionName is always a sub-table object. * - * @param regionName - name of region + * @param storeName - name of region * @param tablename - name of table * @param row - row key * @param logSeqNum - log sequence number */ - public HLogKey(Text regionName, Text tablename, Text row, long logSeqNum) { + public HLogKey(Text storeName, Text tablename, Text row, long logSeqNum) { // TODO: Is this copy of the instances necessary? They are expensive. - this.regionName.set(regionName); + this.storeName.set(storeName); this.tablename.set(tablename); this.row.set(row); this.logSeqNum = logSeqNum; @@ -63,8 +63,8 @@ // A bunch of accessors ////////////////////////////////////////////////////////////////////////////// - Text getRegionName() { - return regionName; + Text getStoreName() { + return storeName; } Text getTablename() { @@ -84,7 +84,7 @@ */ @Override public String toString() { - return tablename + "/" + regionName + "/" + row + "/" + logSeqNum; + return tablename + "/" + storeName + "/" + row + "/" + logSeqNum; } /** @@ -100,7 +100,7 @@ */ @Override public int hashCode() { - int result = this.regionName.hashCode(); + int result = this.storeName.hashCode(); result ^= this.row.hashCode(); result ^= this.logSeqNum; return result; @@ -115,7 +115,7 @@ */ public int compareTo(Object o) { HLogKey other = (HLogKey) o; - int result = this.regionName.compareTo(other.regionName); + int result = this.storeName.compareTo(other.storeName); if(result == 0) { result = this.row.compareTo(other.row); @@ -141,7 +141,7 @@ * {@inheritDoc} */ public void write(DataOutput out) throws IOException { - this.regionName.write(out); + this.storeName.write(out); this.tablename.write(out); this.row.write(out); out.writeLong(logSeqNum); @@ -151,7 +151,7 @@ * {@inheritDoc} */ public void readFields(DataInput in) throws IOException { - this.regionName.readFields(in); + this.storeName.readFields(in); this.tablename.readFields(in); this.row.readFields(in); this.logSeqNum = in.readLong(); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy) @@ -43,7 +43,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.TextSequence; import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -215,10 +214,13 @@ /** * Find the key that matches row exactly, or the one that immediately * preceeds it. 
+ * + * @param row + * @param timestamp + * @return row key */ - public Text getRowKeyAtOrBefore(final Text row, long timestamp) - throws IOException{ - this.lock.readLock().lock(); + public Text getRowKeyAtOrBefore(final Text row, long timestamp) { + this.lock.readLock().lock(); Text key_memcache = null; Text key_snapshot = null; @@ -238,18 +240,17 @@ return key_snapshot; } else if (key_memcache != null && key_snapshot == null) { return key_memcache; - } else { + } else if ( (key_memcache != null && key_memcache.equals(row)) + || (key_snapshot != null && key_snapshot.equals(row)) ) { // if either is a precise match, return the original row. - if ( (key_memcache != null && key_memcache.equals(row)) - || (key_snapshot != null && key_snapshot.equals(row)) ) { - return row; - } else { - // no precise matches, so return the one that is closer to the search - // key (greatest) - return key_memcache.compareTo(key_snapshot) > 0 ? + return row; + } else if (key_memcache != null) { + // no precise matches, so return the one that is closer to the search + // key (greatest) + return key_memcache.compareTo(key_snapshot) > 0 ? key_memcache : key_snapshot; - } } + return null; } finally { this.lock.readLock().unlock(); } @@ -296,10 +297,10 @@ if (headMap.isEmpty()) { /* LOG.debug("Went searching for " + key + ", found nothing!");*/ return null; - } else { + } /* LOG.debug("Went searching for " + key + ", found " + headMap.lastKey().getRow());*/ - return headMap.lastKey().getRow(); - } + + return headMap.lastKey().getRow(); } /** @@ -578,7 +579,13 @@ private static final String BLOOMFILTER_FILE_NAME = "filter"; final Memcache memcache = new Memcache(); + private volatile long memcacheSize = 0; + final long memcacheFlushSize; + final long blockingMemcacheSize; + final long desiredMaxFileSize; + volatile long storeSize = 0; private final Path basedir; + private final HRegion region; private final HRegionInfo info; private final HColumnDescriptor family; private final SequenceFile.CompressionType compression; @@ -594,7 +601,7 @@ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final AtomicInteger activeScanners = new AtomicInteger(0); - final String storeName; + final Text storeName; /* * Sorted Map of readers keyed by sequence id (Most recent should be last in @@ -610,10 +617,18 @@ private final SortedMap readers = new TreeMap(); + // The most-recent log-seq-ID that's present. The most-recent such ID means + // we can ignore all log messages up to and including that ID (because they're + // already reflected in the TreeMaps). private volatile long maxSeqId; + private final int compactionThreshold; private final ReentrantReadWriteLock newScannerLock = new ReentrantReadWriteLock(); + + private volatile boolean splitRequested = false; + private volatile boolean compactionRequested = false; + private volatile boolean flushRequested = false; /** * An HStore is a set of zero or more MapFiles, which stretch backwards over @@ -640,26 +655,37 @@ * file will be deleted (by whoever has instantiated the HStore). 
* * @param basedir qualified path under which the region directory lives - * @param info HRegionInfo for this region + * @param region HRegion this store belongs to * @param family HColumnDescriptor for this column * @param fs file system object * @param reconstructionLog existing log file to apply if any * @param conf configuration object * @throws IOException */ - HStore(Path basedir, HRegionInfo info, HColumnDescriptor family, + HStore(Path basedir, HRegion region, HColumnDescriptor family, FileSystem fs, Path reconstructionLog, HBaseConfiguration conf) throws IOException { this.basedir = basedir; - this.info = info; + this.region = region; + this.info = this.region.getRegionInfo(); this.family = family; this.fs = fs; this.conf = conf; + // By default, we flush the cache when 64M. + this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", + 1024*1024*64); + + this.blockingMemcacheSize = this.memcacheFlushSize * + conf.getInt("hbase.hregion.memcache.block.multiplier", 2); + + // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. + this.desiredMaxFileSize = + conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); this.compactionDir = HRegion.getCompactionDir(basedir); this.storeName = - this.info.getEncodedName() + "/" + this.family.getFamilyName(); + new Text(this.info.getEncodedName() + "/" + this.family.getFamilyName()); if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) { this.compression = SequenceFile.CompressionType.BLOCK; @@ -704,21 +730,9 @@ // MapFiles are in a reliable state. Every entry in 'mapdir' must have a // corresponding one in 'loginfodir'. Without a corresponding log info // file, the entry in 'mapdir' must be deleted. - List hstoreFiles = loadHStoreFiles(infodir, mapdir); - for(HStoreFile hsf: hstoreFiles) { - this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf); - } - - // Now go through all the HSTORE_LOGINFOFILEs and figure out the - // most-recent log-seq-ID that's present. The most-recent such ID means we - // can ignore all log messages up to and including that ID (because they're - // already reflected in the TreeMaps). - // - // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That - // means it was built prior to the previous run of HStore, and so it cannot - // contain any updates also contained in the log. - - this.maxSeqId = getMaxSequenceId(hstoreFiles); + // loadHStoreFiles also computes the max sequence id + this.maxSeqId = -1; + this.storefiles.putAll(loadHStoreFiles(infodir, mapdir)); if (LOG.isDebugEnabled()) { LOG.debug("maximum sequence id for hstore " + storeName + " is " + this.maxSeqId); @@ -747,25 +761,6 @@ } } - /* - * @param hstoreFiles - * @return Maximum sequence number found or -1. 
- * @throws IOException - */ - private long getMaxSequenceId(final List hstoreFiles) - throws IOException { - long maxSeqID = -1; - for (HStoreFile hsf : hstoreFiles) { - long seqid = hsf.loadInfo(fs); - if (seqid > 0) { - if (seqid > maxSeqID) { - maxSeqID = seqid; - } - } - } - return maxSeqID; - } - long getMaxSequenceId() { return this.maxSeqId; } @@ -810,14 +805,12 @@ // METACOLUMN info such as HBASE::CACHEFLUSH entries Text column = val.getColumn(); if (column.equals(HLog.METACOLUMN) - || !key.getRegionName().equals(info.getRegionName()) + || (key.getStoreName().compareTo(storeName) != 0) || !HStoreKey.extractFamily(column).equals(family.getFamilyName())) { if (LOG.isDebugEnabled()) { - LOG.debug("Passing on edit " + key.getRegionName() + ", " + - column.toString() + ": " + - new String(val.getVal(), UTF8_ENCODING) + - ", my region: " + info.getRegionName() + ", my column: " + - family.getFamilyName()); + LOG.debug("Passing on edit " + key.getStoreName() + ": " + + new String(val.getVal(), UTF8_ENCODING) + ", my name: " + + storeName); } continue; } @@ -850,8 +843,10 @@ * @param mapdir qualified path for map file directory * @throws IOException */ - private List loadHStoreFiles(Path infodir, Path mapdir) - throws IOException { + @SuppressWarnings("deprecation") + private SortedMap loadHStoreFiles(Path infodir, + Path mapdir) throws IOException { + if (LOG.isDebugEnabled()) { LOG.debug("infodir: " + infodir.toString() + " mapdir: " + mapdir.toString()); @@ -859,8 +854,9 @@ // Look first at info files. If a reference, these contain info we need // to create the HStoreFile. Path infofiles[] = fs.listPaths(new Path[] {infodir}); - ArrayList results = new ArrayList(infofiles.length); + SortedMap results = new TreeMap(); ArrayList mapfiles = new ArrayList(infofiles.length); + boolean hasReferences = false; for (Path p: infofiles) { Matcher m = REF_NAME_PARSER.matcher(p.getName()); /* @@ -879,26 +875,40 @@ HStoreFile.Reference reference = null; if (isReference) { reference = readSplitInfo(p, fs); + hasReferences = true; } curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(), family.getFamilyName(), fid, reference); + storeSize += curfile.length(); + long storeSeqId = -1; + try { + storeSeqId = curfile.loadInfo(fs); + if (storeSeqId > this.maxSeqId) { + this.maxSeqId = storeSeqId; + } + } catch (IOException e) { + // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. + // That means it was built prior to the previous run of HStore, and so + // it cannot contain any updates also contained in the log. + } + Path mapfile = curfile.getMapFilePath(); if (!fs.exists(mapfile)) { fs.delete(curfile.getInfoFilePath()); LOG.warn("Mapfile " + mapfile.toString() + " does not exist. " + - "Cleaned up info file. Continuing..."); + "Cleaned up info file. Continuing..."); continue; } - + // TODO: Confirm referent exists. - + // Found map and sympathetic info file. Add this hstorefile to result. - results.add(curfile); + results.put(storeSeqId, curfile); // Keep list of sympathetic data mapfiles for cleaning info dir in next // section. Make sure path is fully qualified for compare. mapfiles.add(mapfile); } - + // List paths by experience returns fully qualified names -- at least when // running on a mini hdfs cluster. 
Path datfiles[] = fs.listPaths(new Path[] {mapdir}); @@ -908,6 +918,14 @@ fs.delete(datfiles[i]); } } + + // Request a compaction if there are references or too many store files + + if (hasReferences || storefiles.size() > compactionThreshold) { + compactionRequested = + region.listener.compactionRequested(this, region); + } + return results; } @@ -1020,12 +1038,38 @@ lock.readLock().lock(); try { this.memcache.add(key, value); + memcacheSize += key.getSize() + (value == null ? 0 : value.length); } finally { lock.readLock().unlock(); } + checkForFlushCompactionSplit(); } + private void checkForFlushCompactionSplit() { + if (region.listener != null) { + if (!splitRequested) { + if (storeSize > this.desiredMaxFileSize) { + Text midkey = checkSplit(); + if (midkey != null) { + splitRequested = region.listener.splitRequested(region, midkey); + } + } else { + if (!flushRequested && memcacheSize > memcacheFlushSize) { + flushRequested = region.listener.flushRequested(this, region); + if (flushRequested) { + memcacheSize = 0; + } + } + if (!compactionRequested && storefiles.size() > compactionThreshold) { + compactionRequested = + region.listener.compactionRequested(this, region); + } + } + } + } + } + /** * Close all the MapFile readers * @@ -1078,72 +1122,85 @@ * @throws IOException */ void flushCache(final long logCacheFlushId) throws IOException { - internalFlushCache(memcache.getSnapshot(), logCacheFlushId); + internalFlushCache(memcache.getSnapshot(), logCacheFlushId); } private void internalFlushCache(SortedMap cache, long logCacheFlushId) throws IOException { - - synchronized(flushLock) { - // A. Write the Maps out to the disk - HStoreFile flushedFile = new HStoreFile(conf, fs, basedir, - info.getEncodedName(), family.getFamilyName(), -1L, null); - String name = flushedFile.toString(); - MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, - this.bloomFilter); - - // Here we tried picking up an existing HStoreFile from disk and - // interlacing the memcache flush compacting as we go. The notion was - // that interlacing would take as long as a pure flush with the added - // benefit of having one less file in the store. Experiments showed that - // it takes two to three times the amount of time flushing -- more column - // families makes it so the two timings come closer together -- but it - // also complicates the flush. The code was removed. Needed work picking - // which file to interlace (favor references first, etc.) - // - // Related, looks like 'merging compactions' in BigTable paper interlaces - // a memcache flush. We don't. - int entries = 0; - try { - for (Map.Entry es: cache.entrySet()) { - HStoreKey curkey = es.getKey(); - TextSequence f = HStoreKey.extractFamily(curkey.getColumn()); - if (f.equals(this.family.getFamilyName())) { - entries++; - out.append(curkey, new ImmutableBytesWritable(es.getValue())); - } + + try { + if (cache.size() == 0) { + if (LOG.isDebugEnabled()) { + LOG.info("Not flushing cache for " + storeName + + " because it has 0 entries"); } - } finally { - out.close(); + return; } + synchronized(flushLock) { + // A. Write the Maps out to the disk + HStoreFile flushedFile = new HStoreFile(conf, fs, basedir, + info.getEncodedName(), family.getFamilyName(), -1L, null); + String name = flushedFile.toString(); + MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, + this.bloomFilter); - // B. Write out the log sequence number that corresponds to this output - // MapFile. The MapFile is current up to and including the log seq num. 
- flushedFile.writeInfo(fs, logCacheFlushId); - - // C. Flush the bloom filter if any - if (bloomFilter != null) { - flushBloomFilter(); - } + // Here we tried picking up an existing HStoreFile from disk and + // interlacing the memcache flush compacting as we go. The notion was + // that interlacing would take as long as a pure flush with the added + // benefit of having one less file in the store. Experiments showed that + // it takes two to three times the amount of time flushing -- more column + // families makes it so the two timings come closer together -- but it + // also complicates the flush. The code was removed. Needed work picking + // which file to interlace (favor references first, etc.) + // + // Related, looks like 'merging compactions' in BigTable paper interlaces + // a memcache flush. We don't. + int entries = 0; + long cacheSize = 0; + try { + for (Map.Entry es: cache.entrySet()) { + HStoreKey curkey = es.getKey(); + byte[] bytes = es.getValue(); + if (curkey.getFamily().equals(this.family.getFamilyName())) { + entries++; + out.append(curkey, new ImmutableBytesWritable(bytes)); + cacheSize += curkey.getSize() + (bytes != null ? bytes.length : 0); + } + } + } finally { + out.close(); + } - // D. Finally, make the new MapFile available. - this.lock.writeLock().lock(); - try { - Long flushid = Long.valueOf(logCacheFlushId); - // Open the map file reader. - this.readers.put(flushid, - flushedFile.getReader(this.fs, this.bloomFilter)); - this.storefiles.put(flushid, flushedFile); - if(LOG.isDebugEnabled()) { - LOG.debug("Added " + name + " with " + entries + - " entries, sequence id " + logCacheFlushId + ", and size " + - StringUtils.humanReadableInt(flushedFile.length()) + " for " + - this.storeName); + // B. Write out the log sequence number that corresponds to this output + // MapFile. The MapFile is current up to and including the log seq num. + flushedFile.writeInfo(fs, logCacheFlushId); + + // C. Flush the bloom filter if any + if (bloomFilter != null) { + flushBloomFilter(); } - } finally { - this.lock.writeLock().unlock(); + + // D. Finally, make the new MapFile available. + this.lock.writeLock().lock(); + try { + Long flushid = Long.valueOf(logCacheFlushId); + // Open the map file reader. + this.readers.put(flushid, + flushedFile.getReader(this.fs, this.bloomFilter)); + this.storefiles.put(flushid, flushedFile); + if(LOG.isDebugEnabled()) { + LOG.debug("Added " + name + " with " + entries + + " entries, sequence id " + logCacheFlushId + ", and size " + + StringUtils.humanReadableInt(cacheSize) + " for " + + this.storeName); + } + } finally { + this.lock.writeLock().unlock(); + } + storeSize += cacheSize; } - return; + } finally { + flushRequested = false; } } @@ -1152,21 +1209,6 @@ ////////////////////////////////////////////////////////////////////////////// /** - * @return True if this store needs compaction. - */ - boolean needsCompaction() { - boolean compactionNeeded = false; - if (this.storefiles != null) { - compactionNeeded = this.storefiles.size() >= this.compactionThreshold; - if (LOG.isDebugEnabled()) { - LOG.debug("compaction for HStore " + storeName + - (compactionNeeded ? " " : " not ") + "needed."); - } - } - return compactionNeeded; - } - - /** * Compact the back-HStores. This method may take some time, so the calling * thread must be able to block for long periods. 
* @@ -1186,51 +1228,65 @@ * @return true if compaction completed successfully */ boolean compact() throws IOException { - synchronized (compactLock) { - if (LOG.isDebugEnabled()) { - LOG.debug("started compaction of " + storefiles.size() + - " files using " + compactionDir.toString() + " for " + - this.storeName); - } + try { + synchronized (compactLock) { + List filesToCompact = null; + long maxId = -1; + synchronized (storefiles) { + if (storefiles.size() < 1 || + (storefiles.size() == 1 && + !storefiles.get(storefiles.firstKey()).isReference())) { + if (LOG.isDebugEnabled()) { + LOG.debug("nothing to compact for " + this.storeName + " because" + + (storefiles.size() < 1 ? " no store files to compact" : + " only one store file and it is not a refeerence")); + } + return false; + } - // Storefiles are keyed by sequence id. The oldest file comes first. - // We need to return out of here a List that has the newest file first. - List filesToCompact = - new ArrayList(this.storefiles.values()); - Collections.reverse(filesToCompact); - if (filesToCompact.size() < 1 || - (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) { - if (LOG.isDebugEnabled()) { - LOG.debug("nothing to compact for " + this.storeName); + if (LOG.isDebugEnabled()) { + LOG.debug("started compaction of " + storefiles.size() + + " files using " + compactionDir.toString() + " for " + + this.storeName); + } + + // Storefiles are keyed by sequence id. The oldest file comes first. + // We need to return out of here a List that has the newest file first. + filesToCompact = new ArrayList(this.storefiles.values()); + Collections.reverse(filesToCompact); + + // The max-sequenceID in any of the to-be-compacted TreeMaps is the + // last key of storefiles. + + maxId = this.storefiles.lastKey(); } - return false; - } - if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { - LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); - return false; - } + if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) { + LOG.warn("Mkdir on " + compactionDir.toString() + " failed"); + return false; + } - // Step through them, writing to the brand-new MapFile - HStoreFile compactedOutputFile = new HStoreFile(conf, fs, - this.compactionDir, info.getEncodedName(), family.getFamilyName(), - -1L, null); - MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, - this.compression, this.bloomFilter); - try { - compactHStoreFiles(compactedOut, filesToCompact); - } finally { - compactedOut.close(); - } + // Step through them, writing to the brand-new MapFile + HStoreFile compactedOutputFile = new HStoreFile(conf, fs, + this.compactionDir, info.getEncodedName(), family.getFamilyName(), + -1L, null); + MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs, + this.compression, this.bloomFilter); + try { + compactHStoreFiles(compactedOut, filesToCompact); + } finally { + compactedOut.close(); + } - // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. - // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps. - long maxId = getMaxSequenceId(filesToCompact); - compactedOutputFile.writeInfo(fs, maxId); + // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. + compactedOutputFile.writeInfo(fs, maxId); - // Move the compaction into place. - completeCompaction(filesToCompact, compactedOutputFile); + // Move the compaction into place. 
+ completeCompaction(filesToCompact, compactedOutputFile); + } return true; + } finally { + compactionRequested = false; } } @@ -1530,37 +1586,39 @@ } // 4. and 5. Unload all the replaced MapFiles, close and delete. - - List toDelete = new ArrayList(); - for (Map.Entry e: this.storefiles.entrySet()) { - if (!compactedFiles.contains(e.getValue())) { - continue; + + synchronized (storefiles) { + List toDelete = new ArrayList(); + for (Map.Entry e: this.storefiles.entrySet()) { + if (!compactedFiles.contains(e.getValue())) { + continue; + } + Long key = e.getKey(); + MapFile.Reader reader = this.readers.remove(key); + if (reader != null) { + reader.close(); + } + toDelete.add(key); } - Long key = e.getKey(); - MapFile.Reader reader = this.readers.remove(key); - if (reader != null) { - reader.close(); - } - toDelete.add(key); - } - try { - for (Long key: toDelete) { - HStoreFile hsf = this.storefiles.remove(key); - hsf.delete(); + try { + for (Long key: toDelete) { + HStoreFile hsf = this.storefiles.remove(key); + hsf.delete(); + } + + // 6. Loading the new TreeMap. + Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); + this.readers.put(orderVal, + finalCompactedFile.getReader(this.fs, this.bloomFilter)); + this.storefiles.put(orderVal, finalCompactedFile); + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + LOG.error("Failed replacing compacted files for " + this.storeName + + ". Compacted file is " + finalCompactedFile.toString() + + ". Files replaced are " + compactedFiles.toString() + + " some of which may have been already removed", e); } - - // 6. Loading the new TreeMap. - Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); - this.readers.put(orderVal, - finalCompactedFile.getReader(this.fs, this.bloomFilter)); - this.storefiles.put(orderVal, finalCompactedFile); - } catch (IOException e) { - e = RemoteExceptionHandler.checkIOException(e); - LOG.error("Failed replacing compacted files for " + this.storeName + - ". Compacted file is " + finalCompactedFile.toString() + - ". Files replaced are " + compactedFiles.toString() + - " some of which may have been already removed", e); } } finally { // 7. Releasing the write-lock @@ -1798,8 +1856,13 @@ /** * Find the key that matches row exactly, or the one that immediately * preceeds it. + * + * @param row + * @param timestamp + * @return row key + * @throws IOException */ - public Text getRowKeyAtOrBefore(final Text row, final long timestamp) + Text getRowKeyAtOrBefore(final Text row, final long timestamp) throws IOException{ // if the exact key is found, return that key // if we find a key that is greater than our search key, then use the @@ -1814,10 +1877,9 @@ this.lock.readLock().lock(); try { MapFile.Reader[] maparray = getReaders(); - Text bestSoFar = null; - HStoreKey rowKey = new HStoreKey(row, timestamp); +/* HStoreKey rowKey = new HStoreKey(row, timestamp); */ // process each store file for(int i = maparray.length - 1; i >= 0; i--) { @@ -1960,72 +2022,42 @@ return target.matchesRowCol(origin); } - /* - * Data structure to hold result of a look at store file sizes. 
- */ - static class HStoreSize { - final long aggregate; - final long largest; - boolean splitable; - - HStoreSize(final long a, final long l, final boolean s) { - this.aggregate = a; - this.largest = l; - this.splitable = s; - } - - long getAggregate() { - return this.aggregate; - } - - long getLargest() { - return this.largest; - } - - boolean isSplitable() { - return this.splitable; - } - - void setSplitable(final boolean s) { - this.splitable = s; - } - } - /** - * Gets size for the store. + * Determines if HStore can be split * - * @param midKey Gets set to the middle key of the largest splitable store - * file or its set to empty if largest is not splitable. - * @return Sizes for the store and the passed midKey is - * set to midKey of largest splitable. Otherwise, its set to empty - * to indicate we couldn't find a midkey to split on + * @return midKey if store can be split, null otherwise */ - HStoreSize size(Text midKey) { - long maxSize = 0L; - long aggregateSize = 0L; + Text checkSplit() { + // Not splitable if we find a reference store file present in the store. + boolean splitable = true; if (this.storefiles.size() <= 0) { - return new HStoreSize(0, 0, splitable); + return null; } this.lock.readLock().lock(); try { + long maxSize = 0; Long mapIndex = Long.valueOf(0L); // Iterate through all the MapFiles - for(Map.Entry e: storefiles.entrySet()) { - HStoreFile curHSF = e.getValue(); - long size = curHSF.length(); - aggregateSize += size; - if (maxSize == 0L || size > maxSize) { - // This is the largest one so far - maxSize = size; - mapIndex = e.getKey(); + synchronized (storefiles) { + for(Map.Entry e: storefiles.entrySet()) { + HStoreFile curHSF = e.getValue(); + long size = curHSF.length(); + if (maxSize == 0L || size > maxSize) { + // This is the largest one so far + maxSize = size; + mapIndex = e.getKey(); + } + if (splitable) { + splitable = !curHSF.isReference(); + } } - if (splitable) { - splitable = !curHSF.isReference(); - } } + if (!splitable) { + return null; + } MapFile.Reader r = this.readers.get(mapIndex); // seek back to the beginning of mapfile @@ -2035,28 +2067,33 @@ HStoreKey firstKey = new HStoreKey(); HStoreKey lastKey = new HStoreKey(); Writable value = new ImmutableBytesWritable(); - r.next((WritableComparable)firstKey, value); - r.finalKey((WritableComparable)lastKey); + r.next(firstKey, value); + r.finalKey(lastKey); // get the midkey HStoreKey midkey = (HStoreKey)r.midKey(); if (midkey != null) { - midKey.set(((HStoreKey)midkey).getRow()); // if the midkey is the same as the first and last keys, then we cannot // (ever) split this region. 
if (midkey.getRow().equals(firstKey.getRow()) && - midkey.getRow().equals(lastKey.getRow())) { - return new HStoreSize(aggregateSize, maxSize, false); - } + midkey.getRow().equals(lastKey.getRow())) { + return null; + } + return midkey.getRow(); } } catch(IOException e) { LOG.warn("Failed getting store size for " + this.storeName, e); } finally { this.lock.readLock().unlock(); } - return new HStoreSize(aggregateSize, maxSize, splitable); + return null; } + + /** @return aggregate size of HStore */ + public long getSize() { + return storeSize; + } ////////////////////////////////////////////////////////////////////////////// // File administration @@ -2083,10 +2120,17 @@ } } + /** + * @return the HColumnDescriptor for this store + */ + public HColumnDescriptor getFamily() { + return family; + } + /** {@inheritDoc} */ @Override public String toString() { - return this.storeName; + return this.storeName.toString(); } /* @@ -2132,19 +2176,21 @@ throws IOException { super(timestamp, targetCols); try { - this.readers = new MapFile.Reader[storefiles.size()]; - - // Most recent map file should be first - int i = readers.length - 1; - for(HStoreFile curHSF: storefiles.values()) { - readers[i--] = curHSF.getReader(fs, bloomFilter); + synchronized (storefiles) { + this.readers = new MapFile.Reader[storefiles.size()]; + + // Most recent map file should be first + int i = readers.length - 1; + for(HStoreFile curHSF: storefiles.values()) { + readers[i--] = curHSF.getReader(fs, bloomFilter); + } } this.keys = new HStoreKey[readers.length]; this.vals = new byte[readers.length][]; // Advance the readers to the first pos. - for(i = 0; i < readers.length; i++) { + for(int i = 0; i < readers.length; i++) { keys[i] = new HStoreKey(); if(firstRow.getLength() != 0) { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (working copy) @@ -138,17 +138,16 @@ long currentSize = 0; HRegion nextRegion = null; long nextSize = 0; - Text midKey = new Text(); for (int i = 0; i < info.length - 1; i++) { if (currentRegion == null) { currentRegion = new HRegion(tabledir, hlog, fs, conf, info[i], null, null); - currentSize = currentRegion.largestHStore(midKey).getAggregate(); + currentSize = currentRegion.getLargestHStoreSize(); } nextRegion = new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null); - nextSize = nextRegion.largestHStore(midKey).getAggregate(); + nextSize = nextRegion.getLargestHStoreSize(); if ((currentSize + nextSize) <= (maxFilesize / 2)) { // We merge two adjacent regions if their total size is less than Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java (working copy) @@ -1,36 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase; - -/** - * Implementors of this interface want to be notified when an HRegion - * determines that a cache flush is needed. A CacheFlushListener (or null) - * must be passed to the HRegion constructor. - */ -public interface CacheFlushListener { - - /** - * Tell the listener the cache needs to be flushed. - * - * @param region the HRegion requesting the cache flush - */ - void flushRequested(HRegion region); -} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (working copy) @@ -37,6 +37,7 @@ private Text row; private Text column; private long timestamp; + private transient Text family; /** Default constructor used in conjunction with Writable interface */ @@ -89,6 +90,7 @@ this.row = new Text(row); this.column = new Text(column); this.timestamp = timestamp; + this.family = null; } /** @return Approximate size in bytes of this key. */ @@ -120,8 +122,9 @@ * * @param newcol new column key value */ - public void setColumn(Text newcol) { + public synchronized void setColumn(Text newcol) { this.column.set(newcol); + this.family = null; } /** @@ -154,6 +157,17 @@ return column; } + /** + * @return the column family name (minus the trailing colon) + * @throws InvalidColumnNameException + */ + public synchronized Text getFamily() throws InvalidColumnNameException { + if (family == null) { + family = extractFamily(column); + } + return family; + } + /** @return value of timestamp */ public long getTimestamp() { return timestamp; @@ -198,8 +212,7 @@ public boolean matchesRowFamily(HStoreKey that) throws InvalidColumnNameException { return this.row.compareTo(that.row) == 0 && - extractFamily(this.column). - compareTo(extractFamily(that.getColumn())) == 0; + this.getFamily().equals(that.getFamily()); } /** {@inheritDoc} */ @@ -225,6 +238,7 @@ // Comparable + /** {@inheritDoc} */ public int compareTo(Object o) { HStoreKey other = (HStoreKey)o; int result = this.row.compareTo(other.row); @@ -275,7 +289,7 @@ * the result by calling {@link TextSequence#toText()}. * @throws InvalidColumnNameException */ - public static TextSequence extractFamily(final Text col) + public static Text extractFamily(final Text col) throws InvalidColumnNameException { return extractFamily(col, false); } @@ -284,40 +298,41 @@ * Extracts the column family name from a column * For example, returns 'info' if the specified column was 'info:server' * @param col name of column - * @return column famile as a TextSequence based on the passed - * col. If col is reused, make a new Text of - * the result by calling {@link TextSequence#toText()}. + * @param withColon + * @return column family as a Text based on the passed col. 
* @throws InvalidColumnNameException */ - public static TextSequence extractFamily(final Text col, - final boolean withColon) + public static Text extractFamily(final Text col, final boolean withColon) throws InvalidColumnNameException { int offset = getColonOffset(col); // Include ':' in copy? - offset += (withColon)? 1: 0; + offset += (withColon)? 1 : 0; if (offset == col.getLength()) { - return new TextSequence(col); + return col; } - return new TextSequence(col, 0, offset); + Text family = new Text(); + family.set(col.getBytes(), 0, offset); + return family; } /** - * Extracts the column qualifier, the portion that follows the colon (':') - * family/qualifier separator. + * Extracts the column member, the portion that follows the colon (':') + * family:member separator. * For example, returns 'server' if the specified column was 'info:server' * @param col name of column - * @return column qualifier as a TextSequence based on the passed - * col. If col is reused, make a new Text of - * the result by calling {@link TextSequence#toText()}. + * @return column qualifier as a Text based on the passed col. * @throws InvalidColumnNameException */ - public static TextSequence extractQualifier(final Text col) + public static Text extractMember(final Text col) throws InvalidColumnNameException { - int offset = getColonOffset(col); - if (offset + 1 == col.getLength()) { + int offset = getColonOffset(col) + 1; + if (offset == col.getLength()) { return null; } - return new TextSequence(col, offset + 1); + + Text member = new Text(); + member.set(col.getBytes(), offset, col.getLength() - offset); + return member; } private static int getColonOffset(final Text col) @@ -333,7 +348,7 @@ } if(offset < 0) { throw new InvalidColumnNameException(col + " is missing the colon " + - "family/qualifier separator"); + "family:member separator"); } return offset; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -50,24 +50,25 @@ * underlying file is being rolled. * *
- * A single HLog is used by several HRegions simultaneously. + * A single HLog is used by several HRegion/HStores simultaneously. * *
- * Each HRegion is identified by a unique long int. HRegions do - * not need to declare themselves before using the HLog; they simply include - * their HRegion-id in the append or - * completeCacheFlush calls. + * Each HStore is identified by a unique string consisting of the encoded + * HRegion name and by the HStore family name. + * HStores do not need to declare themselves before using the HLog; they simply + * include their HStore.storeName in the append + * or completeCacheFlush calls. * *
* An HLog consists of multiple on-disk files, which have a chronological order. * As data is flushed to other (better) on-disk structures, the log becomes - * obsolete. We can destroy all the log messages for a given HRegion-id up to - * the most-recent CACHEFLUSH message from that HRegion. + * obsolete. We can destroy all the log messages for a given HStore up to + * the most-recent CACHEFLUSH message from that HStore. * *
* It's only practical to delete entire files. Thus, we delete an entire on-disk * file F when all of the messages in F have a log-sequence-id that's older - * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has + * (smaller) than the most-recent CACHEFLUSH message for every HStore that has * a message in F. * *
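(Illustrative sketch only, not part of this patch: the store-keyed API described above can be exercised roughly as follows. The helper name and its parameters are hypothetical; an open HLog, org.apache.hadoop.io.Text and java.io.IOException are assumed, and storeName follows the "encodedRegionName/familyName" convention used for HStore.storeName elsewhere in this patch.)

  static void logOneEdit(HLog log, String encodedRegionName, Text family,
      Text tableName, Text row, Text column, byte[] value)
      throws IOException {
    // Edits are now keyed by store, not by region.
    Text storeName = new Text(encodedRegionName + "/" + family.toString());
    HStoreKey key = new HStoreKey(row, column, System.currentTimeMillis());
    log.append(storeName, tableName, key, value);
  }

Once the owning HStore has flushed its memcache, it would call completeCacheFlush(storeName, tableName, flushSeqId), so that log files whose entries all predate every store's last flush can eventually be removed.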
@@ -77,7 +78,7 @@ * separate reentrant lock is used. * *
- * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in + * TODO: Vuk Ercegovac also pointed out that keeping HBase edit logs in * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The * 'atomic' write to the log is meant to serve as insurance against abnormal * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's @@ -109,7 +110,7 @@ Collections.synchronizedSortedMap(new TreeMap()); /* - * Map of region to last sequence/edit id. + * Map of store to last sequence/edit id. */ final Map lastSeqWritten = new ConcurrentHashMap(); @@ -235,7 +236,7 @@ if (this.outputfiles.size() > 0) { if (this.lastSeqWritten.size() <= 0) { LOG.debug("Last sequence written is empty. Deleting all old hlogs"); - // If so, then no new writes have come in since all regions were + // If so, then no new writes have come in since all hstores were // flushed (and removed from the lastSeqWritten map). Means can // remove all but currently open log file. for (Map.Entry e : this.outputfiles.entrySet()) { @@ -248,23 +249,23 @@ Long oldestOutstandingSeqNum = Collections.min(this.lastSeqWritten.values()); // Get the set of all log files whose final ID is older than or - // equal to the oldest pending region operation + // equal to the oldest pending HStore operation TreeSet sequenceNumbers = new TreeSet(this.outputfiles.headMap( (Long.valueOf(oldestOutstandingSeqNum.longValue() + 1L))).keySet()); // Now remove old log files (if any) if (LOG.isDebugEnabled()) { - // Find region associated with oldest key -- helps debugging. - Text oldestRegion = null; + // Find HStore associated with oldest key -- helps debugging. + Text oldestStore = null; for (Map.Entry e: this.lastSeqWritten.entrySet()) { if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) { - oldestRegion = e.getKey(); + oldestStore = e.getKey(); break; } } LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " + "using oldest outstanding seqnum of " + - oldestOutstandingSeqNum + " from region " + oldestRegion); + oldestOutstandingSeqNum + " from HStore " + oldestStore); } if (sequenceNumbers.size() > 0) { for (Long seq : sequenceNumbers) { @@ -326,7 +327,7 @@ } /** - * Append a set of edits to the log. Log edits are keyed by regionName, + * Append a set of edits to the log. Log edits are keyed by storeName, * rowname, and log-sequence-id. * * Later, if we sort by these keys, we obtain all the relevant edits for a @@ -342,38 +343,32 @@ * synchronized prevents appends during the completion of a cache flush or for * the duration of a log roll. * - * @param regionName + * @param storeName * @param tableName - * @param row - * @param columns - * @param timestamp + * @param key + * @param value * @throws IOException */ - void append(Text regionName, Text tableName, - TreeMap edits) throws IOException { + void append(Text storeName, Text tableName, HStoreKey key, byte[] value) + throws IOException { if (closed) { throw new IOException("Cannot append; log is closed"); } synchronized (updateLock) { - long seqNum[] = obtainSeqNum(edits.size()); + long seqNum = obtainSeqNum(); // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region. When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush + // write for each HStore. When the cache is flushed, the entry for the + // store being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. 
- if (!this.lastSeqWritten.containsKey(regionName)) { - this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0])); + if (!this.lastSeqWritten.containsKey(storeName)) { + this.lastSeqWritten.put(storeName, Long.valueOf(seqNum)); } - int counter = 0; - for (Map.Entry es : edits.entrySet()) { - HStoreKey key = es.getKey(); - HLogKey logKey = - new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]); - HLogEdit logEdit = - new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp()); - this.writer.append(logKey, logEdit); - this.numEntries++; - } + HLogKey logKey = new HLogKey(storeName, tableName, key.getRow(), seqNum); + HLogEdit logEdit = + new HLogEdit(key.getColumn(), value, key.getTimestamp()); + this.writer.append(logKey, logEdit); + this.numEntries++; } if (this.numEntries > this.maxlogentries) { if (listener != null) { @@ -404,22 +399,6 @@ } /** - * Obtain a specified number of sequence numbers - * - * @param num number of sequence numbers to obtain - * @return array of sequence numbers - */ - private long[] obtainSeqNum(int num) { - long[] results = new long[num]; - synchronized (this.sequenceLock) { - for (int i = 0; i < num; i++) { - results[i] = this.logSeqNum++; - } - } - return results; - } - - /** * By acquiring a log sequence ID, we can allow log messages to continue while * we flush the cache. * @@ -441,12 +420,12 @@ * * Protected by cacheFlushLock * - * @param regionName + * @param storeName * @param tableName * @param logSeqId * @throws IOException */ - void completeCacheFlush(final Text regionName, final Text tableName, + void completeCacheFlush(final Text storeName, final Text tableName, final long logSeqId) throws IOException { try { @@ -454,13 +433,13 @@ return; } synchronized (updateLock) { - this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), + this.writer.append(new HLogKey(storeName, tableName, HLog.METAROW, logSeqId), new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(), System.currentTimeMillis())); this.numEntries++; - Long seq = this.lastSeqWritten.get(regionName); + Long seq = this.lastSeqWritten.get(storeName); if (seq != null && logSeqId >= seq.longValue()) { - this.lastSeqWritten.remove(regionName); + this.lastSeqWritten.remove(storeName); } } } finally { @@ -489,6 +468,7 @@ * @param conf HBaseConfiguration * @throws IOException */ + @SuppressWarnings("deprecation") static void splitLog(Path rootDir, Path srcDir, FileSystem fs, Configuration conf) throws IOException { Path logfiles[] = fs.listPaths(new Path[] { srcDir }); @@ -515,13 +495,15 @@ int count = 0; for (; in.next(key, val); count++) { Text tableName = key.getTablename(); - Text regionName = key.getRegionName(); + Text storeName = key.getStoreName(); + Text regionName = new Text(); + regionName.set(storeName.getBytes(), 0, storeName.find("/")); SequenceFile.Writer w = logWriters.get(regionName); if (w == null) { Path logfile = new Path( HRegion.getRegionDir( HTableDescriptor.getTableDir(rootDir, tableName), - HRegionInfo.encodeRegionName(regionName) + regionName.toString() ), HREGION_OLDLOGFILE_NAME ); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -356,6 +356,7 @@ * @return True if still has references to parent. 
* @throws IOException */ + @SuppressWarnings("deprecation") protected boolean hasReferences(final Text metaRegionName, final HRegionInterface srvr, final Text parent, SortedMap rowContent, final Text splitColumn) @@ -1306,8 +1307,7 @@ loadToServers.put(load, servers); if (!closed.get()) { - long serverLabel = getServerLabel(s); - serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s)); + serverLeases.createLease(s, new ServerExpirer(s)); } return createConfigurationSubset(); @@ -1327,15 +1327,10 @@ return mw; } - private long getServerLabel(final String s) { - return s.hashCode(); - } - /** {@inheritDoc} */ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { String serverName = serverInfo.getServerAddress().toString().trim(); - long serverLabel = getServerLabel(serverName); if (msgs.length > 0) { if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { synchronized (serversToServerInfo) { @@ -1348,7 +1343,7 @@ ": MSG_REPORT_EXITING -- cancelling lease"); } - if (cancelLease(serverName, serverLabel)) { + if (cancelLease(serverName)) { // Only process the exit message if the server still has a lease. // Otherwise we could end up processing the server exit twice. LOG.info("Region server " + serverName + @@ -1428,7 +1423,7 @@ } synchronized (serversToServerInfo) { - cancelLease(serverName, serverLabel); + cancelLease(serverName); serversToServerInfo.notifyAll(); } return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; @@ -1439,7 +1434,7 @@ // This will always succeed; otherwise, the fetch of serversToServerInfo // would have failed above. - serverLeases.renewLease(serverLabel, serverLabel); + serverLeases.renewLease(serverName); // Refresh the info object and the load information @@ -1476,7 +1471,7 @@ } /** Cancel a server's lease and update its load information */ - private boolean cancelLease(final String serverName, final long serverLabel) { + private boolean cancelLease(final String serverName) { boolean leaseCancelled = false; HServerInfo info = serversToServerInfo.remove(serverName); if (info != null) { @@ -1487,7 +1482,7 @@ unassignRootRegion(); } LOG.info("Cancelling lease for " + serverName); - serverLeases.cancelLease(serverLabel, serverLabel); + serverLeases.cancelLease(serverName); leaseCancelled = true; // update load information @@ -3120,20 +3115,20 @@ /* * Data structure used to return results out of the toRowMap method. */ - private class RowMap { + class RowMap { final Text row; final SortedMap map; - private RowMap(final Text r, final SortedMap m) { + RowMap(final Text r, final SortedMap m) { this.row = r; this.map = m; } - private Text getRow() { + Text getRow() { return this.row; } - private SortedMap getMap() { + SortedMap getMap() { return this.map; } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreListener.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreListener.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreListener.java (revision 0) @@ -0,0 +1,58 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.io.Text; + +/** + * Implementors of this interface want to be notified when an HRegion + * determines that a cache flush is needed. A CacheFlushListener (or null) + * must be passed to the HRegion constructor. + */ +public interface HStoreListener { + + /** + * Tell the listener the cache needs to be flushed. + * + * @param store the HStore requesting the cache flush + * @param region the HRegion the HStore belongs to + * @return true if a flush request was queued + */ + boolean flushRequested(HStore store, HRegion region); + + /** + * Tell the listener that the HStore needs compacting. + * + * @param store the HStore requesting the compaction. + * @param region the HRegion that the HStore belongs to + * @return true if a compaction was queued + */ + boolean compactionRequested(HStore store, HRegion region); + + /** + * Tell the listener that the region needs to be split + * + * @param region the HRegion to be split + * @param midkey the middle key for the split + * @return true if a split was queued + */ + boolean splitRequested(HRegion region, Text midkey); +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (working copy) @@ -69,13 +69,13 @@ private Text col; ColumnMatcher(final Text col) throws IOException { - Text qualifier = HStoreKey.extractQualifier(col); + Text member = HStoreKey.extractMember(col); try { - if(qualifier == null || qualifier.getLength() == 0) { + if(member == null || member.getLength() == 0) { this.matchType = MATCH_TYPE.FAMILY_ONLY; - this.family = HStoreKey.extractFamily(col).toText(); + this.family = HStoreKey.extractFamily(col); this.wildCardmatch = true; - } else if(isRegexPattern.matcher(qualifier.toString()).matches()) { + } else if(isRegexPattern.matcher(member.toString()).matches()) { this.matchType = MATCH_TYPE.REGEX; this.columnMatcher = Pattern.compile(col.toString()); this.wildCardmatch = true; @@ -127,7 +127,7 @@ this.multipleMatchers = false; this.okCols = new TreeMap>(); for(int i = 0; i < targetCols.length; i++) { - Text family = HStoreKey.extractFamily(targetCols[i]).toText(); + Text family = HStoreKey.extractFamily(targetCols[i]); Vector matchers = okCols.get(family); if(matchers == null) { matchers = new Vector(); @@ -158,7 +158,7 @@ boolean columnMatch(int i) throws IOException { Text column = keys[i].getColumn(); Vector matchers = - okCols.get(HStoreKey.extractFamily(column)); + okCols.get(keys[i].getFamily()); if(matchers == null) { return false; } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- 
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -32,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -197,8 +198,6 @@ volatile Map> targetColumns = new ConcurrentHashMap>(); - final AtomicLong memcacheSize = new AtomicLong(0); - final Path basedir; final HLog log; final FileSystem fs; @@ -224,15 +223,11 @@ volatile WriteState writestate = new WriteState(); - final int memcacheFlushSize; - private volatile long lastFlushTime; - final CacheFlushListener flushListener; - final int blockingMemcacheSize; + final HStoreListener listener; protected final long threadWakeFrequency; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Integer updateLock = new Integer(0); private final Integer splitLock = new Integer(0); - private final long desiredMaxFileSize; private final long minSequenceId; final AtomicInteger activeScannerCount = new AtomicInteger(0); @@ -262,7 +257,7 @@ * @throws IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener) + HRegionInfo regionInfo, Path initialFiles, HStoreListener listener) throws IOException { this.basedir = basedir; @@ -288,8 +283,8 @@ for(HColumnDescriptor c : this.regionInfo.getTableDesc().families().values()) { - HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs, - oldLogFile, this.conf); + HStore store = + new HStore(this.basedir, this, c, this.fs, oldLogFile, this.conf); stores.put(c.getFamilyName(), store); @@ -314,20 +309,10 @@ fs.delete(merges); } - // By default, we flush the cache when 64M. - this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", - 1024*1024*64); - this.flushListener = listener; - this.blockingMemcacheSize = this.memcacheFlushSize * - conf.getInt("hbase.hregion.memcache.block.multiplier", 2); + this.listener = listener; - // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. - this.desiredMaxFileSize = - conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); - // HRegion is ready to go! 
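(For illustration only, not part of this patch: a minimal sketch of an HStoreListener implementor, the object handed to this constructor and consulted by each HStore when it wants a flush, compaction, or split. The class and queue names are invented; org.apache.hadoop.io.Text and the java.util.concurrent queue classes are assumed available, and a real region server would hand the queued work to its worker threads.)

  class QueueingStoreListener implements HStoreListener {
    private final java.util.concurrent.BlockingQueue<HStore> flushQueue =
        new java.util.concurrent.LinkedBlockingQueue<HStore>();
    private final java.util.concurrent.BlockingQueue<HStore> compactQueue =
        new java.util.concurrent.LinkedBlockingQueue<HStore>();

    public boolean flushRequested(HStore store, HRegion region) {
      // HStore resets its unflushed-size counter only when this returns true,
      // so return true only if the request was actually queued.
      return flushQueue.offer(store);
    }

    public boolean compactionRequested(HStore store, HRegion region) {
      return compactQueue.offer(store);
    }

    public boolean splitRequested(HRegion region, Text midkey) {
      // A real implementation would schedule the split using the passed midkey.
      return true;
    }
  }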
this.writestate.compacting = false; - this.lastFlushTime = System.currentTimeMillis(); LOG.info("region " + this.regionInfo.getRegionName() + " available"); } @@ -344,6 +329,18 @@ return this.regionInfo; } + /** @return size of largest HStore */ + long getLargestHStoreSize() { + long size = 0; + for (HStore store: stores.values()) { + long storeSize = store.getSize(); + if (storeSize > size) { + size = storeSize; + } + } + return size; + } + /** returns true if region is closed */ boolean isClosed() { return this.closed.get(); @@ -449,13 +446,12 @@ listener.closing(getRegionName()); } - // Don't flush the cache if we are aborting - if (!abort) { - internalFlushcache(snapshotMemcaches()); - } - List result = new ArrayList(); for (HStore store: stores.values()) { + if (!abort) { // Don't flush the cache if we are aborting + store.snapshotMemcache(); + internalFlushcache(store); + } result.addAll(store.close()); } this.closed.set(true); @@ -523,11 +519,6 @@ return this.fs; } - /** @return the last time the region was flushed */ - public long getLastFlushTime() { - return this.lastFlushTime; - } - ////////////////////////////////////////////////////////////////////////////// // HRegion maintenance. // @@ -535,35 +526,6 @@ // upkeep. ////////////////////////////////////////////////////////////////////////////// - /** - * @return returns size of largest HStore. Also returns whether store is - * splitable or not (Its not splitable if region has a store that has a - * reference store file). - */ - HStore.HStoreSize largestHStore(Text midkey) { - HStore.HStoreSize biggest = null; - boolean splitable = true; - for(HStore h: stores.values()) { - HStore.HStoreSize size = h.size(midkey); - // If we came across a reference down in the store, then propagate - // fact that region is not splitable. - if (splitable) { - splitable = size.splitable; - } - if (biggest == null) { - biggest = size; - continue; - } - if(size.getAggregate() > biggest.getAggregate()) { // Largest so far - biggest = size; - } - } - if (biggest != null) { - biggest.setSplitable(splitable); - } - return biggest; - } - /* * Split the HRegion to create two brand-new ones. This also closes * current HRegion. Split should be fast since we don't rewrite store files @@ -573,13 +535,13 @@ * @return two brand-new (and open) HRegions or null if a split is not needed * @throws IOException */ - HRegion[] splitRegion(final RegionUnavailableListener listener) + HRegion[] splitRegion(final RegionUnavailableListener listener, Text midKey) throws IOException { synchronized (splitLock) { - Text midKey = new Text(); - if (closed.get() || !needsSplit(midKey)) { + if (closed.get()) { return null; } + LOG.info("Starting split of region " + regionInfo.getRegionName()); Path splits = new Path(this.regiondir, SPLITDIR); if(!this.fs.exists(splits)) { this.fs.mkdirs(splits); @@ -650,76 +612,6 @@ } /* - * Iterates through all the HStores and finds the one with the largest - * MapFile size. If the size is greater than the (currently hard-coded) - * threshold, returns true indicating that the region should be split. The - * midKey for the largest MapFile is returned through the midKey parameter. - * It is possible for us to rule the region non-splitable even in excess of - * configured size. This happens if region contains a reference file. If - * a reference file, the region can not be split. - * - * Note that there is no need to do locking in this method because it calls - * largestHStore which does the necessary locking. 
- * - * @param midKey midKey of the largest MapFile - * @return true if the region should be split. midKey is set by this method. - * Check it for a midKey value on return. - */ - boolean needsSplit(Text midKey) { - HStore.HStoreSize biggest = largestHStore(midKey); - if (biggest == null || midKey.getLength() == 0 || - (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) { - return false; - } - long triggerSize = this.desiredMaxFileSize + (this.desiredMaxFileSize / 2); - boolean split = (biggest.getAggregate() >= triggerSize); - if (split) { - if (!biggest.isSplitable()) { - LOG.warn("Region " + getRegionName().toString() + - " is NOT splitable though its aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - split = false; - } else { - LOG.info("Splitting " + getRegionName().toString() + - " because largest aggregate size is " + - StringUtils.humanReadableInt(biggest.getAggregate()) + - " and desired size is " + - StringUtils.humanReadableInt(this.desiredMaxFileSize)); - } - } - return split; - } - - /** - * Only do a compaction if it is necessary - * - * @return - * @throws IOException - */ - boolean compactIfNeeded() throws IOException { - boolean needsCompaction = false; - for (HStore store: stores.values()) { - if (store.needsCompaction()) { - needsCompaction = true; - if (LOG.isDebugEnabled()) { - LOG.debug(store.toString() + " needs compaction"); - } - break; - } - } - if (!needsCompaction) { - if (LOG.isDebugEnabled()) { - LOG.debug("region " + regionInfo.getRegionName() + - " does not need compaction"); - } - return false; - } - return compactStores(); - } - - /* * @param dir * @return compaction directory for the passed in dir */ @@ -747,24 +639,20 @@ } /** - * Compact all the stores. This should be called periodically to make sure - * the stores are kept manageable. + * Compact all the specified HStore * *
This operation could block for a long time, so don't call it from a * time-sensitive thread. * - * @return Returns TRUE if the compaction has completed. FALSE, if the - * compaction was not carried out, because the HRegion is busy doing - * something else storage-intensive (like flushing the cache). The caller - * should check back later. + * @param store the HStore to compact * * Note that no locking is necessary at this level because compaction only * conflicts with a region split, and that cannot happen because the region * server does them sequentially and not in parallel. */ - boolean compactStores() throws IOException { + void compactStore(HStore store) throws IOException { if (this.closed.get()) { - return false; + return; } try { synchronized (writestate) { @@ -773,28 +661,25 @@ writestate.compacting = true; } else { LOG.info("NOT compacting region " + - this.regionInfo.getRegionName().toString() + ": compacting=" + + this.regionInfo.getRegionName().toString() + " store " + + store.toString() + ": compacting=" + writestate.compacting + ", writesEnabled=" + writestate.writesEnabled + ", writestate.disableCompactions=" + this.writestate.disableCompactions); - return false; + return; } } long startTime = System.currentTimeMillis(); LOG.info("starting compaction on region " + - this.regionInfo.getRegionName().toString()); - boolean status = true; + this.regionInfo.getRegionName().toString() + " store " + + store.toString()); doRegionCompactionPrep(); - for (HStore store : stores.values()) { - if(!store.compact()) { - status = false; - } - } + store.compact(); doRegionCompactionCleanup(); LOG.info("compaction completed on region " + - this.regionInfo.getRegionName().toString() + ". Took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - return status; + this.regionInfo.getRegionName().toString() + " store " + + store.toString() + ". Took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); } finally { synchronized (writestate) { @@ -818,15 +703,13 @@ *
This method may block for some time, so it should not be called from a * time-sensitive thread. * - * @return true if cache was flushed - * * @throws IOException * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - boolean flushcache() throws IOException { + void flushcache(HStore store) throws IOException { if (this.closed.get()) { - return false; + return; } synchronized (writestate) { if (!writestate.flushing && writestate.writesEnabled) { @@ -838,17 +721,16 @@ writestate.flushing + ", writesEnabled=" + writestate.writesEnabled); } - return false; + return; } } try { lock.readLock().lock(); // Prevent splits and closes try { - long startTime = -1; - synchronized (updateLock) {// Stop updates while we snapshot the memcaches - startTime = snapshotMemcaches(); + synchronized (updateLock) {// Stop updates while we snapshot the memcache + store.snapshotMemcache(); } - return internalFlushcache(startTime); + internalFlushcache(store); } finally { lock.readLock().unlock(); } @@ -860,32 +742,6 @@ } } - /* - * It is assumed that updates are blocked for the duration of this method - */ - private long snapshotMemcaches() { - if (this.memcacheSize.get() == 0) { - return -1; - } - long startTime = System.currentTimeMillis(); - - if(LOG.isDebugEnabled()) { - LOG.debug("Started memcache flush for region " + - this.regionInfo.getRegionName() + ". Size " + - StringUtils.humanReadableInt(this.memcacheSize.get())); - } - - // We reset the aggregate memcache size here so that subsequent updates - // will add to the unflushed size - - this.memcacheSize.set(0L); - - for (HStore hstore: stores.values()) { - hstore.snapshotMemcache(); - } - return startTime; - } - /** * Flushing the cache is a little tricky. We have a lot of updates in the * HMemcache, all of which have also been written to the log. We need to @@ -912,18 +768,18 @@ * *
This method may block for some time. * - * @param startTime the time the cache was snapshotted or -1 if a flush is - * not needed + * @param store the HStore whose cache should be flushed * - * @return true if the cache was flushed * * @throws IOException * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - private boolean internalFlushcache(long startTime) throws IOException { - if (startTime == -1) { - return false; + private void internalFlushcache(HStore store) throws IOException { + long startTime = System.currentTimeMillis(); + if(LOG.isDebugEnabled()) { + LOG.debug("Started memcache flush for region " + + this.regionInfo.getRegionName() + " store " + store.toString()); } // We pass the log to the HMemcache, so we can lock down both @@ -945,13 +801,9 @@ // be part of the current running servers state. try { - // A. Flush memcache to all the HStores. - // Keep running vector of all store files that includes both old and the - // just-made new flush store file. + // A. Flush memcache for specified HStore - for (HStore hstore: stores.values()) { - hstore.flushCache(sequenceId); - } + store.flushCache(sequenceId); } catch (IOException e) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memcache. @@ -967,8 +819,8 @@ // This tells future readers that the HStores were emitted correctly, // and that all updates to the log for this regionName that have lower // log-sequence-ids can be safely ignored. - this.log.completeCacheFlush(this.regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), sequenceId); + this.log.completeCacheFlush(store.storeName, getTableDesc().getName(), + sequenceId); // D. Finally notify anyone waiting on memcache to clear: // e.g. checkResources(). @@ -976,12 +828,10 @@ notifyAll(); } if (LOG.isDebugEnabled()) { - LOG.debug("Finished memcache flush for region " + - this.regionInfo.getRegionName() + " in " + - (System.currentTimeMillis() - startTime) + "ms, sequenceid=" + - sequenceId); + LOG.debug("Finished memcache flush for store " + store.storeName + + " in " + (System.currentTimeMillis() - startTime) + + "ms, sequenceid=" + sequenceId); } - return true; } ////////////////////////////////////////////////////////////////////////////// @@ -1160,7 +1010,7 @@ throws IOException { List keys = null; - Text colFamily = HStoreKey.extractFamily(origin.getColumn()); + Text colFamily = origin.getFamily(); HStore targetStore = stores.get(colFamily); if (targetStore != null) { // Pass versions without modification since in the store getKeys, it @@ -1223,14 +1073,7 @@ * @param b * @throws IOException */ - public void batchUpdate(long timestamp, BatchUpdate b) - throws IOException { - // Do a rough check that we have resources to accept a write. The check is - // 'rough' in that between the resource check and the call to obtain a - // read lock, resources may run out. For now, the thought is that this - // will be extremely rare; we'll deal with it when it happens. - checkResources(); - + public void batchUpdate(long timestamp, BatchUpdate b) throws IOException { // We obtain a per-row lock, so other clients will block while one client // performs an update. The read lock is released by the client calling // #commit or #abort or if the HRegionServer lease on the lock expires. @@ -1289,42 +1132,6 @@ } } - /* - * Check if resources to support an update. - * - * For now, just checks memcache saturation. 
- * - * Here we synchronize on HRegion, a broad scoped lock. Its appropriate - * given we're figuring in here whether this region is able to take on - * writes. This is only method with a synchronize (at time of writing), - * this and the synchronize on 'this' inside in internalFlushCache to send - * the notify. - */ - private synchronized void checkResources() { - boolean blocked = false; - - while (this.memcacheSize.get() >= this.blockingMemcacheSize) { - if (!blocked) { - LOG.info("Blocking updates for '" + Thread.currentThread().getName() + - "': Memcache size " + - StringUtils.humanReadableInt(this.memcacheSize.get()) + - " is >= than blocking " + - StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size"); - } - - blocked = true; - try { - wait(threadWakeFrequency); - } catch (InterruptedException e) { - // continue; - } - } - if (blocked) { - LOG.info("Unblocking updates for region " + getRegionName() + " '" + - Thread.currentThread().getName() + "'"); - } - } - /** * Delete all cells of the same age as the passed timestamp or older. * @param row @@ -1454,8 +1261,6 @@ /* * Add updates first to the hlog and then add values to memcache. * Warning: Assumption is caller has lock on passed in row. - * @param row Row to update. - * @param timestamp Timestamp to record the updates against * @param updatesByColumn Cell updates by column * @throws IOException */ @@ -1466,21 +1271,13 @@ return; } synchronized (updateLock) { // prevent a cache flush - this.log.append(regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), updatesByColumn); - - long size = 0; for (Map.Entry e: updatesByColumn.entrySet()) { HStoreKey key = e.getKey(); byte[] val = e.getValue(); - size = this.memcacheSize.addAndGet(key.getSize() + - (val == null ? 0 : val.length)); - stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val); + HStore store = stores.get(key.getFamily()); + this.log.append(store.storeName, getTableDesc().getName(), key, val); + store.add(key, val); } - if (this.flushListener != null && size > this.memcacheFlushSize) { - // Request a cache flush - this.flushListener.flushRequested(this); - } } } @@ -1607,6 +1404,13 @@ return regionInfo.getRegionName().toString(); } + /** + * @return Immutable list of this region's HStores. 
+ */ + public Collection getStores() { + return Collections.unmodifiableCollection(this.stores.values()); + } + private Path getBaseDir() { return this.basedir; } @@ -1767,14 +1571,14 @@ } } finally { synchronized (activeScannerCount) { - int count = activeScannerCount.decrementAndGet(); - if (count < 0) { - LOG.error("active scanner count less than zero: " + count + + int scanners = activeScannerCount.decrementAndGet(); + if (scanners < 0) { + LOG.error("active scanner count less than zero: " + scanners + " resetting to zero"); activeScannerCount.set(0); - count = 0; + scanners = 0; } - if (count == 0) { + if (scanners == 0) { activeScannerCount.notifyAll(); } } @@ -1827,7 +1631,6 @@ * @see {@link #removeRegionFromMETA(HRegion, HRegion)} */ static void addRegionToMETA(HRegion meta, HRegion r) throws IOException { - meta.checkResources(); // The row key is the region name Text row = r.getRegionName(); meta.obtainRowLock(row); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (working copy) @@ -21,10 +21,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Delayed; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; + /** * Leases * @@ -39,19 +43,19 @@ * An instance of the Leases class will create a thread to do its dirty work. * You should close() the instance if you want to clean up the thread properly. 
*/ -public class Leases { - protected static final Log LOG = LogFactory.getLog(Leases.class.getName()); +public class Leases extends Thread { + private static final Log LOG = LogFactory.getLog(Leases.class.getName()); + private final int leasePeriod; + private final int leaseCheckFrequency; + private volatile DelayQueue leaseQueue = new DelayQueue(); - protected final int leasePeriod; - protected final int leaseCheckFrequency; - private final Thread leaseMonitorThread; - protected final Map leases = - new HashMap(); - protected final TreeSet sortedLeases = new TreeSet(); - protected AtomicBoolean stop = new AtomicBoolean(false); + protected final Map leases = new HashMap(); + protected final Map listeners = + new HashMap(); + private volatile boolean stopRequested = false; /** - * Creates a lease + * Creates a lease monitor * * @param leasePeriod - length of time (milliseconds) that the lease is valid * @param leaseCheckFrequency - how often the lease should be checked @@ -60,22 +64,40 @@ public Leases(final int leasePeriod, final int leaseCheckFrequency) { this.leasePeriod = leasePeriod; this.leaseCheckFrequency = leaseCheckFrequency; - this.leaseMonitorThread = - new LeaseMonitor(this.leaseCheckFrequency, this.stop); - this.leaseMonitorThread.setDaemon(true); } - /** Starts the lease monitor */ - public void start() { - leaseMonitorThread.start(); + /** {@inheritDoc} */ + @Override + public void run() { + while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) { + Lease lease = null; + try { + lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS); + + } catch (InterruptedException e) { + continue; + + } catch (ConcurrentModificationException e) { + continue; + } + if (lease == null) { + continue; + } + // A lease expired + LeaseListener listener = null; + synchronized (leaseQueue) { + String leaseName = lease.getLeaseName(); + leases.remove(leaseName); + listener = listeners.remove(leaseName); + if (listener == null) { + LOG.error("lease listener is null for lease " + leaseName); + continue; + } + } + listener.leaseExpired(); + } + close(); } - - /** - * @param name Set name on the lease checking daemon thread. - */ - public void setName(final String name) { - this.leaseMonitorThread.setName(name); - } /** * Shuts down this lease instance when all outstanding leases expire. @@ -85,20 +107,7 @@ * allocation of new leases. */ public void closeAfterLeasesExpire() { - synchronized(this.leases) { - while (this.leases.size() > 0) { - LOG.info(Thread.currentThread().getName() + " " + - Integer.toString(leases.size()) + " lease(s) " + - "outstanding. Waiting for them to expire."); - try { - this.leases.wait(this.leaseCheckFrequency); - } catch (InterruptedException e) { - // continue - } - } - } - // Now call close since no leases outstanding. - close(); + this.stopRequested = true; } /** @@ -107,271 +116,124 @@ */ public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); - this.stop.set(true); - while (this.leaseMonitorThread.isAlive()) { - try { - this.leaseMonitorThread.interrupt(); - this.leaseMonitorThread.join(); - } catch (InterruptedException iex) { - // Ignore - } + this.stopRequested = true; + synchronized (leaseQueue) { + leaseQueue.clear(); + leases.clear(); + listeners.clear(); + leaseQueue.notifyAll(); } - synchronized(leases) { - synchronized(sortedLeases) { - leases.clear(); - sortedLeases.clear(); - } - } LOG.info(Thread.currentThread().getName() + " closed leases"); } - /* A client obtains a lease... 
*/ - /** * Obtain a lease * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName name of the lease * @param listener listener that will process lease expirations */ - public void createLease(final long holderId, final long resourceId, - final LeaseListener listener) { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - Lease lease = new Lease(holderId, resourceId, listener); - name = lease.getLeaseName(); - if(leases.get(name) != null) { - throw new AssertionError("Impossible state for createLease(): " + - "Lease " + name + " is still held."); - } - leases.put(name, lease); - sortedLeases.add(lease); + public void createLease(String leaseName, final LeaseListener listener) { + if (stopRequested) { + return; + } + Lease lease = new Lease(leaseName, System.currentTimeMillis() + leasePeriod); + synchronized (leaseQueue) { + if (leases.containsKey(leaseName)) { + throw new IllegalStateException("lease '" + leaseName + + "' already exists"); } + leases.put(leaseName, lease); + listeners.put(leaseName, listener); + leaseQueue.add(lease); } -// if (LOG.isDebugEnabled()) { -// LOG.debug("Created lease " + name); -// } } - /* A client renews a lease... */ /** * Renew a lease * - * @param holderId id of lease holder - * @param resourceId id of resource being leased - * @throws IOException + * @param leaseName name of lease */ - public void renewLease(final long holderId, final long resourceId) - throws IOException { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - name = createLeaseName(holderId, resourceId); - Lease lease = leases.get(name); - if (lease == null) { - // It's possible that someone tries to renew the lease, but - // it just expired a moment ago. So fail. - throw new IOException("Cannot renew lease that is not held: " + - name); - } - sortedLeases.remove(lease); - lease.renew(); - sortedLeases.add(lease); + public void renewLease(final String leaseName) { + synchronized (leaseQueue) { + Lease lease = leases.get(leaseName); + if (lease == null) { + throw new IllegalArgumentException("lease '" + leaseName + + "' does not exist"); } + leaseQueue.remove(lease); + lease.setExpirationTime(System.currentTimeMillis() + leasePeriod); + leaseQueue.add(lease); } -// if (LOG.isDebugEnabled()) { -// LOG.debug("Renewed lease " + name); -// } } /** * Client explicitly cancels a lease. * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName name of lease */ - public void cancelLease(final long holderId, final long resourceId) { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - name = createLeaseName(holderId, resourceId); - Lease lease = leases.get(name); - if (lease == null) { - // It's possible that someone tries to renew the lease, but - // it just expired a moment ago. So just skip it. - return; - } - sortedLeases.remove(lease); - leases.remove(name); + public void cancelLease(final String leaseName) { + synchronized (leaseQueue) { + Lease lease = leases.remove(leaseName); + if (lease == null) { + throw new IllegalArgumentException("lease '" + leaseName + + "' does not exist"); } + leaseQueue.remove(lease); + listeners.remove(leaseName); } } - /** - * LeaseMonitor is a thread that expires Leases that go on too long. - * Its a daemon thread. 
- */ - class LeaseMonitor extends Chore { - /** - * @param p - * @param s - */ - public LeaseMonitor(int p, AtomicBoolean s) { - super(p, s); + /** This class tracks a single Lease. */ + private static class Lease implements Delayed { + private final String leaseName; + private long expirationTime; + + Lease(final String leaseName, long expirationTime) { + this.leaseName = leaseName; + this.expirationTime = expirationTime; } - /** {@inheritDoc} */ - @Override - protected void chore() { - synchronized(leases) { - synchronized(sortedLeases) { - Lease top; - while((sortedLeases.size() > 0) - && ((top = sortedLeases.first()) != null)) { - if(top.shouldExpire()) { - leases.remove(top.getLeaseName()); - sortedLeases.remove(top); - top.expired(); - } else { - break; - } - } - } - } + /** @return the lease name */ + public String getLeaseName() { + return leaseName; } - } - - /* - * A Lease name. - * More lightweight than String or Text. - */ - @SuppressWarnings("unchecked") - class LeaseName implements Comparable { - private final long holderId; - private final long resourceId; - - LeaseName(final long hid, final long rid) { - this.holderId = hid; - this.resourceId = rid; - } - + /** {@inheritDoc} */ @Override public boolean equals(Object obj) { - LeaseName other = (LeaseName)obj; - return this.holderId == other.holderId && - this.resourceId == other.resourceId; + return this.hashCode() == ((Lease) obj).hashCode(); } /** {@inheritDoc} */ @Override public int hashCode() { - // Copy OR'ing from javadoc for Long#hashCode. - int result = (int)(this.holderId ^ (this.holderId >>> 32)); - result ^= (int)(this.resourceId ^ (this.resourceId >>> 32)); - return result; + return this.leaseName.hashCode(); } - + /** {@inheritDoc} */ - @Override - public String toString() { - return Long.toString(this.holderId) + "/" + - Long.toString(this.resourceId); + public long getDelay(TimeUnit unit) { + return unit.convert(this.expirationTime - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); } /** {@inheritDoc} */ - public int compareTo(Object obj) { - LeaseName other = (LeaseName)obj; - if (this.holderId < other.holderId) { - return -1; - } - if (this.holderId > other.holderId) { - return 1; - } - // holderIds are equal - if (this.resourceId < other.resourceId) { - return -1; - } - if (this.resourceId > other.resourceId) { - return 1; - } - // Objects are equal - return 0; - } - } - - /** Create a lease id out of the holder and resource ids. */ - protected LeaseName createLeaseName(final long hid, final long rid) { - return new LeaseName(hid, rid); - } + public int compareTo(Delayed o) { + long delta = this.getDelay(TimeUnit.MILLISECONDS) - + o.getDelay(TimeUnit.MILLISECONDS); - /** This class tracks a single Lease. 
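The rewritten Leases above keys expiry off java.util.concurrent.DelayQueue instead of a sorted set polled by a chore thread. A self-contained sketch of that pattern, for reference only (the class and lease names here are made up, not taken from the patch):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueSketch {
  /** An entry that becomes available to poll() only once its TTL has elapsed. */
  static class ExpiringItem implements Delayed {
    private final String name;
    private final long expirationTime;   // absolute time in milliseconds

    ExpiringItem(String name, long ttlMillis) {
      this.name = name;
      this.expirationTime = System.currentTimeMillis() + ttlMillis;
    }

    /** Remaining time before expiry; DelayQueue holds the item until this reaches zero. */
    public long getDelay(TimeUnit unit) {
      return unit.convert(expirationTime - System.currentTimeMillis(),
          TimeUnit.MILLISECONDS);
    }

    /** Order items by how soon they expire, as DelayQueue requires. */
    public int compareTo(Delayed o) {
      long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
      return delta > 0 ? 1 : delta < 0 ? -1 : 0;
    }

    public String toString() {
      return name;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<ExpiringItem> queue = new DelayQueue<ExpiringItem>();
    queue.add(new ExpiringItem("lease-a", 100));
    // poll() returns null until ~100ms have passed, then hands back "lease-a";
    // the new Leases.run() loop uses the same idiom to fire leaseExpired().
    System.out.println(queue.poll(1, TimeUnit.SECONDS));
  }
}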
*/ - @SuppressWarnings("unchecked") - private class Lease implements Comparable { - final long holderId; - final long resourceId; - final LeaseListener listener; - long lastUpdate; - private LeaseName leaseId; + int value = 0; + if (delta > 0) { + value = 1; - Lease(final long holderId, final long resourceId, - final LeaseListener listener) { - this.holderId = holderId; - this.resourceId = resourceId; - this.listener = listener; - renew(); - } - - synchronized LeaseName getLeaseName() { - if (this.leaseId == null) { - this.leaseId = createLeaseName(holderId, resourceId); + } else if (delta < 0) { + value = -1; } - return this.leaseId; + return value; } - - boolean shouldExpire() { - return (System.currentTimeMillis() - lastUpdate > leasePeriod); - } - - void renew() { - this.lastUpdate = System.currentTimeMillis(); - } - - void expired() { - LOG.info(Thread.currentThread().getName() + " lease expired " + - getLeaseName()); - listener.leaseExpired(); - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object obj) { - return compareTo(obj) == 0; - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - int result = this.getLeaseName().hashCode(); - result ^= this.lastUpdate; - return result; - } - - ////////////////////////////////////////////////////////////////////////////// - // Comparable - ////////////////////////////////////////////////////////////////////////////// - /** {@inheritDoc} */ - public int compareTo(Object o) { - Lease other = (Lease) o; - if(this.lastUpdate < other.lastUpdate) { - return -1; - } else if(this.lastUpdate > other.lastUpdate) { - return 1; - } else { - return this.getLeaseName().compareTo(other.getLeaseName()); - } + /** @param expirationTime the expirationTime to set */ + public void setExpirationTime(long expirationTime) { + this.expirationTime = expirationTime; } } } \ No newline at end of file Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 617638) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -38,8 +38,6 @@ import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Delayed; -import java.util.concurrent.DelayQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,7 +69,8 @@ * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. */ -public class HRegionServer implements HConstants, HRegionInterface, Runnable { +public class HRegionServer implements HConstants, HRegionInterface, Runnable, +HStoreListener { static final Log LOG = LogFactory.getLog(HRegionServer.class); // Set when a report to the master comes back with a message asking us to @@ -140,7 +139,7 @@ * is registered as a shutdown hook in the HRegionServer constructor and is * only called when the HRegionServer receives a kill signal. 
*/ - class ShutdownThread extends Thread { + static class ShutdownThread extends Thread { private final HRegionServer instance; /** @@ -163,61 +162,6 @@ } - /** Queue entry passed to flusher, compactor and splitter threads */ - class QueueEntry implements Delayed { - private final HRegion region; - private long expirationTime; - - QueueEntry(HRegion region, long expirationTime) { - this.region = region; - this.expirationTime = expirationTime; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object o) { - QueueEntry other = (QueueEntry) o; - return this.hashCode() == other.hashCode(); - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - return this.region.getRegionInfo().hashCode(); - } - - /** {@inheritDoc} */ - public long getDelay(TimeUnit unit) { - return unit.convert(this.expirationTime - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - } - - /** {@inheritDoc} */ - public int compareTo(Delayed o) { - long delta = this.getDelay(TimeUnit.MILLISECONDS) - - o.getDelay(TimeUnit.MILLISECONDS); - - int value = 0; - if (delta > 0) { - value = 1; - - } else if (delta < 0) { - value = -1; - } - return value; - } - - /** @return the region */ - public HRegion getRegion() { - return region; - } - - /** @param expirationTime the expirationTime to set */ - public void setExpirationTime(long expirationTime) { - this.expirationTime = expirationTime; - } - } - // Check to see if regions should be split final Splitter splitter; // Needed at shutdown. On way out, if can get this lock then we are not in @@ -227,9 +171,44 @@ /** Split regions on request */ class Splitter extends Thread implements RegionUnavailableListener { - private final BlockingQueue splitQueue = - new LinkedBlockingQueue(); + /** Queue entry for splitter thread */ + private class SplitQueueEntry { + private final HRegion region; + private final Text midkey; + SplitQueueEntry(HRegion region, Text midkey) { + this.region = region; + this.midkey = midkey; + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + SplitQueueEntry other = (SplitQueueEntry) o; + return this.hashCode() == other.hashCode(); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return this.region.getRegionInfo().hashCode(); + } + + /** @return the region */ + public HRegion getRegion() { + return region; + } + + /** @return the midkey */ + public Text getMidKey() { + return midkey; + } + } + + private final BlockingQueue splitQueue = + new LinkedBlockingQueue(); + + private volatile SplitQueueEntry splitInProgress = null; private HTable root = null; private HTable meta = null; private long startTime; @@ -241,7 +220,6 @@ /** {@inheritDoc} */ public void closing(final Text regionName) { - startTime = System.currentTimeMillis(); lock.writeLock().lock(); try { // Remove region from regions Map and add it to the Map of retiring @@ -258,6 +236,7 @@ /** {@inheritDoc} */ public void closed(final Text regionName) { + startTime = System.currentTimeMillis(); lock.writeLock().lock(); try { retiringRegions.remove(regionName); @@ -275,14 +254,17 @@ @Override public void run() { while (!stopRequested.get()) { - QueueEntry e = null; + SplitQueueEntry e = null; try { e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (e == null) { continue; } + synchronized (splitQueue) { + splitInProgress = e; + } synchronized (splitterLock) { // Don't interrupt us while we're working - split(e.getRegion()); + split(e); } } catch (InterruptedException ex) { continue; @@ -301,21 +283,37 @@ if 
(!checkFileSystem()) { break; } + } finally { + synchronized (splitQueue) { + splitInProgress = null; + } } } LOG.info(getName() + " exiting"); } /** - * @param e entry indicating which region needs to be split + * @param r region to be split + * @param midkey the middle key of the region + * @return true if a split was queued */ - public void splitRequested(QueueEntry e) { - splitQueue.add(e); + public boolean splitRequested(HRegion r, Text midkey) { + SplitQueueEntry e = new SplitQueueEntry(r, midkey); + synchronized (splitQueue) { + if (splitInProgress != null && splitInProgress.equals(e)) { + return false; + } + if (splitQueue.contains(e)) { + return false; + } + splitQueue.add(e); + } + return true; } - private void split(final HRegion region) throws IOException { - final HRegionInfo oldRegionInfo = region.getRegionInfo(); - final HRegion[] newRegions = region.splitRegion(this); + private void split(final SplitQueueEntry e) throws IOException { + final HRegionInfo oldRegionInfo = e.getRegion().getRegionInfo(); + final HRegion[] newRegions = e.getRegion().splitRegion(this, e.getMidKey()); if (newRegions == null) { return; // Didn't need to be split @@ -325,7 +323,7 @@ // splitting a 'normal' region, and the ROOT table needs to be // updated if we are splitting a META region. HTable t = null; - if (region.getRegionInfo().isMetaTable()) { + if (e.getRegion().getRegionInfo().isMetaTable()) { // We need to update the root region if (this.root == null) { this.root = new HTable(conf, ROOT_TABLE_NAME); @@ -376,6 +374,40 @@ } } + /** Queue entry passed to flusher and compactor threads */ + static class QueueEntry { + private final HStore store; + private final HRegion region; + + QueueEntry(HStore store, HRegion region) { + this.store = store; + this.region = region; + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + QueueEntry other = (QueueEntry) o; + return this.hashCode() == other.hashCode(); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return this.store.getFamily().hashCode(); + } + + /** @return the store */ + public HStore getStore() { + return store; + } + + /** @return the region */ + public HRegion getRegion() { + return region; + } + } + // Compactions final Compactor compactor; // Needed during shutdown so we send an interrupt after completion of a @@ -386,6 +418,8 @@ class Compactor extends Thread { private final BlockingQueue compactionQueue = new LinkedBlockingQueue(); + + private volatile QueueEntry compactionInProgress = null; /** constructor */ public Compactor() { @@ -402,37 +436,65 @@ if (e == null) { continue; } - if (e.getRegion().compactIfNeeded()) { - splitter.splitRequested(e); + synchronized (compactionQueue) { + compactionInProgress = e; } + synchronized (compactionLock) { + // Don't interrupt us while we're working + e.getRegion().compactStore(e.getStore()); + } } catch (InterruptedException ex) { continue; } catch (IOException ex) { - LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - RemoteExceptionHandler.checkIOException(ex)); + ex = RemoteExceptionHandler.checkIOException(ex); + if (e != null) { + LOG.error("Compaction failed for region " + + e.getRegion().getRegionName() + " hstore " + + e.getStore().toString(), ex); + } else { + LOG.error("Compaction failed", ex); + } if (!checkFileSystem()) { break; } - } catch (Exception ex) { - LOG.error("Compaction failed" + - (e != null ? 
(" for region " + e.getRegion().getRegionName()) : ""), - ex); + if (e != null) { + LOG.error("Compaction failed for region for region " + + e.getRegion().getRegionName() + " hstore " + + e.getStore().toString(), ex); + } else { + LOG.error("Compaction failed", ex); + } if (!checkFileSystem()) { break; } + } finally { + synchronized (compactionQueue) { + compactionInProgress = null; + } } } LOG.info(getName() + " exiting"); } /** - * @param e QueueEntry for region to be compacted + * @param store HStore to compact + * @param region HRegion that HStore belongs to. + * @return true if a compaction was queued */ - public void compactionRequested(QueueEntry e) { - compactionQueue.add(e); + public boolean compactionRequested(HStore store, HRegion region) { + QueueEntry e = new QueueEntry(store, region); + synchronized (compactionQueue) { + if (compactionInProgress != null && compactionInProgress.equals(e)) { + return false; + } + if (compactionQueue.contains(e)) { + return false; + } + compactionQueue.add(e); + } + return true; } } @@ -443,18 +505,15 @@ final Integer cacheFlusherLock = new Integer(0); /** Flush cache upon request */ - class Flusher extends Thread implements CacheFlushListener { - private final DelayQueue flushQueue = - new DelayQueue(); + class Flusher extends Thread { + private final BlockingQueue flushQueue = + new LinkedBlockingQueue(); + + private volatile QueueEntry flushInProgress = null; - private final long optionalFlushPeriod; - /** constructor */ public Flusher() { super(); - this.optionalFlushPeriod = conf.getLong( - "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); - } /** {@inheritDoc} */ @@ -467,38 +526,13 @@ if (e == null) { continue; } - synchronized(cacheFlusherLock) { // Don't interrupt while we're working - if (e.getRegion().flushcache()) { - compactor.compactionRequested(e); - } - - e.setExpirationTime(System.currentTimeMillis() + - optionalFlushPeriod); - flushQueue.add(e); + synchronized (flushQueue) { + flushInProgress = e; } - - // Now insure that all the active regions are in the queue - - Set regions = getRegionsToCheck(); - for (HRegion r: regions) { - e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); - synchronized (flushQueue) { - if (!flushQueue.contains(e)) { - flushQueue.add(e); - } - } + synchronized (cacheFlusherLock) { // Don't interrupt while we're working + e.getRegion().flushcache(e.getStore()); } - // Now make sure that the queue only contains active regions - - synchronized (flushQueue) { - for (Iterator i = flushQueue.iterator(); i.hasNext(); ) { - e = i.next(); - if (!regions.contains(e.getRegion())) { - i.remove(); - } - } - } } catch (InterruptedException ex) { continue; @@ -517,35 +551,59 @@ HRegionServer.this.stop(); } catch (IOException ex) { - LOG.error("Cache flush failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - RemoteExceptionHandler.checkIOException(ex)); + ex = RemoteExceptionHandler.checkIOException(ex); + if (e != null) { + LOG.error("Cache flush failed for region " + + e.getRegion().getRegionName() + " hstore " + + e.getStore().toString(), ex); + } else { + LOG.error("Cache flush failed", ex); + } if (!checkFileSystem()) { break; } } catch (Exception ex) { - LOG.error("Cache flush failed" + - (e != null ? 
(" for region " + e.getRegion().getRegionName()) : ""), - ex); + if (e != null) { + LOG.error("Cache flush failed for region " + + e.getRegion().getRegionName() + " hstore " + + e.getStore().toString(), ex); + } else { + LOG.error("Cache flush failed", ex); + } if (!checkFileSystem()) { break; } + } finally { + synchronized (flushQueue) { + flushInProgress = null; + } } } flushQueue.clear(); LOG.info(getName() + " exiting"); } - - /** {@inheritDoc} */ - public void flushRequested(HRegion region) { - QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); + + /** + * Request a cache flush for the specified HStore + * + * @param store HStore to flush + * @param region HRegion the store belongs to + * @return true if a flush was queued + */ + public boolean flushRequested(HStore store, HRegion region) { + QueueEntry e = new QueueEntry(store, region); + synchronized (flushQueue) { + if (flushInProgress != null && flushInProgress.equals(e)) { + return false; + } if (flushQueue.contains(e)) { - flushQueue.remove(e); + return false; } flushQueue.add(e); } + return true; } } @@ -697,7 +755,7 @@ // It has been way too long since we last reported to the master. // Commit suicide. LOG.fatal("unable to report to master for " + (now - lastMsg) + - " milliseconds - aborting server"); + " milliseconds - aborting server"); abort(); break; } @@ -716,22 +774,22 @@ HMsg msgs[] = this.hbaseMaster.regionServerReport(serverInfo, outboundArray); lastMsg = System.currentTimeMillis(); - + if (this.quiesced.get() && onlineRegions.size() == 0) { // We've just told the master we're exiting because we aren't // serving any regions. So set the stop bit and exit. LOG.info("Server quiesced and not serving any regions. " + - "Starting shutdown"); + "Starting shutdown"); stopRequested.set(true); continue; } - + // Queue up the HMaster's instruction stream for processing boolean restart = false; for(int i = 0; i < msgs.length && !stopRequested.get() && - !restart; i++) { + !restart; i++) { switch(msgs[i].getMsg()) { - + case HMsg.MSG_CALL_SERVER_STARTUP: LOG.info("Got call server startup message"); // We the MSG_CALL_SERVER_STARTUP on startup but we can also @@ -762,7 +820,7 @@ restart = true; } else { LOG.fatal("file system available check failed. 
" + - "Shutting down server."); + "Shutting down server."); } break; @@ -770,7 +828,7 @@ LOG.info("Got regionserver stop message"); stopRequested.set(true); break; - + case HMsg.MSG_REGIONSERVER_QUIESCE: if (!quiesceRequested) { LOG.info("Got quiesce server message"); @@ -778,7 +836,7 @@ toDo.put(new ToDoEntry(msgs[i])); } catch (InterruptedException e) { throw new RuntimeException("Putting into msgQueue was " + - "interrupted.", e); + "interrupted.", e); } quiesceRequested = true; } @@ -790,7 +848,7 @@ toDo.put(new ToDoEntry(msgs[i])); } catch (InterruptedException e) { throw new RuntimeException("Putting into msgQueue was " + - "interrupted.", e); + "interrupted.", e); } if (msgs[i].getMsg() == HMsg.MSG_REGION_OPEN) { outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN, @@ -869,7 +927,7 @@ closeAllRegions(); // Don't leave any open file handles } LOG.info("aborting server at: " + - serverInfo.getServerAddress().toString()); + serverInfo.getServerAddress().toString()); } else { ArrayList closedRegions = closeAllRegions(); try { @@ -896,13 +954,13 @@ RemoteExceptionHandler.checkIOException(e)); } LOG.info("stopping server at: " + - serverInfo.getServerAddress().toString()); + serverInfo.getServerAddress().toString()); } join(); LOG.info(Thread.currentThread().getName() + " exiting"); } - + /* * Run init. Sets up hlog and starts up all server threads. * @param c Extra configuration. @@ -976,10 +1034,7 @@ handler); Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler); Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); - // Leases is not a Thread. Internally it runs a daemon thread. If it gets - // an unhandled exception, it will just exit. - this.leases.setName(n + ".leaseChecker"); - this.leases.start(); + Threads.setDaemonThreadRunning(this.leases, n + ".leaseChecker", handler); // Put up info server. int port = this.conf.getInt("hbase.regionserver.info.port", 60030); if (port >= 0) { @@ -1053,6 +1108,21 @@ } } + /** {@inheritDoc} */ + public boolean flushRequested(HStore store, HRegion region) { + return cacheFlusher.flushRequested(store, region); + } + + /** {@inheritDoc} */ + public boolean compactionRequested(HStore store, HRegion region) { + return compactor.compactionRequested(store, region); + } + + /** {@inheritDoc} */ + public boolean splitRequested(HRegion region, Text midkey) { + return splitter.splitRequested(region, midkey); + } + /* * Let the master know we're here * Run initialization using parameters passed us by the master. @@ -1217,9 +1287,9 @@ HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc().getName() ), - this.log, this.fs, conf, regionInfo, null, this.cacheFlusher + this.log, this.fs, conf, regionInfo, null, this ); - + } catch (IOException e) { LOG.error("error opening region " + regionInfo.getRegionName(), e); @@ -1456,7 +1526,7 @@ if (s == null) { throw new UnknownScannerException("Name: " + scannerName); } - this.leases.renewLease(scannerId, scannerId); + this.leases.renewLease(scannerName); // Collect values to be returned here HbaseMapWritable values = new HbaseMapWritable(); @@ -1518,8 +1588,7 @@ synchronized(scanners) { scanners.put(scannerName, s); } - this.leases. 
- createLease(scannerId, scannerId, new ScannerListener(scannerName)); + this.leases.createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } catch (IOException e) { LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", @@ -1543,7 +1612,7 @@ throw new UnknownScannerException(scannerName); } s.close(); - this.leases.cancelLease(scannerId, scannerId); + this.leases.cancelLease(scannerName); } catch (IOException e) { checkFileSystem(); throw e; @@ -1627,11 +1696,6 @@ return this.requestCount; } - /** @return reference to CacheFlushListener */ - public CacheFlushListener getCacheFlushListener() { - return this.cacheFlusher; - } - /** * Protected utility method for safely obtaining an HRegion handle. * @param regionName Name of online {@link HRegion} to return
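The flushRequested/compactionRequested/splitRequested methods above all share the same de-duplication idiom: a request is queued only if it is neither the item currently being worked on nor already waiting in the queue. A stripped-down sketch of that idiom (the WorkQueueSketch name and generic shape are illustrative, not from the patch):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkQueueSketch<T> {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
  private volatile T inProgress = null;

  /** @return true only if the request was actually queued */
  public boolean request(T work) {
    synchronized (queue) {
      if (inProgress != null && inProgress.equals(work)) {
        return false;     // the worker is already handling this item
      }
      if (queue.contains(work)) {
        return false;     // an identical request is already waiting
      }
      return queue.add(work);
    }
  }

  /** Worker side: mark the item in progress while it is being handled. */
  public void workOne() throws InterruptedException {
    T work = queue.take();
    synchronized (queue) {
      inProgress = work;
    }
    try {
      // ... perform the flush/compaction/split here ...
    } finally {
      synchronized (queue) {
        inProgress = null;
      }
    }
  }
}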