Index: src/contrib/hbase/conf/hbase-default.xml =================================================================== --- src/contrib/hbase/conf/hbase-default.xml (revision 597481) +++ src/contrib/hbase/conf/hbase-default.xml (working copy) @@ -144,6 +144,14 @@ + hbase.regionserver.optionalcacheflushinterval + 60000 + + Amount of time to wait since the last time a region was flushed before + invoking an optional cache flush. Default 60,000. + + + hbase.hregion.memcache.flush.size 16777216 Index: src/contrib/hbase/src/test/hbase-site.xml =================================================================== --- src/contrib/hbase/src/test/hbase-site.xml (revision 597481) +++ src/contrib/hbase/src/test/hbase-site.xml (working copy) @@ -104,7 +104,16 @@ + hbase.regionserver.optionalcacheflushinterval + 10000 + + Amount of time to wait since the last time a region was flushed before + invoking an optional cache flush. Default 60,000. + + + hbase.rootdir /hbase - location of HBase instance in dfs + location of HBase instance in dfs + Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (working copy) @@ -123,8 +123,8 @@ FileSystem fs = dir.getFileSystem(c); fs.mkdirs(regionDir); return new HRegion(dir, - new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf), - fs, conf, info, null); + new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf, + null), fs, conf, info, null, null); } protected HTableDescriptor createTableDescriptor(final String name) { @@ -365,7 +365,7 @@ return region.getFull(row); } public void flushcache() throws IOException { - this.region.internalFlushcache(this.region.snapshotMemcaches()); + this.region.flushcache(); } } Index: 
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java (working copy) @@ -93,9 +93,9 @@ HRegionInfo.encodeRegionName(info.getRegionName())); fs.mkdirs(regionDir); - HLog log = new HLog(fs, new Path(regionDir, "log"), conf); + HLog log = new HLog(fs, new Path(regionDir, "log"), conf, null); - HRegion region = new HRegion(dir, log, fs, conf, info, null); + HRegion region = new HRegion(dir, log, fs, conf, info, null, null); HRegionIncommon r = new HRegionIncommon(region); // Write information to the table @@ -135,7 +135,7 @@ region.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, info, null); + region = new HRegion(dir, log, fs, conf, info, null, null); r = new HRegionIncommon(region); // Read it back @@ -164,7 +164,7 @@ region.close(); log.rollWriter(); - region = new HRegion(dir, log, fs, conf, info, null); + region = new HRegion(dir, log, fs, conf, info, null, null); r = new HRegionIncommon(region); // Read it back Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java (working copy) @@ -45,7 +45,7 @@ final Text tableName = new Text("tablename"); final Text row = new Text("row"); Reader reader = null; - HLog log = new HLog(fs, dir, this.conf); + HLog log = new HLog(fs, dir, this.conf, null); try { // Write columns named 1, 2, 3, etc. and then values of single byte // 1, 2, 3... 
Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (working copy) @@ -144,9 +144,9 @@ HRegionInfo.encodeRegionName(REGION_INFO.getRegionName())); fs.mkdirs(regionDir); - HLog log = new HLog(fs, new Path(regionDir, "log"), conf); + HLog log = new HLog(fs, new Path(regionDir, "log"), conf, null); - r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); region = new HRegionIncommon(r); // Write information to the meta table @@ -169,7 +169,7 @@ r.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); region = new HRegionIncommon(r); // Verify we can get the data back now that it is on disk. 
@@ -210,7 +210,7 @@ r.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); region = new HRegionIncommon(r); // Validate again @@ -247,7 +247,7 @@ r.close(); log.rollWriter(); - r = new HRegion(dir, log, fs, conf, REGION_INFO, null); + r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null); region = new HRegionIncommon(r); // Validate again Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -257,7 +257,7 @@ for (LocalHBaseCluster.RegionServerThread t: this.hbaseCluster.getRegionServers()) { for(HRegion r: t.getRegionServer().onlineRegions.values() ) { - r.internalFlushcache(r.snapshotMemcaches()); + r.flushcache(); } } } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (working copy) @@ -310,11 +310,11 @@ } private HRegion createRegion() throws IOException { - HLog hlog = new HLog(this.localFs, this.testDir, this.conf); + HLog hlog = new HLog(this.localFs, this.testDir, this.conf, null); HTableDescriptor htd = createTableDescriptor(getName()); htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS, CompressionType.NONE, false, Integer.MAX_VALUE, null)); HRegionInfo hri = new HRegionInfo(htd, null, null); - return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null); } } \ No newline at end of file Index: 
src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (working copy) @@ -103,9 +103,11 @@ } } - // Flush will provoke a split next time the split-checker thread runs. - r.internalFlushcache(r.snapshotMemcaches()); + // Flush the cache + cluster.getRegionThreads().get(0).getRegionServer().getCacheFlushListener(). + flushRequested(r); + // Now, wait until split makes it into the meta table. int oldCount = count; for (int i = 0; i < retries; i++) { Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java (working copy) @@ -19,10 +19,12 @@ */ package org.apache.hadoop.hbase; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.dfs.MiniDFSCluster; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; /** @@ -32,7 +34,8 @@ private static final Log LOG = LogFactory.getLog(TestLogRolling.class); private MiniDFSCluster dfs; private MiniHBaseCluster cluster; - private Path logdir; + private HRegionServer server; + private HLog log; private String tableName; private byte[] value; @@ -45,10 +48,14 @@ try { this.dfs = null; this.cluster = null; - this.logdir = null; + this.server = null; + this.log = null; this.tableName = null; this.value = null; + // Force a region split after every 768KB + conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); + // We roll the log after every 256 writes conf.setInt("hbase.regionserver.maxlogentries", 256); 
@@ -118,8 +125,8 @@ // continue } - this.logdir = - cluster.getRegionThreads().get(0).getRegionServer().getLog().dir; + this.server = cluster.getRegionThreads().get(0).getRegionServer(); + this.log = server.getLog(); // When the META table can be opened, the region servers are running @SuppressWarnings("unused") @@ -150,21 +157,6 @@ } } - private int countLogFiles(final boolean print) throws Exception { - Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {this.logdir}); - if (print) { - for (int i = 0; i < logfiles.length; i++) { - if (LOG.isDebugEnabled()) { - LOG.debug("logfile: " + logfiles[i].toString()); - } - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("number of log files: " + logfiles.length); - } - return logfiles.length; - } - /** * Tests that logs are deleted * @@ -172,21 +164,24 @@ */ public void testLogRolling() throws Exception { tableName = getName(); - // Force a region split after every 768KB - conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); try { startAndWriteData(); - int count = countLogFiles(true); - LOG.info("Finished writing. There are " + count + " log files. 
" + - "Sleeping to let cache flusher and log roller run"); - while (count > 2) { - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted", e); - } - count = countLogFiles(true); + LOG.info("after writing there are " + log.getNumLogFiles() + " log files"); + + // flush all regions + + List regions = + new ArrayList(server.getOnlineRegions().values()); + for (HRegion r: regions) { + r.flushcache(); } + + // Now roll the log + log.rollWriter(); + + int count = log.getNumLogFiles(); + LOG.info("after flushing all regions and rolling logs there are " + + log.getNumLogFiles() + " log files"); assertTrue(count <= 2); } catch (Exception e) { LOG.fatal("unexpected exception", e); Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (working copy) @@ -105,6 +105,9 @@ // Make lease timeout longer, lease checks less frequent conf.setInt("hbase.master.lease.period", 10 * 1000); conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); + + // Set client pause to the original default + conf.setInt("hbase.client.pause", 10 * 1000); } /** Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (working copy) @@ -98,12 +98,12 @@ fs.mkdirs(parentdir); newlogdir = new Path(parentdir, "log"); - log = new HLog(fs, newlogdir, conf); + log = new HLog(fs, newlogdir, conf, null); desc = new HTableDescriptor("test"); desc.addFamily(new HColumnDescriptor("contents:")); desc.addFamily(new 
HColumnDescriptor("anchor:")); r = new HRegion(parentdir, log, fs, conf, - new HRegionInfo(desc, null, null), null); + new HRegionInfo(desc, null, null), null, null); region = new HRegionIncommon(r); } Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (revision 597481) +++ src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (working copy) @@ -65,11 +65,11 @@ */ public void testBasicSplit() throws Exception { HRegion region = null; - HLog hlog = new HLog(this.localFs, this.testDir, this.conf); + HLog hlog = new HLog(this.localFs, this.testDir, this.conf, null); try { HTableDescriptor htd = createTableDescriptor(getName()); HRegionInfo hri = new HRegionInfo(htd, null, null); - region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null); basicSplit(region); } finally { if (region != null) { @@ -81,7 +81,7 @@ private void basicSplit(final HRegion region) throws Exception { addContent(region, COLFAMILY_NAME3); - region.internalFlushcache(region.snapshotMemcaches()); + region.flushcache(); Text midkey = new Text(); assertTrue(region.needsSplit(midkey)); HRegion [] regions = split(region); @@ -108,12 +108,7 @@ } addContent(regions[i], COLFAMILY_NAME2); addContent(regions[i], COLFAMILY_NAME1); - long startTime = region.snapshotMemcaches(); - if (startTime == -1) { - LOG.info("cache flush not needed"); - } else { - regions[i].internalFlushcache(startTime); - } + regions[i].flushcache(); } // Assert that even if one store file is larger than a reference, the Index: src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java =================================================================== --- src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (revision 597481) +++ 
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (working copy) @@ -54,10 +54,11 @@ @Override public void setUp() throws Exception { super.setUp(); - this.hlog = new HLog(this.localFs, this.testDir, this.conf); + this.hlog = new HLog(this.localFs, this.testDir, this.conf, null); HTableDescriptor htd = createTableDescriptor(getName()); HRegionInfo hri = new HRegionInfo(htd, null, null); - this.r = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null); + this.r = + new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null); } /** {@inheritDoc} */ Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (revision 597481) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (working copy) @@ -108,7 +108,8 @@ this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); this.basedir = new Path(dir, "merge_" + System.currentTimeMillis()); fs.mkdirs(basedir); - this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf); + this.hlog = + new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf, null); } void process() throws IOException { @@ -150,11 +151,11 @@ for(int i = 0; i < regions.length - 1; i++) { if(currentRegion == null) { currentRegion = - new HRegion(dir, hlog, fs, conf, regions[i], null); + new HRegion(dir, hlog, fs, conf, regions[i], null, null); currentSize = currentRegion.largestHStore(midKey).getAggregate(); } nextRegion = - new HRegion(dir, hlog, fs, conf, regions[i + 1], null); + new HRegion(dir, hlog, fs, conf, regions[i + 1], null, null); nextSize = nextRegion.largestHStore(midKey).getAggregate(); @@ -327,7 +328,7 @@ // Scan root region to find all the meta regions HRegion root = - new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null); + new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null, null); 
HInternalScannerInterface rootScanner = root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null); @@ -363,7 +364,7 @@ HRegion newRegion) throws IOException { HRegion root = - new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null); + new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null, null); Text[] regionsToDelete = { oldRegion1, Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java (revision 0) @@ -0,0 +1,36 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +/** + * Implementors of this interface want to be notified when an HRegion + * determines that a cache flush is needed. A CacheFlushListener (or null) + * must be passed to the HRegion constructor. + */ +public interface CacheFlushListener { + + /** + * Tell the listener the cache needs to be flushed. 
+ * + * @param region the HRegion requesting the cache flush + */ + void flushRequested(HRegion region); +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 597481) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -27,14 +27,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.Vector; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.DelayQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -125,27 +128,79 @@ /** region server process name */ public static final String REGIONSERVER = "regionserver"; + /** Queue entry passed to flusher, compactor and splitter threads */ + class QueueEntry implements Delayed { + private final HRegion region; + private long expirationTime; + + QueueEntry(HRegion region, long expirationTime) { + this.region = region; + this.expirationTime = expirationTime; + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + QueueEntry other = (QueueEntry) o; + return this.hashCode() == other.hashCode(); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return this.region.getRegionInfo().hashCode(); + } + + /** {@inheritDoc} */ + public long getDelay(TimeUnit unit) { + return unit.convert(this.expirationTime - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + } + + /** {@inheritDoc} */ + public int compareTo(Delayed o) { + long delta = 
this.getDelay(TimeUnit.MILLISECONDS) - + o.getDelay(TimeUnit.MILLISECONDS); + + int value = 0; + if (delta > 0) { + value = 1; + + } else if (delta < 0) { + value = -1; + } + return value; + } + + /** @return the region */ + public HRegion getRegion() { + return region; + } + + /** @param expirationTime the expirationTime to set */ + public void setExpirationTime(long expirationTime) { + this.expirationTime = expirationTime; + } + } + // Check to see if regions should be split - private final Thread splitOrCompactCheckerThread; + final Splitter splitter; // Needed at shutdown. On way out, if can get this lock then we are not in // middle of a split or compaction: i.e. splits/compactions cannot be // interrupted. - protected final Integer splitOrCompactLock = new Integer(0); + final Integer splitterLock = new Integer(0); - /* - * Runs periodically to determine if regions need to be compacted or split - */ - class SplitOrCompactChecker extends Chore - implements RegionUnavailableListener { + /** Split regions on request */ + class Splitter extends Thread implements RegionUnavailableListener { + private final BlockingQueue splitQueue = + new LinkedBlockingQueue(); + private HTable root = null; private HTable meta = null; - /** - * @param stop - */ - public SplitOrCompactChecker(final AtomicBoolean stop) { - super(conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency", - 30 * 1000), stop); + /** constructor */ + public Splitter() { + super(); } /** {@inheritDoc} */ @@ -178,37 +233,52 @@ } /** - * Scan for splits or compactions to run. Run any we find. 
+ * Perform region splits if necessary */ @Override - protected void chore() { - // Don't interrupt us while we're working - synchronized (splitOrCompactLock) { - checkForSplitsOrCompactions(); - } - } - - private void checkForSplitsOrCompactions() { - // Grab a list of regions to check - List nonClosedRegionsToCheck = getRegionsToCheck(); - for(HRegion cur: nonClosedRegionsToCheck) { + public void run() { + while (!stopRequested.get()) { + QueueEntry e = null; try { - if (cur.compactIfNeeded()) { - // After compaction, it probably needs splitting. May also need - // splitting just because one of the memcache flushes was big. - split(cur); - } + e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + + } catch (InterruptedException ex) { + continue; + } + if (e == null) { + continue; + } + synchronized (splitterLock) { // Don't interrupt us while we're working + try { + split(e.getRegion()); + + } catch (IOException ex) { + LOG.error("Split failed for region " + + e.getRegion().getRegionName(), + RemoteExceptionHandler.checkIOException(ex)); + if (!checkFileSystem()) { + break; + } - } catch(IOException e) { - //TODO: What happens if this fails? Are we toast? 
- LOG.error("Split or compaction failed", e); - if (!checkFileSystem()) { - break; + } catch (Exception ex) { + LOG.error("Split failed on region " + + e.getRegion().getRegionName(), ex); + if (!checkFileSystem()) { + break; + } } } } + LOG.info(getName() + " exiting"); } + /** + * @param e entry indicating which region needs to be split + */ + public void splitRequested(QueueEntry e) { + splitQueue.add(e); + } + private void split(final HRegion region) throws IOException { final HRegionInfo oldRegionInfo = region.getRegionInfo(); final HRegion[] newRegions = region.splitRegion(this); @@ -271,100 +341,232 @@ } } - // Cache flushing - private final Thread cacheFlusherThread; + // Compactions + final Compactor compactor; // Needed during shutdown so we send an interrupt after completion of a - // flush, not in the midst. - protected final Integer cacheFlusherLock = new Integer(0); - - /* Runs periodically to flush memcache. - */ - class Flusher extends Chore { - /** - * @param period - * @param stop - */ - public Flusher(final int period, final AtomicBoolean stop) { - super(period, stop); + // compaction, not in the midst. + final Integer compactionLock = new Integer(0); + + /** Compact region on request */ + class Compactor extends Thread { + private final BlockingQueue compactionQueue = + new LinkedBlockingQueue(); + + /** constructor */ + public Compactor() { + super(); } + /** {@inheritDoc} */ @Override - protected void chore() { - synchronized(cacheFlusherLock) { - checkForFlushesToRun(); - } - } - - private void checkForFlushesToRun() { - // Grab a list of items to flush - List nonClosedRegionsToFlush = getRegionsToCheck(); - // Flush them, if necessary - for(HRegion cur: nonClosedRegionsToFlush) { + public void run() { + while (!stopRequested.get()) { + QueueEntry e = null; try { - cur.flushcache(); - } catch (DroppedSnapshotException e) { - // Cache flush can fail in a few places. 
If it fails in a critical - // section, we get a DroppedSnapshotException and a replay of hlog - // is required. Currently the only way to do this is a restart of - // the server. - LOG.fatal("Replay of hlog required. Forcing server restart", e); + e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + + } catch (InterruptedException ex) { + continue; + } + if (e == null) { + continue; + } + try { + if (e.getRegion().compactIfNeeded()) { + splitter.splitRequested(e); + } + + } catch (IOException ex) { + LOG.error("Compaction failed for region " + + e.getRegion().getRegionName(), + RemoteExceptionHandler.checkIOException(ex)); if (!checkFileSystem()) { break; } - HRegionServer.this.stop(); - } catch (IOException iex) { - LOG.error("Cache flush failed", - RemoteExceptionHandler.checkIOException(iex)); + + } catch (Exception ex) { + LOG.error("Compaction failed for region " + + e.getRegion().getRegionName(), ex); if (!checkFileSystem()) { break; } } } + LOG.info(getName() + " exiting"); } + + /** + * @param e QueueEntry for region to be compacted + */ + public void compactionRequested(QueueEntry e) { + compactionQueue.add(e); + } } + + // Cache flushing + final Flusher cacheFlusher; + // Needed during shutdown so we send an interrupt after completion of a + // flush, not in the midst. 
+ final Integer cacheFlusherLock = new Integer(0); + + /** Flush cache upon request */ + class Flusher extends Thread implements CacheFlushListener { + private final DelayQueue flushQueue = + new DelayQueue(); + private final long optionalFlushPeriod; + + /** constructor */ + public Flusher() { + super(); + this.optionalFlushPeriod = conf.getLong( + "hbase.regionserver.optionalcacheflushinterval", 60L * 1000L); + + } + + /** {@inheritDoc} */ + @Override + public void run() { + while (!stopRequested.get()) { + QueueEntry e = null; + try { + e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + + } catch (InterruptedException ex) { + if (stopRequested.get()) { + flushQueue.clear(); + break; + } + } + synchronized(cacheFlusherLock) { // Don't interrupt while we're working + if (e != null) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("flushing region " + e.getRegion().getRegionName()); + } + if (e.getRegion().flushcache()) { + compactor.compactionRequested(e); + } + + } catch (DroppedSnapshotException ex) { + // Cache flush can fail in a few places. If it fails in a critical + // section, we get a DroppedSnapshotException and a replay of hlog + // is required. Currently the only way to do this is a restart of + // the server. + LOG.fatal("Replay of hlog required. 
Forcing server restart", ex); + if (!checkFileSystem()) { + break; + } + HRegionServer.this.stop(); + + } catch (IOException ex) { + LOG.error("Cache flush failed for region " + + e.getRegion().getRegionName(), + RemoteExceptionHandler.checkIOException(ex)); + if (!checkFileSystem()) { + break; + } + + } catch (Exception ex) { + LOG.error("Cache flush failed for region " + + e.getRegion().getRegionName(), ex); + if (!checkFileSystem()) { + break; + } + } + e.setExpirationTime(System.currentTimeMillis() + + optionalFlushPeriod); + flushQueue.add(e); + } + + // Now insure that all the active regions are in the queue + + Set regions = getRegionsToCheck(); + for (HRegion r: regions) { + e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); + if (!flushQueue.contains(e)) { + flushQueue.add(e); + } + } + + // Now make sure that the queue only contains active regions + + for (Iterator i = flushQueue.iterator(); i.hasNext(); ) { + e = i.next(); + if (!regions.contains(e.getRegion())) { + i.remove(); + } + } + } + } + LOG.info(getName() + " exiting"); + } + + /** {@inheritDoc} */ + public void flushRequested(HRegion region) { + QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); + if (flushQueue.contains(e)) { + flushQueue.remove(e); + } + flushQueue.add(e); + } + } + // HLog and HLog roller. 
log is protected rather than private to avoid // eclipse warning when accessed by inner classes protected HLog log; - private final Thread logRollerThread; - protected final Integer logRollerLock = new Integer(0); + final LogRoller logRoller; + final Integer logRollerLock = new Integer(0); /** Runs periodically to determine if the HLog should be rolled */ - class LogRoller extends Chore { - private int MAXLOGENTRIES = - conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000); + class LogRoller extends Thread implements LogRollListener { + private volatile boolean rollLog; - /** - * @param period - * @param stop - */ - public LogRoller(final int period, final AtomicBoolean stop) { - super(period, stop); + /** constructor */ + public LogRoller() { + super(); + this.rollLog = false; } /** {@inheritDoc} */ @Override - protected void chore() { - synchronized(logRollerLock) { - checkForLogRoll(); - } - } - - private void checkForLogRoll() { - // If the number of log entries is high enough, roll the log. This - // is a very fast operation, but should not be done too frequently. - int nEntries = log.getNumEntries(); - if(nEntries > this.MAXLOGENTRIES) { + public synchronized void run() { + while (!stopRequested.get()) { try { - LOG.info("Rolling hlog. Number of entries: " + nEntries); - log.rollWriter(); - } catch (IOException iex) { - LOG.error("Log rolling failed", - RemoteExceptionHandler.checkIOException(iex)); - checkFileSystem(); + this.wait(threadWakeFrequency); + + } catch (InterruptedException e) { + continue; } + if (!rollLog) { + continue; + } + synchronized (logRollerLock) { + try { + LOG.info("Rolling hlog. 
Number of entries: " + log.getNumEntries()); + log.rollWriter(); + + } catch (IOException ex) { + LOG.error("Log rolling failed", + RemoteExceptionHandler.checkIOException(ex)); + checkFileSystem(); + + } catch (Exception ex) { + LOG.error("Log rolling failed", ex); + checkFileSystem(); + + } finally { + rollLog = false; + } + } } } + + /** {@inheritDoc} */ + public synchronized void logRollRequested() { + rollLog = true; + this.notifyAll(); + } } /** @@ -396,20 +598,22 @@ this.serverLeaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000); - // Cache flushing chore thread. - this.cacheFlusherThread = - new Flusher(this.threadWakeFrequency, stopRequested); + // Cache flushing thread. + this.cacheFlusher = new Flusher(); - // Check regions to see if they need to be split or compacted chore thread - this.splitOrCompactCheckerThread = - new SplitOrCompactChecker(this.stopRequested); + // Compaction thread + this.compactor = new Compactor(); + // Region split thread + this.splitter = new Splitter(); + + // Log rolling thread + this.logRoller = new LogRoller(); + // Task thread to process requests from Master this.worker = new Worker(); this.workerThread = new Thread(worker); this.sleeper = new Sleeper(this.msgInterval, this.stopRequested); - this.logRollerThread = - new LogRoller(this.threadWakeFrequency, stopRequested); // Server to handle client requests this.server = RPC.getServer(this, address.getBindAddress(), address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), @@ -557,15 +761,18 @@ // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? 
If OOME could have exited already - synchronized(logRollerLock) { - this.logRollerThread.interrupt(); - } synchronized(cacheFlusherLock) { - this.cacheFlusherThread.interrupt(); + this.cacheFlusher.interrupt(); } - synchronized(splitOrCompactLock) { - this.splitOrCompactCheckerThread.interrupt(); + synchronized (compactionLock) { + this.compactor.interrupt(); } + synchronized (splitterLock) { + this.splitter.interrupt(); + } + synchronized (logRollerLock) { + this.logRoller.interrupt(); + } if (abortRequested) { if (this.fsOk) { @@ -657,7 +864,7 @@ "running at " + this.serverInfo.getServerAddress().toString() + " because logdir " + logdir.toString() + " exists"); } - return new HLog(fs, logdir, conf); + return new HLog(fs, logdir, conf, logRoller); } /* @@ -680,16 +887,13 @@ LOG.fatal("Set stop flag in " + t.getName(), e); } }; - Threads.setDaemonThreadRunning(this.cacheFlusherThread, n + ".cacheFlusher", + Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller", + handler); + Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", handler); - Threads.setDaemonThreadRunning(this.splitOrCompactCheckerThread, - n + ".splitOrCompactChecker", handler); - Threads.setDaemonThreadRunning(this.logRollerThread, n + ".logRoller", - handler); - // Worker is not the same as the above threads in that it does not - // inherit from Chore. Set an UncaughtExceptionHandler on it in case its - // the one to see an OOME, etc., first. The handler will set the stop - // flag. + Threads.setDaemonThreadRunning(this.compactor, n + ".compactor", + handler); + Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler); Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. 
@@ -752,9 +956,10 @@ */ void join() { join(this.workerThread); - join(this.logRollerThread); - join(this.cacheFlusherThread); - join(this.splitOrCompactCheckerThread); + join(this.logRoller); + join(this.cacheFlusher); + join(this.compactor); + join(this.splitter); } private void join(final Thread t) { @@ -925,7 +1130,8 @@ HRegion region = onlineRegions.get(regionInfo.getRegionName()); if(region == null) { region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)), - this.log, FileSystem.get(conf), conf, regionInfo, null); + this.log, FileSystem.get(conf), conf, regionInfo, null, + this.cacheFlusher); this.lock.writeLock().lock(); try { this.log.setSequenceNumber(region.getMinSequenceId()); @@ -1226,6 +1432,11 @@ public AtomicInteger getRequestCount() { return this.requestCount; } + + /** @return reference to CacheFlushListener */ + public CacheFlushListener getCacheFlushListener() { + return this.cacheFlusher; + } /** * Protected utility method for safely obtaining an HRegion handle. @@ -1318,8 +1529,8 @@ * @return Returns list of non-closed regions hosted on this server. If no * regions to check, returns an empty list. */ - protected List getRegionsToCheck() { - ArrayList regionsToCheck = new ArrayList(); + protected Set getRegionsToCheck() { + HashSet regionsToCheck = new HashSet(); //TODO: is this locking necessary? lock.readLock().lock(); try { @@ -1328,8 +1539,7 @@ lock.readLock().unlock(); } // Purge closed regions. 
- for (final ListIterator i = regionsToCheck.listIterator(); - i.hasNext();) { + for (final Iterator i = regionsToCheck.iterator(); i.hasNext();) { HRegion r = i.next(); if (r.isClosed()) { i.remove(); Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (revision 597481) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (working copy) @@ -22,10 +22,13 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -89,7 +92,9 @@ final FileSystem fs; final Path dir; final Configuration conf; + final LogRollListener listener; final long threadWakeFrequency; + private final int maxlogentries; /* * Current log file. @@ -99,12 +104,13 @@ /* * Map of all log files but the current one. */ - final TreeMap outputfiles = new TreeMap(); + final SortedMap outputfiles = + Collections.synchronizedSortedMap(new TreeMap()); /* * Map of region to last sequence/edit id. */ - final Map lastSeqWritten = new HashMap(); + final Map lastSeqWritten = new ConcurrentHashMap(); volatile boolean closed = false; @@ -119,6 +125,10 @@ // synchronized is insufficient because a cache flush spans two method calls. private final Lock cacheFlushLock = new ReentrantLock(); + // We synchronize on updateLock to prevent updates and to prevent a log roll + // during an update + private final Integer updateLock = new Integer(0); + /** * Split up a bunch of log files, that are no longer being written to, into * new files, one per region. Delete the old log files when finished. 
@@ -207,12 +217,15 @@ * @param conf * @throws IOException */ - HLog(final FileSystem fs, final Path dir, final Configuration conf) - throws IOException { + HLog(final FileSystem fs, final Path dir, final Configuration conf, + final LogRollListener listener) throws IOException { this.fs = fs; this.dir = dir; this.conf = conf; + this.listener = listener; this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); + this.maxlogentries = + conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000); if (fs.exists(dir)) { throw new IOException("Target HLog directory already exists: " + dir); } @@ -256,98 +269,82 @@ * * @throws IOException */ - synchronized void rollWriter() throws IOException { - boolean locked = false; - while (!locked && !closed) { - if (this.cacheFlushLock.tryLock()) { - locked = true; - break; + void rollWriter() throws IOException { + this.cacheFlushLock.lock(); + try { + if (closed) { + return; } - try { - this.wait(threadWakeFrequency); - } catch (InterruptedException e) { - // continue - } - } - if (closed) { - if (locked) { - this.cacheFlushLock.unlock(); - } - throw new IOException("Cannot roll log; log is closed"); - } - - // If we get here we have locked out both cache flushes and appends - try { - if (this.writer != null) { - // Close the current writer, get a new one. - this.writer.close(); - Path p = computeFilename(filenum - 1); - if (LOG.isDebugEnabled()) { - LOG.debug("Closing current log writer " + p.toString() + + synchronized (updateLock) { + if (this.writer != null) { + // Close the current writer, get a new one. 
+ this.writer.close(); + Path p = computeFilename(filenum - 1); + if (LOG.isDebugEnabled()) { + LOG.debug("Closing current log writer " + p.toString() + " to get a new one"); - } - if (filenum > 0) { - synchronized (this.sequenceLock) { - this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p); } + if (filenum > 0) { + synchronized (this.sequenceLock) { + this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p); + } + } } - } - Path newPath = computeFilename(filenum++); - this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath, - HLogKey.class, HLogEdit.class); - LOG.info("new log writer created at " + newPath); + Path newPath = computeFilename(filenum++); + this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath, + HLogKey.class, HLogEdit.class); + LOG.info("new log writer created at " + newPath); - // Can we delete any of the old log files? - if (this.outputfiles.size() > 0) { - if (this.lastSeqWritten.size() <= 0) { - LOG.debug("Last sequence written is empty. Deleting all old hlogs"); - // If so, then no new writes have come in since all regions were - // flushed (and removed from the lastSeqWritten map). Means can - // remove all but currently open log file. - for (Map.Entry e : this.outputfiles.entrySet()) { - deleteLogFile(e.getValue(), e.getKey()); - } - this.outputfiles.clear(); - } else { - // Get oldest edit/sequence id. If logs are older than this id, - // then safe to remove. - TreeSet sequenceNumbers = - new TreeSet(this.lastSeqWritten.values()); - long oldestOutstandingSeqNum = sequenceNumbers.first().longValue(); - // Get the set of all log files whose final ID is older than the - // oldest pending region operation - sequenceNumbers.clear(); - sequenceNumbers.addAll(this.outputfiles.headMap( - Long.valueOf(oldestOutstandingSeqNum)).keySet()); - // Now remove old log files (if any) - if (LOG.isDebugEnabled()) { - // Find region associated with oldest key -- helps debugging. 
- Text oldestRegion = null; - for (Map.Entry e: this.lastSeqWritten.entrySet()) { - if (e.getValue().longValue() == oldestOutstandingSeqNum) { - oldestRegion = e.getKey(); - break; + // Can we delete any of the old log files? + if (this.outputfiles.size() > 0) { + if (this.lastSeqWritten.size() <= 0) { + LOG.debug("Last sequence written is empty. Deleting all old hlogs"); + // If so, then no new writes have come in since all regions were + // flushed (and removed from the lastSeqWritten map). Means can + // remove all but currently open log file. + for (Map.Entry e : this.outputfiles.entrySet()) { + deleteLogFile(e.getValue(), e.getKey()); + } + this.outputfiles.clear(); + } else { + // Get oldest edit/sequence id. If logs are older than this id, + // then safe to remove. + Long oldestOutstandingSeqNum = + Collections.min(this.lastSeqWritten.values()); + // Get the set of all log files whose final ID is older than or + // equal to the oldest pending region operation + TreeSet sequenceNumbers = + new TreeSet(this.outputfiles.headMap( + (oldestOutstandingSeqNum + Long.valueOf(1L))).keySet()); + // Now remove old log files (if any) + if (LOG.isDebugEnabled()) { + // Find region associated with oldest key -- helps debugging. 
+ Text oldestRegion = null; + for (Map.Entry e: this.lastSeqWritten.entrySet()) { + if (e.getValue().longValue() == oldestOutstandingSeqNum) { + oldestRegion = e.getKey(); + break; + } } + LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " + + "using oldest outstanding seqnum of " + + oldestOutstandingSeqNum + " from region " + oldestRegion); } - LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " + - "using oldest outstanding seqnum of " + oldestOutstandingSeqNum + - " from region " + oldestRegion); - } - if (sequenceNumbers.size() > 0) { - for (Long seq : sequenceNumbers) { - deleteLogFile(this.outputfiles.remove(seq), seq); + if (sequenceNumbers.size() > 0) { + for (Long seq : sequenceNumbers) { + deleteLogFile(this.outputfiles.remove(seq), seq); + } } } } + this.numEntries = 0; } - this.numEntries = 0; } finally { this.cacheFlushLock.unlock(); } } - private void deleteLogFile(final Path p, final Long seqno) - throws IOException { + private void deleteLogFile(final Path p, final Long seqno) throws IOException { LOG.info("removing old log file " + p.toString() + " whose highest sequence/edit id is " + seqno); this.fs.delete(p); @@ -367,7 +364,7 @@ * * @throws IOException */ - synchronized void closeAndDelete() throws IOException { + void closeAndDelete() throws IOException { close(); fs.delete(dir); } @@ -377,12 +374,19 @@ * * @throws IOException */ - synchronized void close() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("closing log writer in " + this.dir.toString()); + void close() throws IOException { + cacheFlushLock.lock(); + try { + synchronized (updateLock) { + if (LOG.isDebugEnabled()) { + LOG.debug("closing log writer in " + this.dir.toString()); + } + this.writer.close(); + this.closed = true; + } + } finally { + cacheFlushLock.unlock(); } - this.writer.close(); - this.closed = true; } /** @@ -409,29 +413,36 @@ * @param timestamp * @throws IOException */ - synchronized void append(Text regionName, Text 
tableName, + void append(Text regionName, Text tableName, TreeMap edits) throws IOException { if (closed) { throw new IOException("Cannot append; log is closed"); } - long seqNum[] = obtainSeqNum(edits.size()); - // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region. When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush - // is greater than or equal to the value in lastSeqWritten. - if (!this.lastSeqWritten.containsKey(regionName)) { - this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0])); + synchronized (updateLock) { + long seqNum[] = obtainSeqNum(edits.size()); + // The 'lastSeqWritten' map holds the sequence number of the oldest + // write for each region. When the cache is flushed, the entry for the + // region being flushed is removed if the sequence number of the flush + // is greater than or equal to the value in lastSeqWritten. + if (!this.lastSeqWritten.containsKey(regionName)) { + this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0])); + } + int counter = 0; + for (Map.Entry es : edits.entrySet()) { + HStoreKey key = es.getKey(); + HLogKey logKey = + new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]); + HLogEdit logEdit = + new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp()); + this.writer.append(logKey, logEdit); + this.numEntries++; + } } - int counter = 0; - for (Map.Entry es : edits.entrySet()) { - HStoreKey key = es.getKey(); - HLogKey logKey = - new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]); - HLogEdit logEdit = - new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp()); - this.writer.append(logKey, logEdit); - this.numEntries++; + if (this.numEntries > this.maxlogentries) { + if (listener != null) { + listener.logRollRequested(); + } } } @@ -451,6 +462,11 @@ return value; } + /** @return the number of log files in use */ + int getNumLogFiles() { + return 
outputfiles.size(); + } + /** * Obtain a specified number of sequence numbers * @@ -487,43 +503,43 @@ /** * Complete the cache flush * - * Protected by this and cacheFlushLock + * Protected by cacheFlushLock * * @param regionName * @param tableName * @param logSeqId * @throws IOException */ - synchronized void completeCacheFlush(final Text regionName, - final Text tableName, final long logSeqId) - throws IOException { + void completeCacheFlush(final Text regionName, final Text tableName, + final long logSeqId) throws IOException { + try { if (this.closed) { return; } - this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), - new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(), - System.currentTimeMillis())); - this.numEntries++; - Long seq = this.lastSeqWritten.get(regionName); - if (seq != null && logSeqId >= seq.longValue()) { - this.lastSeqWritten.remove(regionName); + synchronized (updateLock) { + this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), + new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(), + System.currentTimeMillis())); + this.numEntries++; + Long seq = this.lastSeqWritten.get(regionName); + if (seq != null && logSeqId >= seq.longValue()) { + this.lastSeqWritten.remove(regionName); + } } } finally { this.cacheFlushLock.unlock(); - notifyAll(); // wake up the log roller if it is waiting } } /** - * Abort a cache flush. This method will clear waits on - * {@link #insideCacheFlush}. Call if the flush fails. Note that the only - * recovery for an aborted flush currently is a restart of the regionserver so - * the snapshot content dropped by the failure gets restored to the memcache. + * Abort a cache flush. + * Call if the flush fails. Note that the only recovery for an aborted flush + * currently is a restart of the regionserver so the snapshot content dropped + * by the failure gets restored to the memcache. 
*/ - synchronized void abortCacheFlush() { + void abortCacheFlush() { this.cacheFlushLock.unlock(); - notifyAll(); } private static void usage() { Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (revision 597481) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (working copy) @@ -78,7 +78,18 @@ private boolean split; private Text startKey; private HTableDescriptor tableDesc; - + private int hashCode; + + private void setHashCode() { + int result = this.regionName.hashCode(); + result ^= Long.valueOf(this.regionId).hashCode(); + result ^= this.startKey.hashCode(); + result ^= this.endKey.hashCode(); + result ^= Boolean.valueOf(this.offLine).hashCode(); + result ^= this.tableDesc.hashCode(); + this.hashCode = result; + } + /** Used to construct the HRegionInfo for the root and first meta regions */ private HRegionInfo(long regionId, HTableDescriptor tableDesc) { this.regionId = regionId; @@ -89,6 +100,7 @@ DELIMITER + regionId); this.split = false; this.startKey = new Text(); + setHashCode(); } /** Default constructor - creates empty object */ @@ -100,6 +112,7 @@ this.split = false; this.startKey = new Text(); this.tableDesc = new HTableDescriptor(); + this.hashCode = 0; } /** @@ -152,6 +165,7 @@ } this.tableDesc = tableDesc; + setHashCode(); } /** @return the endKey */ @@ -232,13 +246,7 @@ */ @Override public int hashCode() { - int result = this.regionName.hashCode(); - result ^= Long.valueOf(this.regionId).hashCode(); - result ^= this.startKey.hashCode(); - result ^= this.endKey.hashCode(); - result ^= Boolean.valueOf(this.offLine).hashCode(); - result ^= this.tableDesc.hashCode(); - return result; + return this.hashCode; } // @@ -256,6 +264,7 @@ out.writeBoolean(split); startKey.write(out); tableDesc.write(out); + out.writeInt(hashCode); } /** @@ -269,6 +278,7 @@ this.split = 
in.readBoolean(); this.startKey.readFields(in); this.tableDesc.readFields(in); + this.hashCode = in.readInt(); } // Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java (revision 0) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java (revision 0) @@ -0,0 +1,29 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +/** + * Mechanism by which the HLog requests a log roll + */ +public interface LogRollListener { + /** Request that the log be rolled */ + public void logRollRequested(); +} Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 597481) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -90,7 +90,6 @@ static final Random rand = new Random(); static final Log LOG = LogFactory.getLog(HRegion.class); final AtomicBoolean closed = new AtomicBoolean(false); - private volatile long noFlushCount = 0; /** * Merge two HRegions. They must be available on the current @@ -159,7 +158,7 @@ // Done // Construction moves the merge files into place under region. HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo, - newRegionDir); + newRegionDir, null); // Get rid of merges directory @@ -221,9 +220,10 @@ volatile WriteState writestate = new WriteState(); final int memcacheFlushSize; + private volatile long lastFlushTime; + final CacheFlushListener flushListener; final int blockingMemcacheSize; protected final long threadWakeFrequency; - protected final int optionalFlushCount; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Integer updateLock = new Integer(0); private final long desiredMaxFileSize; @@ -251,11 +251,13 @@ * @param regionInfo - HRegionInfo that describes the region * @param initialFiles If there are initial files (implying that the HRegion * is new), then read them from the supplied path. 
+ * @param listener an object that implements CacheFlushListener or null * * @throws IOException */ public HRegion(Path rootDir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles) throws IOException { + HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener) + throws IOException { this.rootDir = rootDir; this.log = log; @@ -265,8 +267,6 @@ this.encodedRegionName = HRegionInfo.encodeRegionName(this.regionInfo.getRegionName()); this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); - this.optionalFlushCount = - conf.getInt("hbase.hregion.memcache.optionalflushcount", 10); // Declare the regionName. This is a unique string for the region, used to // build a unique filename. @@ -314,6 +314,7 @@ // By default, we flush the cache when 16M. this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size", 1024*1024*16); + this.flushListener = listener; this.blockingMemcacheSize = this.memcacheFlushSize * conf.getInt("hbase.hregion.memcache.block.multiplier", 2); @@ -323,6 +324,7 @@ // HRegion is ready to go! this.writestate.compacting = false; + this.lastFlushTime = System.currentTimeMillis(); LOG.info("region " + this.regionInfo.getRegionName() + " available"); } @@ -485,6 +487,11 @@ return this.fs; } + /** @return the last time the region was flushed */ + public long getLastFlushTime() { + return this.lastFlushTime; + } + ////////////////////////////////////////////////////////////////////////////// // HRegion maintenance. // @@ -598,8 +605,10 @@ // Done! // Opening the region copies the splits files from the splits directory // under each region. 
- HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA); - HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB); + HRegion regionA = + new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null); + HRegion regionB = + new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null); // Cleanup boolean deleted = fs.delete(splits); // Get rid of splits directory @@ -751,55 +760,31 @@ } /** - * Flush the cache if necessary. This is called periodically to minimize the - * amount of log processing needed upon startup. + * Flush the cache. * - *
<p>The returned Vector is a list of all the files used by the component - * HStores. It is a list of HStoreFile objects. If the returned value is - * NULL, then the flush could not be executed, because the HRegion is busy - * doing something else storage-intensive. The caller should check back - * later. + * When this method is called the cache will be flushed unless: + * <ol> + * <li>the cache is empty</li> + * <li>the region is closed.</li> + * <li>a flush is already in progress</li> + * <li>writes are disabled</li> + * </ol> * * <p>
This method may block for some time, so it should not be called from a * time-sensitive thread. * - * @param disableFutureWrites indicates that the caller intends to - * close() the HRegion shortly, so the HRegion should not take on any new and - * potentially long-lasting disk operations. This flush() should be the final - * pre-close() disk operation. + * @return true if cache was flushed + * * @throws IOException * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - void flushcache() throws IOException { + boolean flushcache() throws IOException { lock.readLock().lock(); // Prevent splits and closes try { if (this.closed.get()) { - return; + return false; } - boolean needFlush = false; - long memcacheSize = this.memcacheSize.get(); - if(memcacheSize > this.memcacheFlushSize) { - needFlush = true; - } else if (memcacheSize > 0) { - if (this.noFlushCount >= this.optionalFlushCount) { - LOG.info("Optional flush called " + this.noFlushCount + - " times when data present without flushing. Forcing one."); - needFlush = true; - } else { - // Only increment if something in the cache. - // Gets zero'd when a flushcache is called. - this.noFlushCount++; - } - } - if (!needFlush) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cache flush not needed for region " + - regionInfo.getRegionName() + ". 
Cache size=" + memcacheSize + - ", cache flush threshold=" + this.memcacheFlushSize); - } - return; - } synchronized (writestate) { if ((!writestate.flushing) && writestate.writesEnabled) { writestate.flushing = true; @@ -811,16 +796,15 @@ writestate.flushing + ", writesEnabled=" + writestate.writesEnabled); } - return; + return false; } } - this.noFlushCount = 0; long startTime = -1; synchronized (updateLock) {// Stop updates while we snapshot the memcaches startTime = snapshotMemcaches(); } try { - internalFlushcache(startTime); + return internalFlushcache(startTime); } finally { synchronized (writestate) { writestate.flushing = false; @@ -835,7 +819,7 @@ /* * It is assumed that updates are blocked for the duration of this method */ - long snapshotMemcaches() { + private long snapshotMemcaches() { if (this.memcacheSize.get() == 0) { return -1; } @@ -883,17 +867,24 @@ * routes. * *
<p>
This method may block for some time. + * + * @param startTime the time the cache was snapshotted or -1 if a flush is + * not needed + * + * @return true if the cache was flushed + * * @throws IOException * @throws DroppedSnapshotException Thrown when replay of hlog is required * because a Snapshot was not properly persisted. */ - void internalFlushcache(long startTime) throws IOException { + private boolean internalFlushcache(long startTime) throws IOException { if (startTime == -1) { if (LOG.isDebugEnabled()) { - LOG.debug("Not flushing cache: snapshotMemcaches() determined that " + - "there was nothing to do"); + LOG.debug("Not flushing cache for region " + + regionInfo.getRegionName() + + ": snapshotMemcaches() determined that there was nothing to do"); } - return; + return false; } // We pass the log to the HMemcache, so we can lock down both @@ -914,7 +905,6 @@ // Otherwise, the snapshot content while backed up in the hlog, it will not // be part of the current running servers state. - long logCacheFlushId = sequenceId; try { // A. Flush memcache to all the HStores. // Keep running vector of all store files that includes both old and the @@ -938,7 +928,7 @@ // and that all updates to the log for this regionName that have lower // log-sequence-ids can be safely ignored. this.log.completeCacheFlush(this.regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), logCacheFlushId); + regionInfo.getTableDesc().getName(), sequenceId); // D. Finally notify anyone waiting on memcache to clear: // e.g. checkResources(). 
@@ -948,8 +938,10 @@ if (LOG.isDebugEnabled()) { LOG.debug("Finished memcache flush for region " + this.regionInfo.getRegionName() + " in " + - (System.currentTimeMillis() - startTime) + "ms"); + (System.currentTimeMillis() - startTime) + "ms, sequenceid=" + + sequenceId); } + return true; } ////////////////////////////////////////////////////////////////////////////// @@ -1309,13 +1301,18 @@ this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc().getName(), updatesByColumn); + long memcacheSize = 0; for (Map.Entry e: updatesByColumn.entrySet()) { HStoreKey key = e.getKey(); byte[] val = e.getValue(); - this.memcacheSize.addAndGet(key.getSize() + + memcacheSize = this.memcacheSize.addAndGet(key.getSize() + (val == null ? 0 : val.length)); stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val); } + if (this.flushListener != null && memcacheSize > this.memcacheFlushSize) { + // Request a cache flush + this.flushListener.flushRequested(this); + } } } @@ -1582,8 +1579,8 @@ FileSystem fs = FileSystem.get(conf); fs.mkdirs(regionDir); return new HRegion(rootDir, - new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf), - fs, conf, info, initialFiles); + new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null), + fs, conf, info, initialFiles, null); } /** Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java (revision 597481) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java (working copy) @@ -31,6 +31,10 @@ private final int period; private AtomicBoolean stop; + /** + * @param sleep length of the sleep period; stored as {@code period} + * @param stop stop flag shared with the owner of this Sleeper + */ public Sleeper(final int sleep, final AtomicBoolean stop) { this.period = sleep; this.stop = stop;