From e87f5a0d7930e2c54f35abb29b3f50d643dbc6b7 Mon Sep 17 00:00:00 2001 From: fan Date: Fri, 14 Feb 2014 00:22:01 +0000 Subject: =?UTF-8?q?Fix=20bugs=20that=20causes=20flushes=20being=20skipped=20?= =?UTF-8?q?during=20region=20close=0AIt's=20a=20combination=20of=20D1158429=20?= =?UTF-8?q?and=20D1163424?= git-svn-id: svn+ssh://tubbs/svnhive/hadoop/branches/hbase-rH32118-20130111-pluto@40273 e7acf4d4-3532-417f-9e73-7a9ae25a1f51 --- .../apache/hadoop/hbase/regionserver/HRegion.java | 55 +++++++++++++--------- .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../apache/hadoop/hbase/regionserver/MemStore.java | 18 +++++++ .../apache/hadoop/hbase/regionserver/Store.java | 8 ++++ .../hbase/regionserver/TestHRegionClose.java | 26 ++++++++++ 5 files changed, 87 insertions(+), 22 deletions(-) diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c08e2e6..5ec08fe 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -809,12 +809,10 @@ public class HRegion implements HeapSize { status.setStatus("Waiting for split lock"); synchronized (splitLock) { status.setStatus("Disabling compacts and flushes for region"); - boolean wasFlushing = false; synchronized (writestate) { // Disable compacting and flushing by background threads for this // region. writestate.writesEnabled = false; - wasFlushing = writestate.flushing; LOG.debug("Closing " + this + ": disabling compactions & flushes"); while (writestate.compacting > 0 || writestate.flushing) { LOG.debug("waiting for " + writestate.compacting + " compactions" + @@ -827,13 +825,22 @@ public class HRegion implements HeapSize { } } } - // If we were not just flushing, is it worth doing a preflush...one - // that will clear out of the bulk of the memstore before we put up - // the close flag? - if (!abort && !wasFlushing && worthPreFlushing()) { + + // First flush clears content in either snapshots or current memstores + if (!abort) { status.setStatus("Pre-flushing region before close"); LOG.info("Running close preflush of " + this.getRegionNameAsString()); - internalFlushcache(status); + try { + internalFlushcache(status); + } catch (IOException ioe) { + // Failed to flush the region but probably it is still able to serve request, + // so re-enable writes to it. + status.setStatus("Failed to flush the region, putting it online again"); + synchronized (writestate) { + writestate.writesEnabled = true; + } + throw ioe; + } } newScannerLock.writeLock().lock(); this.closing.set(true); @@ -849,9 +856,18 @@ public class HRegion implements HeapSize { waitOnRowLocks(); LOG.debug("No more row locks outstanding on region " + this); - // Don't flush the cache if we are aborting + // Second flush to ensure no unflushed data in memory. if (!abort) { - internalFlushcache(status); + try { + internalFlushcache(status); + } catch (IOException ioe) { + status.setStatus("Failed to flush the region, putting it online again"); + synchronized (writestate) { + writestate.writesEnabled = true; + } + this.closing.set(false); + throw ioe; + } } List result = new ArrayList(); @@ -866,6 +882,7 @@ public class HRegion implements HeapSize { // close each store in parallel for (final Store store : stores.values()) { + assert store.getFlushableMemstoreSize() == 0; completionService .submit(new Callable>() { public ImmutableList call() throws IOException { @@ -889,6 +906,9 @@ public class HRegion implements HeapSize { } } this.closed.set(true); + if (memstoreSize.get() != 0) { + LOG.error("Memstore size should be 0 after clean region close, but is " + memstoreSize.get()); + } status.markComplete("Closed"); LOG.info("Closed " + this); return result; @@ -938,14 +958,6 @@ public class HRegion implements HeapSize { return openAndCloseThreadPool; } - /** - * @return True if its worth doing a flush before we put up the close flag. - */ - private boolean worthPreFlushing() { - return this.memstoreSize.get() > - this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5); - } - ////////////////////////////////////////////////////////////////////////////// // HRegion accessors ////////////////////////////////////////////////////////////////////////////// @@ -1436,7 +1448,7 @@ public class HRegion implements HeapSize { this.updatesLock.writeLock().lock(); t0 = EnvironmentEdgeManager.currentTimeMillis(); status.setStatus("Preparing to flush by snapshotting stores"); - final long currentMemStoreSize = this.memstoreSize.get(); + long totalMemstoreSizeOfFlushableStores = 0; //copy the array of per column family memstore values List storeFlushers = new ArrayList( stores.size()); @@ -1445,6 +1457,7 @@ public class HRegion implements HeapSize { wal.getStartCacheFlushSeqNum(this.regionInfo.getRegionName()); completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); for (Store s : stores.values()) { + totalMemstoreSizeOfFlushableStores += s.getFlushableMemstoreSize(); storeFlushers.add(s.getStoreFlusher(completeSequenceId)); } @@ -1521,7 +1534,7 @@ public class HRegion implements HeapSize { storeFlushers.clear(); // Set down the memstore size by amount of flush. - this.incMemoryUsage(-currentMemStoreSize); + this.incMemoryUsage(-totalMemstoreSizeOfFlushableStores); } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. @@ -1567,13 +1580,13 @@ public class HRegion implements HeapSize { long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; if (LOG.isDebugEnabled()) { LOG.info("Finished memstore flush of ~" + - StringUtils.humanReadableInt(currentMemStoreSize) + " for region " + + StringUtils.humanReadableInt(totalMemstoreSizeOfFlushableStores) + " for region " + this + " in " + time + "ms, sequence id=" + sequenceId + ", compaction requested=" + compactionRequested + ((wal == null)? "; wal=null": "")); status.setStatus("Finished memstore flush"); } - this.recentFlushes.add(new Pair(time/1000,currentMemStoreSize)); + this.recentFlushes.add(new Pair(time/1000, totalMemstoreSizeOfFlushableStores)); return compactionRequested; } diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index fd51d6c..096591b 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2134,7 +2134,7 @@ public class HRegionServer implements HRegionInterface, "interrupted.", ex); } } else { - LOG.error("unable to process message" + + LOG.error("FAILED TO PROCESS MESSAGE FROM MASTER" + (e != null ? (": " + e.msg.toString()) : ""), ex); checkFileSystem(); } diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index ec2b4c1..07c2689 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -77,6 +77,7 @@ public class MemStore implements HeapSize { // Used to track own heapSize final AtomicLong size; + volatile private long snapshotSize; TimeRangeTracker timeRangeTracker; TimeRangeTracker snapshotTimeRangeTracker; @@ -102,6 +103,7 @@ public class MemStore implements HeapSize { timeRangeTracker = new TimeRangeTracker(); snapshotTimeRangeTracker = new TimeRangeTracker(); this.size = new AtomicLong(DEEP_OVERHEAD); + this.snapshotSize = 0; } void dump() { @@ -128,6 +130,7 @@ public class MemStore implements HeapSize { "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { if (!this.kvset.isEmpty()) { + this.snapshotSize = keySize(); this.snapshot = this.kvset; this.kvset = new KeyValueSkipListSet(this.comparator); this.snapshotTimeRangeTracker = this.timeRangeTracker; @@ -173,6 +176,7 @@ public class MemStore implements HeapSize { this.snapshot = new KeyValueSkipListSet(this.comparator); this.snapshotTimeRangeTracker = new TimeRangeTracker(); } + this.snapshotSize = 0; } finally { this.lock.writeLock().unlock(); } @@ -729,6 +733,20 @@ public class MemStore implements HeapSize { return heapSize() - DEEP_OVERHEAD; } + public long getSnapshotSize() { + return snapshotSize; + } + + /** + * Flush will first clear out the data in snapshot if any. If snapshot is + * empty, current keyvalue set will be flushed. + * + * @return size of data that is going to be flushed + */ + public long getFlushableSize() { + return snapshotSize > 0 ? snapshotSize : keySize(); + } + /** * Code to help figure if our approximation of object heap sizes is close * enough. See hbase-900. Fills memstores then waits so user can heap diff --git a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 35e6b04..388d109 100644 --- a/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/VENDOR.hbase/hbase-trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -1765,6 +1765,14 @@ public class Store extends SchemaConfigured implements HeapSize { return this.memstore.heapSize(); } + public long getSnapshotSize() { + return this.memstore.getSnapshotSize(); + } + + public long getFlushableMemstoreSize() { + return this.memstore.getFlushableSize(); + } + /** * @return The priority that this store should have in the compaction queue */ diff --git a/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java b/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java index d1dc4b5..eb4b4b2 100644 --- a/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java +++ b/VENDOR.hbase/hbase-trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionClose.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; import org.apache.hadoop.hbase.executor.RegionTransitionEventData; import org.apache.hadoop.hbase.util.Bytes; @@ -96,4 +97,29 @@ public class TestHRegionClose { public void mainTest() throws Exception { tryCloseRegion(); } + + @Test + public void testMemstoreCleanup() throws Exception { + HRegion region = server.getOnlineRegionsAsArray()[0]; + + Store store = region.getStore(FAMILIES[0]); + + byte[] row = region.getStartKey(); + byte[] value = Bytes.toBytes("testMemstoreCleanup"); + Put put = new Put(row); + put.add(FAMILIES[0], null, Bytes.toBytes("testMemstoreCleanup")); + + // First put something in current memstore, which will be in snapshot after flusher.prepare() + region.put(put); + + StoreFlusher flusher = store.getStoreFlusher(12345); + flusher.prepare(); + + // Second put something in current memstore + put.add(FAMILIES[0], Bytes.toBytes("abc"), value); + region.put(put); + + region.close(); + assertEquals(0, region.getMemstoreSize().get()); + } } -- 1.7.12.4 (Apple Git-37)