From 62fd55983dc9d20f55a388cd3c91f3c0306389cb Mon Sep 17 00:00:00 2001 From: sunyerui Date: Tue, 18 Aug 2015 20:00:01 +0800 Subject: [PATCH] HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down --- .../apache/hadoop/hbase/regionserver/HRegion.java | 19 ++++++-- .../apache/hadoop/hbase/regionserver/HStore.java | 12 ++++- .../hbase/regionserver/StoreFlushContext.java | 8 ++++ .../hadoop/hbase/regionserver/TestHRegion.java | 55 +++++++++++++++++++--- 4 files changed, 81 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index df7f977..a69b6a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1772,7 +1772,9 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.writeLock().lock(); long totalFlushableSize = 0; status.setStatus("Preparing to flush by snapshotting stores"); - List storeFlushCtxs = new ArrayList(stores.size()); + TreeMap storeFlushCtxs + = new TreeMap(Bytes.BYTES_COMPARATOR); + TreeMap storeFlushableSize = new TreeMap(Bytes.BYTES_COMPARATOR); long flushSeqId = -1L; try { // Record the mvcc for all transactions in progress. 
@@ -1794,11 +1796,13 @@ public class HRegion implements HeapSize { // , Writable{ for (Store s : stores.values()) { totalFlushableSize += s.getFlushableSize(); - storeFlushCtxs.add(s.createFlushContext(flushSeqId)); + byte[] storeName = s.getFamily().getName(); + storeFlushCtxs.put(storeName, s.createFlushContext(flushSeqId)); + storeFlushableSize.put(storeName, s.getFlushableSize()); } // prepare flush (take a snapshot) - for (StoreFlushContext flush : storeFlushCtxs) { + for (StoreFlushContext flush : storeFlushCtxs.values()) { flush.prepare(); } } finally { @@ -1837,17 +1841,22 @@ public class HRegion implements HeapSize { // , Writable{ // just-made new flush store file. The new flushed file is still in the // tmp directory. - for (StoreFlushContext flush : storeFlushCtxs) { + for (StoreFlushContext flush : storeFlushCtxs.values()) { flush.flushCache(status); } // Switch snapshot (in memstore) -> new hfile (thus causing // all the store scanners to reset/reseek). - for (StoreFlushContext flush : storeFlushCtxs) { + for (Map.Entry flushEntry : storeFlushCtxs.entrySet()) { + byte[] storeName = flushEntry.getKey(); + StoreFlushContext flush = flushEntry.getValue(); boolean needsCompaction = flush.commit(status); if (needsCompaction) { compactionRequested = true; } + if (flush.getCommittedFiles() == null || flush.getCommittedFiles().isEmpty()) { + totalFlushableSize -= storeFlushableSize.get(storeName); + } } storeFlushCtxs.clear(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index d74ef6c..f7d7749 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2030,6 +2030,7 @@ public class HStore implements Store { private long cacheFlushSeqNum; private SortedSet snapshot; private List tempFiles; + private List committedFiles; 
private TimeRangeTracker snapshotTimeRangeTracker; private long flushedCount; private final AtomicLong flushedSize = new AtomicLong(); @@ -2048,6 +2049,7 @@ public class HStore implements Store { this.snapshot = memstore.getSnapshot(); this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker(); this.flushedCount = this.snapshot.size(); + this.committedFiles = new ArrayList(1); } @Override @@ -2082,10 +2084,11 @@ public class HStore implements Store { } } - if (HStore.this.getCoprocessorHost() != null) { - for (StoreFile sf : storeFiles) { + for (StoreFile sf : storeFiles) { + if (HStore.this.getCoprocessorHost() != null) { HStore.this.getCoprocessorHost().postFlush(HStore.this, sf); } + committedFiles.add(sf.getPath()); } HStore.this.flushedCellsCount += flushedCount; @@ -2094,6 +2097,11 @@ public class HStore implements Store { // Add new file to store files. Clear snapshot too while we have the Store write lock. return HStore.this.updateStorefiles(storeFiles, snapshot); } + + @Override + public List getCommittedFiles() { + return this.committedFiles; + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java index 9b7b135..542e387 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredTask; @@ -61,4 +63,10 @@ interface StoreFlushContext { * @throws IOException */ boolean commit(MonitoredTask status) throws IOException; + + /** + * Returns the newly committed files from the flush. 
Call only after commit has completed + * @return a list of Paths for new files + */ + List getCommittedFiles(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 51a2252..3ace9a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -41,6 +41,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.io.InterruptedIOException; @@ -294,7 +295,49 @@ public class TestHRegion { assertTrue("flushable size should be zero, but it is " + sz, sz == 0); HRegion.closeHRegion(region); } - + + /** + * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down + */ + @Test + public void testMemstoreSizeWithFlushCanceling() throws IOException { + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling"); + FaultyHLog hLog = new FaultyHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF); + HRegion region = initHRegion(tableName, null, null, name.getMethodName(), + CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES); + Store store = region.getStore(COLUMN_FAMILY_BYTES); + assertEquals(0, region.getMemstoreSize().get()); + + // Put some value and make sure flush can be completed normally + byte [] value = Bytes.toBytes(name.getMethodName()); + Put put = new Put(value); + put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); + region.put(put); + long onePutSize = region.getMemstoreSize().get(); + assertTrue(onePutSize > 0); + region.flushcache(); + assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize().get()); + 
assertEquals("flushable size should be zero", 0, store.getFlushableSize()); + + // save normalCPHost and replace it with mockedCPHost, which will cancel flush requests + RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); + RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); + when(mockedCPHost.preFlush(any(HStore.class), any(InternalScanner.class))).thenReturn(null); + region.setCoprocessorHost(mockedCPHost); + region.put(put); + region.flushcache(); + assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemstoreSize().get()); + assertEquals("flushable size should NOT be zero", onePutSize, store.getFlushableSize()); + + // restore normalCPHost and flush again; this time the snapshot will be flushed + region.setCoprocessorHost(normalCPHost); + region.flushcache(); + assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize().get()); + assertEquals("flushable size should be zero", 0, store.getFlushableSize()); + HRegion.closeHRegion(region); + } + /** * Test we do not lose data if we fail a flush and then close. * Part of HBase-10466. Tests the following from the issue description: @@ -3155,10 +3198,10 @@ public class TestHRegion { // Add a store that has references. 
HStore storeMock = Mockito.mock(HStore.class); - Mockito.when(storeMock.hasReferences()).thenReturn(true); - Mockito.when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf")); - Mockito.when(storeMock.close()).thenReturn(ImmutableList.of()); - Mockito.when(storeMock.getColumnFamilyName()).thenReturn("cf"); + when(storeMock.hasReferences()).thenReturn(true); + when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf")); + when(storeMock.close()).thenReturn(ImmutableList.of()); + when(storeMock.getColumnFamilyName()).thenReturn("cf"); region.stores.put(Bytes.toBytes(storeMock.getColumnFamilyName()), storeMock); assertTrue(region.hasReferences()); @@ -3880,7 +3923,7 @@ public class TestHRegion { HRegionInfo info = null; try { FileSystem fs = Mockito.mock(FileSystem.class); - Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException()); + when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException()); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("cf")); info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY, -- 2.3.2 (Apple Git-55)