From aa5e7cc9b27f7bd09f8ed33ee2dca852cb01d6b5 Mon Sep 17 00:00:00 2001 From: sunyerui Date: Wed, 2 Sep 2015 00:01:20 +0800 Subject: [PATCH] HBASE-14229 Flushing canceled by coprocessor still leads to memstoreSize set down --- .../apache/hadoop/hbase/regionserver/HRegion.java | 25 +++++++--- .../hadoop/hbase/regionserver/TestHRegion.java | 56 +++++++++++++++++----- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2293311..e21942c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -521,6 +521,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final FlushResult result; // indicating a failure result from prepare final TreeMap storeFlushCtxs; final TreeMap> committedFiles; + final TreeMap storeFlushableSize; final long startTime; final long flushOpSeqId; final long flushedSeqId; @@ -528,26 +529,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** Constructs an early exit case */ PrepareFlushResult(FlushResult result, long flushSeqId) { - this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0); + this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0); } /** Constructs a successful prepare flush result */ PrepareFlushResult( TreeMap storeFlushCtxs, - TreeMap> committedFiles, long startTime, long flushSeqId, + TreeMap> committedFiles, + TreeMap storeFlushableSize, long startTime, long flushSeqId, long flushedSeqId, long totalFlushableSize) { - this(null, storeFlushCtxs, committedFiles, startTime, + this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushSeqId, flushedSeqId, totalFlushableSize); } private PrepareFlushResult( FlushResult result, TreeMap storeFlushCtxs, - TreeMap> committedFiles, long startTime, long flushSeqId, + TreeMap> committedFiles, + TreeMap storeFlushableSize, long startTime, long flushSeqId, long flushedSeqId, long totalFlushableSize) { this.result = result; this.storeFlushCtxs = storeFlushCtxs; this.committedFiles = committedFiles; + this.storeFlushableSize = storeFlushableSize; this.startTime = startTime; this.flushOpSeqId = flushSeqId; this.flushedSeqId = flushedSeqId; @@ -2156,6 +2160,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi = new TreeMap(Bytes.BYTES_COMPARATOR); TreeMap> committedFiles = new TreeMap>( Bytes.BYTES_COMPARATOR); + TreeMap storeFlushableSize + = new TreeMap(Bytes.BYTES_COMPARATOR); // The sequence id of this flush operation which is used to log FlushMarker and pass to // createFlushContext to use as the store file's sequence id. It can be in advance of edits // still in the memstore, edits that are in other column families yet to be flushed. @@ -2194,6 +2200,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi totalFlushableSizeOfFlushableStores += s.getFlushableSize(); storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId)); committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL + storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize()); } // write the snapshot start to WAL @@ -2260,7 +2267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi mvcc.advanceMemstore(w); } } - return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId, + return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores); } @@ -2335,7 +2342,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (needsCompaction) { compactionRequested = true; } - committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles()); + byte[] storeName = it.next().getFamily().getName(); + List storeCommittedFiles = flush.getCommittedFiles(); + committedFiles.put(storeName, storeCommittedFiles); + // Flush committed no files, indicating flush is empty or flush was canceled + if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) { + totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName); + } } storeFlushCtxs.clear(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 826c9b3..b186f0e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -33,10 +33,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -131,12 +128,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.regionserver.wal.*; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -321,7 +313,7 @@ public class TestHRegion { Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); HRegion region = initHRegion(tableName, null, null, name.getMethodName(), - CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); + CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); Store store = region.getStore(COLUMN_FAMILY_BYTES); // Get some random bytes. @@ -363,6 +355,48 @@ public class TestHRegion { } /** + * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down + */ + @Test + public void testMemstoreSizeWithFlushCanceling() throws IOException { + FileSystem fs = FileSystem.get(CONF); + Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling"); + FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF); + HRegion region = initHRegion(tableName, null, null, name.getMethodName(), + CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES); + Store store = region.getStore(COLUMN_FAMILY_BYTES); + assertEquals(0, region.getMemstoreSize()); + + // Put some value and make sure flush could be completed normally + byte [] value = Bytes.toBytes(name.getMethodName()); + Put put = new Put(value); + put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); + region.put(put); + long onePutSize = region.getMemstoreSize(); + assertTrue(onePutSize > 0); + region.flush(true); + assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize()); + assertEquals("flushable size should be zero", 0, store.getFlushableSize()); + + // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests + RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); + RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); + when(mockedCPHost.preFlush(isA(HStore.class), isA(InternalScanner.class))).thenReturn(null); + region.setCoprocessorHost(mockedCPHost); + region.put(put); + region.flush(true); + assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemstoreSize()); + assertEquals("flushable size should NOT be zero", onePutSize, store.getFlushableSize()); + + // set normalCPHost and flush again, the snapshot will be flushed + region.setCoprocessorHost(normalCPHost); + region.flush(true); + assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize()); + assertEquals("flushable size should be zero", 0, store.getFlushableSize()); + HBaseTestingUtility.closeRegionAndWAL(region); + } + + /** * Test we do not lose data if we fail a flush and then close. * Part of HBase-10466. Tests the following from the issue description: * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is -- 2.3.2 (Apple Git-55)