Index: src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java (revision 0) +++ src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java (revision 0) @@ -0,0 +1,142 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; + +/** + * Test setting the global memcache size for a region server. When it reaches + * this size, any puts should be blocked while one or more forced flushes occur + * to bring the memcache size back down. + */ +public class TestGlobalMemcacheLimit extends HBaseClusterTestCase { + final byte[] ONE_KB = new byte[1024]; + + HTable table1; + HTable table2; + HRegionServer server; + + long keySize = (new Text(COLFAMILY_NAME1)).getLength() + 9 + 8; + long rowSize = keySize + ONE_KB.length; + + /** {@inheritDoc} */ + @Override + public void setUp() throws Exception { + preHBaseClusterSetup(); + super.setUp(); + postHBaseClusterSetup(); + } + + /** + * Get our hands into the cluster configuration before the hbase cluster + * starts up. + */ + private void preHBaseClusterSetup() { + // we'll use a 2MB global memcache for testing's sake. + conf.setInt("hbase.regionserver.globalMemcacheLimit", 2 * 1024 * 1024); + // low memcache mark will be 1MB + conf.setInt("hbase.regionserver.globalMemcacheLimitLowMark", + 1 * 1024 * 1024); + // make sure we don't do any optional flushes and confuse the tests. + conf.setInt("hbase.regionserver.optionalcacheflushinterval", 120000); + } + + /** + * Create the tables we'll use to test. + */ + private void postHBaseClusterSetup() throws IOException { + HTableDescriptor desc1 = createTableDescriptor("testTable1"); + HTableDescriptor desc2 = createTableDescriptor("testTable2"); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc1); + admin.createTable(desc2); + table1 = new HTable(conf, new Text("testTable1")); + table2 = new HTable(conf, new Text("testTable2")); + server = cluster.getRegionServer(0); + + // creating the tables above wrote edits to the META region (and ROOT), + // and those are probably still sitting in memcache. flush them out. + for (HRegion region : server.getOnlineRegions().values()) { + region.flushcache(); + } + // make sure we're starting at 0 so that it's easy to predict what the + // results of our tests should be. + assertEquals("Starting memcache size", 0, server.getGlobalMemcacheSize()); + } + + /** + * Make sure the region server thinks all the memcaches are as big as we were + * hoping they would be.
+ * @throws IOException + */ + public void testMemcacheSizeAccounting() throws IOException { + // put some data in each of the two tables + long dataSize = populate(table1, 500, 0) + populate(table2, 500, 0); + + // make sure the region server says it is using as much memory as we think + // it is. + assertEquals("Global memcache size", dataSize, + server.getGlobalMemcacheSize()); + } + + /** + * Test that a put gets blocked and a flush is forced as expected when we + * reach the memcache size limit. + * @throws IOException + */ + public void testBlocksAndForcesFlush() throws IOException { + // put some data in each of the two tables + long startingDataSize = populate(table1, 500, 0) + populate(table2, 500, 0); + + // at this point we have 1052000 bytes in memcache. now, we'll keep adding + // data to one of the tables until just before the global memcache limit, + // noting that the globalMemcacheSize keeps growing as expected. then, we'll + // do another put, causing it to go over the limit. when we look at the + // globalMemcacheSize now, it should be <= the low limit. + long dataNeeded = (2 * 1024 * 1024) - startingDataSize; + double numRows = (double)dataNeeded / (double)rowSize; + int preFlushRows = (int)Math.floor(numRows); + + long dataAdded = populate(table1, preFlushRows, 500); + assertEquals("Expected memcache size", dataAdded + startingDataSize, + server.getGlobalMemcacheSize()); + + populate(table1, 2, preFlushRows + 500); + assertTrue("Post-flush memcache size", server.getGlobalMemcacheSize() <= 1024 * 1024); + } + + private long populate(HTable table, int numRows, int startKey) throws IOException { + long total = 0; + Text column = new Text(COLFAMILY_NAME1); + for (int i = startKey; i < startKey + numRows; i++) { + Text key = new Text("row_" + String.format("%1$5d", i)); + total += key.getLength(); + total += column.getLength(); + total += 8; + total += ONE_KB.length; + long id = table.startUpdate(key); + table.put(id, column, ONE_KB); + table.commit(id); + } + return total; + } +} Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 651055) +++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -276,4 +276,13 @@ public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() { return this.hbaseCluster.getRegionServers(); } + + /** + * Grab a numbered region server of your choice. + * @param serverNumber index of the region server to fetch + * @return region server + */ + public HRegionServer getRegionServer(int serverNumber) { + return hbaseCluster.getRegionServer(serverNumber); + } } Index: src/test/org/apache/hadoop/hbase/MultiRegionTable.java =================================================================== --- src/test/org/apache/hadoop/hbase/MultiRegionTable.java (revision 651055) +++ src/test/org/apache/hadoop/hbase/MultiRegionTable.java (working copy) @@ -104,8 +104,8 @@ } // Flush the cache - cluster.getRegionThreads().get(0).getRegionServer().getCacheFlushListener(). - flushRequested(r); + cluster.getRegionThreads().get(0).getRegionServer().getFlushRequester(). + request(r); // Now, wait until split makes it into the meta table.
int oldCount = count; Index: src/java/org/apache/hadoop/hbase/CacheFlushListener.java =================================================================== --- src/java/org/apache/hadoop/hbase/CacheFlushListener.java (revision 651055) +++ src/java/org/apache/hadoop/hbase/CacheFlushListener.java (working copy) @@ -1,36 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase; - -/** - * Implementors of this interface want to be notified when an HRegion - * determines that a cache flush is needed. A CacheFlushListener (or null) - * must be passed to the HRegion constructor. - */ -public interface CacheFlushListener { - - /** - * Tell the listener the cache needs to be flushed. - * - * @param region the HRegion requesting the cache flush - */ - void flushRequested(HRegion region); -} Index: src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java =================================================================== --- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 651055) +++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy) @@ -114,6 +114,16 @@ } } + /** + * @param serverNumber + * @return region server + */ + public HRegionServer getRegionServer(int serverNumber) { + synchronized (regionThreads) { + return regionThreads.get(serverNumber).getRegionServer(); + } + } + /** runs region servers */ public static class RegionServerThread extends Thread { private final HRegionServer regionServer; Index: src/java/org/apache/hadoop/hbase/HRegion.java =================================================================== --- src/java/org/apache/hadoop/hbase/HRegion.java (revision 651055) +++ src/java/org/apache/hadoop/hbase/HRegion.java (working copy) @@ -321,15 +321,19 @@ volatile boolean writesEnabled = true; } - volatile WriteState writestate = new WriteState(); + private volatile WriteState writestate = new WriteState(); final int memcacheFlushSize; private volatile long lastFlushTime; - final CacheFlushListener flushListener; - final int blockingMemcacheSize; - protected final long threadWakeFrequency; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final Integer updateLock = new Integer(0); + final FlushRequester flushListener; + private final int blockingMemcacheSize; + final long threadWakeFrequency; + // Used to guard splits and closes + private final ReentrantReadWriteLock splitsAndClosesLock = + new ReentrantReadWriteLock(); + // Stop updates lock + private final ReentrantReadWriteLock updateLock = + new ReentrantReadWriteLock(); private final Integer splitLock = new Integer(0); private final long desiredMaxFileSize; private final long minSequenceId; @@ -360,7 +364,7 @@ * @throws 
IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener) + HRegionInfo regionInfo, Path initialFiles, FlushRequester listener) throws IOException { this(basedir, log, fs, conf, regionInfo, initialFiles, listener, null); } @@ -389,7 +393,7 @@ * @throws IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener, + HRegionInfo regionInfo, Path initialFiles, FlushRequester listener, final Progressable reporter) throws IOException { @@ -548,13 +552,12 @@ } } } - lock.writeLock().lock(); - LOG.debug("new updates and scanners for region " + regionName + - " disabled"); + splitsAndClosesLock.writeLock().lock(); + LOG.debug("new updates and scanners for region " + regionName + " disabled"); try { - // Wait for active scanners to finish. The write lock we hold will prevent - // new scanners from being created. + // Wait for active scanners to finish. The write lock we hold will + // prevent new scanners from being created. synchronized (activeScannerCount) { while (activeScannerCount.get() != 0) { LOG.debug("waiting for " + activeScannerCount.get() + @@ -602,7 +605,7 @@ LOG.info("closed " + this.regionInfo.getRegionName()); return result; } finally { - lock.writeLock().unlock(); + splitsAndClosesLock.writeLock().unlock(); } } } @@ -661,6 +664,11 @@ return this.lastFlushTime; } + /** @param t the lastFlushTime */ + void setLastFlushTime(long t) { + this.lastFlushTime = t; + } + ////////////////////////////////////////////////////////////////////////////// // HRegion maintenance. // @@ -920,8 +928,7 @@ } } long startTime = System.currentTimeMillis(); - LOG.info("starting compaction on region " + - this.regionInfo.getRegionName().toString()); + LOG.info("starting compaction on region " + getRegionName()); boolean status = true; doRegionCompactionPrep(); for (HStore store : stores.values()) { @@ -930,8 +937,7 @@ } } doRegionCompactionCleanup(); - LOG.info("compaction completed on region " + - this.regionInfo.getRegionName().toString() + ". Took " + + LOG.info("compaction completed on region " + getRegionName() + ". " + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); return status; @@ -981,11 +987,12 @@ } } try { - lock.readLock().lock(); // Prevent splits and closes + // Prevent splits and closes + splitsAndClosesLock.readLock().lock(); try { return internalFlushcache(); } finally { - lock.readLock().unlock(); + splitsAndClosesLock.readLock().unlock(); } } finally { synchronized (writestate) { @@ -1043,10 +1050,13 @@ // to do this for a moment. Its quick. The subsequent sequence id that // goes into the HLog after we've flushed all these snapshots also goes // into the info file that sits beside the flushed files. 
- synchronized (updateLock) { + updateLock.writeLock().lock(); + try { for (HStore s: stores.values()) { s.memcache.snapshot(); } + } finally { + updateLock.writeLock().unlock(); + } long sequenceId = log.startCacheFlush(); @@ -1223,7 +1233,7 @@ HStoreKey key = null; checkRow(row); - lock.readLock().lock(); + splitsAndClosesLock.readLock().lock(); try { // examine each column family for the preceeding or matching key for(Text colFamily : stores.keySet()){ @@ -1258,7 +1268,7 @@ return result; } finally { - lock.readLock().unlock(); + splitsAndClosesLock.readLock().unlock(); } } @@ -1304,7 +1314,7 @@ */ public HScannerInterface getScanner(Text[] cols, Text firstRow, long timestamp, RowFilterInterface filter) throws IOException { - lock.readLock().lock(); + splitsAndClosesLock.readLock().lock(); try { if (this.closed.get()) { throw new IOException("Region " + this.getRegionName().toString() + @@ -1326,7 +1336,7 @@ return new HScanner(cols, firstRow, timestamp, storelist.toArray(new HStore [storelist.size()]), filter); } finally { - lock.readLock().unlock(); + splitsAndClosesLock.readLock().unlock(); } } @@ -1581,7 +1591,9 @@ if (updatesByColumn == null || updatesByColumn.size() <= 0) { return; } - synchronized (updateLock) { // prevent a cache flush + boolean flush = false; + updateLock.readLock().lock(); // prevent a cache flush + try { this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc().getName(), updatesByColumn); @@ -1592,11 +1604,14 @@ size = this.memcacheSize.addAndGet(getEntrySize(key, val)); stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val); } - if (this.flushListener != null && size > this.memcacheFlushSize) { - // Request a cache flush - this.flushListener.flushRequested(this); - } + flush = this.flushListener != null && size > this.memcacheFlushSize; + } finally { + updateLock.readLock().unlock(); } + if (flush) { + // Request a cache flush + this.flushListener.request(this); + } } /* @@ -1665,10 +1680,10 @@ */ long obtainRowLock(Text row) throws IOException { checkRow(row); - lock.readLock().lock(); + splitsAndClosesLock.readLock().lock(); try { if (this.closed.get()) { - throw new IOException("Region " + this.getRegionName().toString() + + throw new NotServingRegionException("Region " + getRegionName() + " closed"); } synchronized (rowsToLocks) { @@ -1686,7 +1701,7 @@ return lid.longValue(); } } finally { - lock.readLock().unlock(); + splitsAndClosesLock.readLock().unlock(); } } @@ -1725,6 +1740,18 @@ /** {@inheritDoc} */ @Override + public boolean equals(Object o) { + return o instanceof HRegion && this.hashCode() == ((HRegion)o).hashCode(); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return this.regionInfo.getRegionName().hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { return regionInfo.getRegionName().toString(); } @@ -1922,6 +1949,7 @@ } } + /** {@inheritDoc} */ public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() { throw new UnsupportedOperationException("Unimplemented serverside. " + "next(HStoreKey, StortedMap(...)
is more efficient"); Index: src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 651055) +++ src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -26,6 +26,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; @@ -38,12 +39,11 @@ import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Delayed; -import java.util.concurrent.DelayQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -161,79 +161,23 @@ } - /** Queue entry passed to flusher, compactor and splitter threads */ - class QueueEntry implements Delayed { - private final HRegion region; - private long expirationTime; - - QueueEntry(HRegion region, long expirationTime) { - this.region = region; - this.expirationTime = expirationTime; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object o) { - QueueEntry other = (QueueEntry) o; - return this.hashCode() == other.hashCode(); - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - return this.region.getRegionInfo().hashCode(); - } - - /** {@inheritDoc} */ - public long getDelay(TimeUnit unit) { - return unit.convert(this.expirationTime - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - } - - /** {@inheritDoc} */ - public int compareTo(Delayed o) { - long delta = this.getDelay(TimeUnit.MILLISECONDS) - - o.getDelay(TimeUnit.MILLISECONDS); - - int value = 0; - if (delta > 0) { - value = 1; - - } else if (delta < 0) { - value = -1; - } - return value; - } - - /** @return the region */ - public HRegion getRegion() { - return region; - } - - /** @param expirationTime the expirationTime to set */ - public void setExpirationTime(long expirationTime) { - this.expirationTime = expirationTime; - } - } - // Compactions final CompactSplitThread compactSplitThread; - // Needed during shutdown so we send an interrupt after completion of a - // compaction, not in the midst. 
- final Integer compactSplitLock = new Integer(0); - /** Compact region on request and then run split if appropriate - */ + /** Compact region on request and then run split if appropriate */ private class CompactSplitThread extends Thread implements RegionUnavailableListener { private HTable root = null; private HTable meta = null; private long startTime; private final long frequency; + private final ReentrantLock workingLock = new ReentrantLock(); - private final BlockingQueue<QueueEntry> compactionQueue = - new LinkedBlockingQueue<QueueEntry>(); + private final BlockingQueue<HRegion> compactionQueue = + new LinkedBlockingQueue<HRegion>(); + private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>(); + /** constructor */ public CompactSplitThread() { super(); @@ -246,21 +190,28 @@ @Override public void run() { while (!stopRequested.get()) { - QueueEntry e = null; + HRegion r = null; try { - e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); - if (e == null) { - continue; + r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); + if (r != null) { + synchronized (regionsInQueue) { + regionsInQueue.remove(r); + } + workingLock.lock(); + try { + // Don't interrupt us while we are working + if (r.compactStores()) { + split(r); + } + } finally { + workingLock.unlock(); + } } - synchronized (compactSplitLock) { // Don't interrupt us while working - e.getRegion().compactIfNeeded(); - split(e.getRegion()); - } } catch (InterruptedException ex) { continue; } catch (IOException ex) { LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + (r != null ? (" for region " + r.getRegionName()) : ""), RemoteExceptionHandler.checkIOException(ex)); if (!checkFileSystem()) { break; @@ -268,27 +219,30 @@ } catch (Exception ex) { LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - ex); + (r != null ? (" for region " + r.getRegionName()) : ""), ex); if (!checkFileSystem()) { break; } } } + regionsInQueue.clear(); + compactionQueue.clear(); LOG.info(getName() + " exiting"); } /** - * @param e QueueEntry for region to be compacted + * @param r region for which a compaction is requested */ - public void compactionRequested(QueueEntry e) { - compactionQueue.add(e); + public void compactionRequested(HRegion r) { + LOG.debug("Compaction requested for region: " + r.getRegionName()); + synchronized (regionsInQueue) { + if (!regionsInQueue.contains(r)) { + compactionQueue.add(r); + regionsInQueue.add(r); + } + } } - void compactionRequested(final HRegion r) { - compactionRequested(new QueueEntry(r, System.currentTimeMillis())); - } - private void split(final HRegion region) throws IOException { final HRegionInfo oldRegionInfo = region.getRegionInfo(); final HRegion[] newRegions = region.splitRegion(this); @@ -380,20 +334,32 @@ lock.writeLock().unlock(); } } + + /** + * Only interrupt once it's done with a run through the work loop. + */ + public void interruptIfNecessary() { + if (workingLock.tryLock()) { + this.interrupt(); + } + } } // Cache flushing final Flusher cacheFlusher; - // Needed during shutdown so we send an interrupt after completion of a - // flush, not in the midst.
- final Integer cacheFlusherLock = new Integer(0); - - /** Flush cache upon request */ - class Flusher extends Thread implements CacheFlushListener { - private final DelayQueue<QueueEntry> flushQueue = - new DelayQueue<QueueEntry>(); + /** + * Thread that flushes cache on request + * @see FlushRequester + */ + private class Flusher extends Thread implements FlushRequester { + private final BlockingQueue<HRegion> flushQueue = + new LinkedBlockingQueue<HRegion>(); + private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>(); + private final ReentrantLock workingLock = new ReentrantLock(); private final long optionalFlushPeriod; + private final long globalMemcacheLimit; + private final long globalMemcacheLimitLowMark; /** constructor */ public Flusher() { @@ -401,56 +367,86 @@ this.optionalFlushPeriod = conf.getLong( "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); + // default memcache limit of 512MB + globalMemcacheLimit = + conf.getLong("hbase.regionserver.globalMemcacheLimit", 512 * 1024 * 1024); + // default memcache low mark limit of 256MB, which is half the upper limit + globalMemcacheLimitLowMark = + conf.getLong("hbase.regionserver.globalMemcacheLimitLowMark", + globalMemcacheLimit / 2); } /** {@inheritDoc} */ @Override public void run() { while (!stopRequested.get()) { - QueueEntry e = null; + HRegion r = null; try { - e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if (e == null) { + enqueueOptionalFlushRegions(); + r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + if (r == null) { continue; } - synchronized(cacheFlusherLock) { // Don't interrupt while we're working - if (e.getRegion().flushcache()) { - compactSplitThread.compactionRequested(e); - } - - e.setExpirationTime(System.currentTimeMillis() + - optionalFlushPeriod); - flushQueue.add(e); + if (!flushRegion(r, false)) { + break; } - - // Now insure that all the active regions are in the queue - - Set<HRegion> regions = getRegionsToCheck(); - for (HRegion r: regions) { - e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); - synchronized (flushQueue) { - if (!flushQueue.contains(e)) { - flushQueue.add(e); - } - } - } - - // Now make sure that the queue only contains active regions - - synchronized (flushQueue) { - for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) { - e = i.next(); - if (!regions.contains(e.getRegion())) { - i.remove(); - } - } - } } catch (InterruptedException ex) { continue; - } catch (ConcurrentModificationException ex) { continue; - + } catch (Exception ex) { + LOG.error("Cache flush failed" + + (r != null ? (" for region " + r.getRegionName()) : ""), ex); + if (!checkFileSystem()) { + break; + } + } + } + regionsInQueue.clear(); + flushQueue.clear(); + LOG.info(getName() + " exiting"); + } + + /** + * Flush a region right away, while respecting concurrency with the async + * flushing that is always going on. + * + * @param region the region to be flushed + * @param removeFromQueue true if the region should also be removed from the + * flush queue: false when called from the main run loop, true when called + * from flushSomeRegions to relieve memory pressure on the region server. + * + *

In the main run loop, regions have already been removed from the flush + * queue, and if this method is called for the relief of memory pressure, + * this may not necessarily be true. We want to avoid trying to remove the + * region from the queue because, if it has already been removed, it requires a + * sequential scan of the queue to determine that it is not in the queue. + * + *

If called from flushSomeRegions, the region may be in the queue but + * it may have been determined that the region had a significant amount of + * memory in use and needed to be flushed to relieve memory pressure. In this + * case, its flush may preempt the pending request in the queue, and if so, + * it needs to be removed from the queue to avoid flushing the region multiple + * times. + * + * @return true if the region was successfully flushed, false otherwise. If + * false, there will be accompanying log messages explaining why the region + * was not flushed. + */ + private boolean flushRegion(HRegion region, boolean removeFromQueue) { + synchronized (regionsInQueue) { + // take the region out of the set. If removeFromQueue is true, remove it + // from the queue too if it is there. This didn't use to be a constraint, + // but now that HBASE-512 is in play, we need to try and limit + // double-flushing of regions. + if (regionsInQueue.remove(region) && removeFromQueue) { + flushQueue.remove(region); + } + workingLock.lock(); + try { + if (region.flushcache()) { + compactSplitThread.compactionRequested(region); + } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical // section, we get a DroppedSnapshotException and a replay of hlog // is required. Currently the only way to do this is a restart of // the server. LOG.fatal("Replay of hlog required. Forcing server restart", ex); if (!checkFileSystem()) { - break; + return false; } - HRegionServer.this.stop(); - + HRegionServer.this.stop(); + return false; } catch (IOException ex) { LOG.error("Cache flush failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + (region != null ? (" for region " + region.getRegionName()) : ""), RemoteExceptionHandler.checkIOException(ex)); if (!checkFileSystem()) { - break; + return false; } - - } catch (Exception ex) { - LOG.error("Cache flush failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - ex); - if (!checkFileSystem()) { - break; - } + } finally { + workingLock.unlock(); } } - flushQueue.clear(); - LOG.info(getName() + " exiting"); + return true; } + /** + * Find the regions that should be optionally flushed and put them on the + * flush queue. + */ + private void enqueueOptionalFlushRegions() { + long now = System.currentTimeMillis(); + // Queue up regions for optional flush if they need it + for (HRegion region: getRegionsToCheck()) { + optionallyAddRegion(region, now); + } + } + + /* + * Add region if not already added and if optional flush period has been + * exceeded. + * @param r Region to add. + * @param now The 'now' to use. Set last flush time to this value. + */ + private void optionallyAddRegion(final HRegion r, final long now) { + synchronized (regionsInQueue) { + if (!regionsInQueue.contains(r) && + (System.currentTimeMillis() - optionalFlushPeriod) > + r.getLastFlushTime()) { + addRegion(r, now); + } + } + } + + /* + * Add region if not already added. + * @param r Region to add. + * @param now The 'now' to use. Set last flush time to this value.
+ */ + private void addRegion(final HRegion r, final long now) { + synchronized (regionsInQueue) { + if (!regionsInQueue.contains(r)) { + regionsInQueue.add(r); + flushQueue.add(r); + r.setLastFlushTime(now); + } + } + } + /** {@inheritDoc} */ - public void flushRequested(HRegion region) { - if (region == null) { - return; + public void request(HRegion r) { + addRegion(r, System.currentTimeMillis()); + } + + /** + * Check if the regionserver's memcache memory usage is greater than the + * limit. If so, flush regions with the biggest memcaches until we're down + * to the lower limit. This method blocks callers until we're down to a safe + * amount of memcache consumption. + */ + public synchronized void reclaimMemcacheMemory() { + long globalMemory = getGlobalMemcacheSize(); + if (globalMemory >= globalMemcacheLimit) { + LOG.info("Global cache memory in use " + globalMemory + " >= " + + globalMemcacheLimit + " configured maximum." + + " Forcing cache flushes to relieve memory pressure."); + flushSomeRegions(); } - QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); - synchronized (flushQueue) { - if (flushQueue.contains(e)) { - flushQueue.remove(e); + } + + private void flushSomeRegions() { + // we'll sort the regions in reverse + SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>( + new Comparator<Long>() { + public int compare(Long a, Long b) { + return -1 * a.compareTo(b); + } + } + ); + + // copy over all the regions + for (HRegion region : getRegionsToCheck()) { + sortedRegions.put(region.memcacheSize.get(), region); + } + + // keep flushing until we hit the low water mark + while (getGlobalMemcacheSize() >= globalMemcacheLimitLowMark && !sortedRegions.isEmpty()) { + // flush the region with the biggest memcache + HRegion biggestMemcacheRegion = + sortedRegions.remove(sortedRegions.firstKey()); + LOG.info("Force flush of region " + biggestMemcacheRegion.getRegionName()); + if (!flushRegion(biggestMemcacheRegion, true)) { + // Something bad happened - give up. + break; } - flushQueue.add(e); } } + + /** + * Only interrupt once it's done with a run through the work loop. + */ + void interruptIfNecessary() { + if (workingLock.tryLock()) { + this.interrupt(); + } + } } // HLog and HLog roller. log is protected rather than private to avoid @@ -505,7 +581,7 @@ final Integer logRollerLock = new Integer(0); /** Runs periodically to determine if the HLog should be rolled */ - class LogRoller extends Thread implements LogRollListener { + private class LogRoller extends Thread implements LogRollListener { private final Integer rollLock = new Integer(0); private volatile boolean rollLog; @@ -784,12 +860,9 @@ // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive?
If OOME could have exited already - synchronized(cacheFlusherLock) { - this.cacheFlusher.interrupt(); - } - synchronized (compactSplitLock) { - this.compactSplitThread.interrupt(); - } + this.cacheFlusher.interruptIfNecessary(); + this.compactSplitThread.interruptIfNecessary(); + synchronized (logRollerLock) { this.logRoller.interrupt(); } @@ -1467,6 +1540,7 @@ this.requestCount.incrementAndGet(); HRegion region = getRegion(regionName); try { + cacheFlusher.reclaimMemcacheMemory(); region.batchUpdate(timestamp, b); } catch (IOException e) { checkFileSystem(); @@ -1616,8 +1690,8 @@ return this.requestCount; } - /** @return reference to CacheFlushListener */ - public CacheFlushListener getCacheFlushListener() { + /** @return reference to FlushRequester */ + public FlushRequester getFlushRequester() { return this.cacheFlusher; } @@ -1706,12 +1780,8 @@ */ protected Set<HRegion> getRegionsToCheck() { HashSet<HRegion> regionsToCheck = new HashSet<HRegion>(); - //TODO: is this locking necessary? - lock.readLock().lock(); - try { + synchronized (this.onlineRegions) { regionsToCheck.addAll(this.onlineRegions.values()); - } finally { - lock.readLock().unlock(); - } } // Purge closed regions. for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) { @@ -1740,6 +1810,18 @@ return this.outboundMsgs; } + /** + * Return the total size of all memcaches in every region. + * @return memcache size in bytes + */ + long getGlobalMemcacheSize() { + long total = 0; + for (HRegion region : getRegionsToCheck()) { + total += region.memcacheSize.get(); + } + return total; + } + // // Main program and support routines // Index: src/java/org/apache/hadoop/hbase/FlushRequester.java =================================================================== --- src/java/org/apache/hadoop/hbase/FlushRequester.java (revision 0) +++ src/java/org/apache/hadoop/hbase/FlushRequester.java (revision 0) @@ -0,0 +1,36 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +/** + * Implementors of this interface want to be notified when an HRegion + * determines that a cache flush is needed. A FlushRequester (or null) + * must be passed to the HRegion constructor. + */ +public interface FlushRequester { + + /** + * Tell the requester the cache needs to be flushed. + * + * @param region the HRegion requesting the cache flush + */ + void request(HRegion region); +}
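
Note for reviewers, not part of the patch: the heart of the new behavior is the flush-selection loop in Flusher.flushSomeRegions() above. Below is a minimal, self-contained sketch of that policy using only the JDK, with hypothetical region names and sizes, so the reverse ordering and low-water-mark loop can be checked in isolation.

import java.util.Comparator;
import java.util.SortedMap;
import java.util.TreeMap;

public class FlushPolicySketch {
  public static void main(String[] args) {
    // order candidate regions biggest-memcache-first, mirroring the
    // reverse comparator used in flushSomeRegions()
    SortedMap<Long, String> regionsBySize = new TreeMap<Long, String>(
        new Comparator<Long>() {
          public int compare(Long a, Long b) {
            return -1 * a.compareTo(b); // descending by size
          }
        });
    // hypothetical memcache sizes in bytes
    regionsBySize.put(900L * 1024, "regionA");
    regionsBySize.put(600L * 1024, "regionB");
    regionsBySize.put(400L * 1024, "regionC");

    long globalSize = (900L + 600L + 400L) * 1024; // sum of the above
    final long lowMark = 1024L * 1024;             // stop once below this

    // keep flushing the region with the biggest memcache until the global
    // total drops under the low-water mark: the same loop shape as
    // flushSomeRegions()
    while (globalSize >= lowMark && !regionsBySize.isEmpty()) {
      Long biggest = regionsBySize.firstKey();
      String region = regionsBySize.remove(biggest);
      globalSize -= biggest; // a successful flush empties that memcache
      System.out.println("flushed " + region + "; global now " + globalSize);
    }
  }
}

One design point the sketch makes easy to see: because the sorted map is keyed by memcache size, two regions with identical sizes collide and only one survives the put(), so a region could be silently skipped in a flush pass. Keying by region with a size-aware comparator would avoid that.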