From 1c3b8b65d2f37bb0375e415cc25b7755cf397b46 Mon Sep 17 00:00:00 2001 From: sunyerui Date: Tue, 18 Aug 2015 18:14:25 +0800 Subject: [PATCH] HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down --- .../apache/hadoop/hbase/regionserver/HRegion.java | 25 +++++++--- .../hadoop/hbase/regionserver/TestHRegion.java | 56 +++++++++++++++++----- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 709e890..e81a405 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -520,6 +520,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final FlushResult result; // indicating a failure result from prepare final TreeMap storeFlushCtxs; final TreeMap> committedFiles; + final TreeMap storeFlushableSize; final long startTime; final long flushOpSeqId; final long flushedSeqId; @@ -527,26 +528,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** Constructs an early exit case */ PrepareFlushResult(FlushResult result, long flushSeqId) { - this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0); + this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0); } /** Constructs a successful prepare flush result */ PrepareFlushResult( TreeMap storeFlushCtxs, - TreeMap> committedFiles, long startTime, long flushSeqId, + TreeMap> committedFiles, + TreeMap storeFlushableSize, long startTime, long flushSeqId, long flushedSeqId, long totalFlushableSize) { - this(null, storeFlushCtxs, committedFiles, startTime, + this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushSeqId, flushedSeqId, totalFlushableSize); } private PrepareFlushResult( FlushResult result, TreeMap storeFlushCtxs, - TreeMap> committedFiles, long startTime, long flushSeqId, + TreeMap> committedFiles, + TreeMap storeFlushableSize, long startTime, long flushSeqId, long flushedSeqId, long totalFlushableSize) { this.result = result; this.storeFlushCtxs = storeFlushCtxs; this.committedFiles = committedFiles; + this.storeFlushableSize = storeFlushableSize; this.startTime = startTime; this.flushOpSeqId = flushSeqId; this.flushedSeqId = flushedSeqId; @@ -2158,6 +2162,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi = new TreeMap(Bytes.BYTES_COMPARATOR); TreeMap> committedFiles = new TreeMap>( Bytes.BYTES_COMPARATOR); + TreeMap storeFlushableSize + = new TreeMap(Bytes.BYTES_COMPARATOR); // The sequence id of this flush operation which is used to log FlushMarker and pass to // createFlushContext to use as the store file's sequence id. It can be in advance of edits // still in the memstore, edits that are in other column families yet to be flushed. @@ -2196,6 +2202,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi totalFlushableSizeOfFlushableStores += s.getFlushableSize(); storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId)); committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL + storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize()); } // write the snapshot start to WAL @@ -2262,7 +2269,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi mvcc.advanceMemstore(w); } } - return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId, + return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores); } @@ -2337,7 +2344,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (needsCompaction) { compactionRequested = true; } - committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles()); + byte[] storeName = it.next().getFamily().getName(); + List storeCommittedFiles = flush.getCommittedFiles(); + committedFiles.put(storeName, storeCommittedFiles); + // Flush committed no files, indicating flush is empty or flush was canceled + if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) { + totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName); + } } storeFlushCtxs.clear(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index cb2203a..b648ba2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -33,10 +33,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -132,12 +129,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.regionserver.wal.*; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -321,7 +313,7 @@ public class TestHRegion { Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); HRegion region = initHRegion(tableName, null, null, name.getMethodName(), - CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); + CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); Store store = region.getStore(COLUMN_FAMILY_BYTES); // Get some random bytes. @@ -346,6 +338,48 @@ public class TestHRegion { } /** + * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down + */ + @Test + public void testMemstoreSizeWithFlushCanceling() throws IOException { + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling"); + FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF); + HRegion region = initHRegion(tableName, null, null, name.getMethodName(), + CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES); + Store store = region.getStore(COLUMN_FAMILY_BYTES); + assertEquals(0, region.getMemstoreSize()); + + // Put some value and make sure flush could be completed normally + byte [] value = Bytes.toBytes(name.getMethodName()); + Put put = new Put(value); + put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); + region.put(put); + long onePutSize = region.getMemstoreSize(); + assertTrue(onePutSize > 0); + region.flush(true); + assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize()); + assertEquals("flushable size should be zero", 0, store.getFlushableSize()); + + // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests + RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); + RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); + when(mockedCPHost.preFlush(isA(HStore.class), isA(InternalScanner.class))).thenReturn(null); + region.setCoprocessorHost(mockedCPHost); + region.put(put); + region.flush(true); + assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemstoreSize()); + assertEquals("flushable size should NOT be zero", onePutSize, store.getFlushableSize()); + + // set normalCPHost and flush again, the snapshot will be flushed + region.setCoprocessorHost(normalCPHost); + region.flush(true); + assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize()); + assertEquals("flushable size should be zero", 0, store.getFlushableSize()); + HRegion.closeHRegion(region); + } + + /** * Test we do not lose data if we fail a flush and then close. * Part of HBase-10466. Tests the following from the issue description: * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is -- 2.3.2 (Apple Git-55)