diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ba75a2e..a41813c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4425,11 +4425,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } - // The edits size added into rsAccounting during this replaying will not - // be required any more. So just clear it. - if (this.rsAccounting != null) { - this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName()); - } if (seqid > minSeqIdForTheRegion) { // Then we added some edits to memory. Flush and cleanup split edit files. internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); @@ -4611,9 +4606,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi editsCount++; } MemStoreSize mss = memStoreSizing.getMemStoreSize(); - if (this.rsAccounting != null) { - rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss); - } incMemStoreSize(mss); flush = isFlushSize(this.memStoreSizing.getMemStoreSize()); if (flush) { @@ -5056,6 +5048,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** + * Be careful, this method will drop all data in all MemStores + */ + public MemStoreSize dropMemStoreContents() throws IOException { + MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing(); + this.updatesLock.writeLock().lock(); + try { + for (HStore s : stores.values()) { + MemStoreSize memStoreSize = doDropStoreMemStoreContentsForSeqId(s, HConstants.NO_SEQNUM); + LOG.info("Drop memstore fo Store " + s.getColumnFamilyName() + " in region " + + this.getRegionInfo().getRegionNameAsString() + + " , dropped memstoresize: [" + memStoreSize + " }"); + totalFreedSize.incMemStoreSize(memStoreSize); + } + return totalFreedSize.getMemStoreSize(); + } finally { + this.updatesLock.writeLock().unlock(); + } + } + + /** * Drops the memstore contents after replaying a flush descriptor or region open event replay * if the memstore edits have seqNums smaller than the given seq id * @throws IOException diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 035496f..728a04b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2311,6 +2311,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat @Override public void abort() throws IOException { if (snapshot != null) { + //We need to close the snapshot when aborting, otherwise, the segment scanner + //won't be closed. If we are using MSLAB, the chunk referenced by those scanners + //can't be released, thus memory leak + snapshot.close(); HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index 4e66fc7..baa9a6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -19,14 +19,11 @@ package org.apache.hadoop.hbase.regionserver; import java.lang.management.MemoryType; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; /** @@ -43,11 +40,6 @@ public class RegionServerAccounting { // memstore off-heap size. private final LongAdder globalMemStoreOffHeapSize = new LongAdder(); - // Store the edits size during replaying WAL. Use this to roll back the - // global memstore size once a region opening failed. - private final ConcurrentMap replayEditsPerRegion = - new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); - private long globalMemStoreLimit; private final float globalMemStoreLimitLowMarkPercent; private long globalMemStoreLimitLowMark; @@ -216,48 +208,4 @@ public class RegionServerAccounting { getGlobalMemStoreHeapSize() * 1.0 / globalOnHeapMemstoreLimitLowMark); } } - - /*** - * Add memStoreSize to replayEditsPerRegion. - * - * @param regionName region name. - * @param memStoreSize the Memstore size will be added to replayEditsPerRegion. - */ - public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) { - MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName); - // All ops on the same MemStoreSize object is going to be done by single thread, sequentially - // only. First calls to this method to increment the per region reply edits size and then call - // to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of - // the region open operation. No need to handle multi thread issues on one region's entry in - // this Map. - if (replayEdistsSize == null) { - replayEdistsSize = new ThreadSafeMemStoreSizing(); - replayEditsPerRegion.put(regionName, replayEdistsSize); - } - replayEdistsSize.incMemStoreSize(memStoreSize); - } - - /** - * Roll back the global MemStore size for a specified region when this region - * can't be opened. - * - * @param regionName the region which could not open. - */ - public void rollbackRegionReplayEditsSize(byte[] regionName) { - MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName); - if (replayEditsSizing != null) { - clearRegionReplayEditsSize(regionName); - decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(), - replayEditsSizing.getOffHeapSize()); - } - } - - /** - * Clear a region from replayEditsPerRegion. - * - * @param regionName region name. - */ - public void clearRegionReplayEditsSize(byte[] regionName) { - replayEditsPerRegion.remove(regionName); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index f408629..c867064 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -301,14 +301,15 @@ public class OpenRegionHandler extends EventHandler { // we rely on the Timeout Monitor in the master to reassign. LOG.error( "Failed open of region=" + this.regionInfo.getRegionNameAsString() - + ", starting to roll back the global memstore size.", t); - // Decrease the global memstore size. - if (this.rsServices != null) { - RegionServerAccounting rsAccounting = - this.rsServices.getRegionServerAccounting(); - if (rsAccounting != null) { - rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName()); - } + + ", starting to roll back memstore", t); + // global memstore size will be decreased when dropping memstore + try { + //drop the memory used by memstore if open region fails + region.dropMemStoreContents(); + } catch (IOException ioE) { + LOG.error("Failed drop memstore of region= " + this.regionInfo + .getRegionNameAsString() + + " if MSLAB is enabled, some chunks may not released forever."); } } return region; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 7502a67..50d49db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Objects; +import java.util.Random; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; @@ -145,6 +146,7 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; @@ -220,6 +222,9 @@ public class TestHRegion { private static FileSystem FILESYSTEM; private final int MAX_VERSIONS = 2; + private final Random random = new Random(); + + // Test names protected TableName tableName; protected String method; @@ -639,6 +644,122 @@ public class TestHRegion { } @Test + public void testRecoveredEidtsReplayAndAbort() throws Exception { + //set flush size to 10MB + CONF.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024 * 10); + //set the report interval to a very small value + CONF.setInt("hbase.hstore.report.interval.edits", 1); + CONF.setInt("hbase.hstore.report.period", 0); + //mock a RegionServerServices + final RegionServerAccounting rsAccounting = new RegionServerAccounting(CONF); + RegionServerServices rs = Mockito.mock(RegionServerServices.class); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + Mockito.when(rs.getRegionServerAccounting()).thenReturn(rsAccounting); + Mockito.when(rs.isAborted()).thenReturn(false); + Mockito.when(rs.getNonceManager()).thenReturn(null); + Mockito.when(rs.getServerName()).thenReturn(ServerName.valueOf("test", 000, 111)); + //create a region + TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort"); + HTableDescriptor htd = new HTableDescriptor(testTable); + htd.addFamily(new HColumnDescriptor(fam1)); + HRegionInfo info = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false); + Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log"); + final WAL wal = HBaseTestingUtility.createWal(CONF, logDir, info); + Path rootDir = TEST_UTIL.getDataTestDir(); + Path tableDir = FSUtils.getTableDir(rootDir, info.getTable()); + HRegionFileSystem + .createRegionOnFileSystem(CONF, TEST_UTIL.getTestFileSystem(), tableDir, info); + region = HRegion.newHRegion(tableDir, wal, TEST_UTIL.getTestFileSystem(), CONF, info, + htd, rs); + region.initialize(); + //create some recovered.edits + final WALFactory wals = new WALFactory(CONF, method); + try { + Path regiondir = region.getRegionFileSystem().getRegionDir(); + FileSystem fs = region.getRegionFileSystem().getFileSystem(); + byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); + + Path recoveredEditsDir = WALSplitter + .getRegionDirRecoveredEditsDir(regiondir); + long maxSeqId = 1200; + long minSeqId = 1000; + long totalEdits = maxSeqId - minSeqId; + for (long i = minSeqId; i <= maxSeqId; i += 100) { + Path recoveredEdits = new Path(recoveredEditsDir, + String.format("%019d", i)); + LOG.info("Begin to write recovered.eidt : " + recoveredEdits); + fs.create(recoveredEdits); + WALProvider.Writer writer = wals + .createRecoveredEditsWriter(fs, recoveredEdits); + for (long j = i; j < i + 100; j++) { + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + // 200KB kv + byte[] value = new byte[200 * 1024]; + random.nextBytes(value); + edit.add( + new KeyValue(row, fam1, Bytes.toBytes(j), time, KeyValue.Type.Put, + value)); + writer.append(new WAL.Entry( + new WALKeyImpl(regionName, tableName, j, time, + HConstants.DEFAULT_CLUSTER_ID), edit)); + } + writer.close(); + } + MonitoredTask status = TaskMonitor.get().createStatus(method); + Map maxSeqIdInStores = new TreeMap<>( + Bytes.BYTES_COMPARATOR); + for (HStore store : region.getStores()) { + maxSeqIdInStores + .put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1); + } + //try to replay the edits + try { + region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, + new CancelableProgressable() { + private long replayedEdits = 0; + @Override + public boolean progress() { + replayedEdits++; + //during replay, rsAccounting should align with global memstore, because + //there is only one memstore here + assertEquals(rsAccounting.getGlobalMemStoreDataSize(), + region.getMemStoreDataSize()); + assertEquals(rsAccounting.getGlobalMemStoreHeapSize(), + region.getMemStoreHeapSize()); + assertEquals(rsAccounting.getGlobalMemStoreOffHeapSize(), + region.getMemStoreOffHeapSize()); + // abort the replay before finishing, leaving some edits in the memory + if (replayedEdits > totalEdits -10) { + return false; + } else { + return true; + } + } + }, status); + } catch (Throwable t) { + //drop the memstore + LOG.error("replay failed, that's expected", t); + LOG.info("Current memstore: " + region.getMemStoreDataSize() + ", " + region + .getMemStoreHeapSize() + ", " + region + .getMemStoreOffHeapSize()); + region.dropMemStoreContents(); + } + //After aborting replay, there should be no data in the memory + assertEquals(0, rsAccounting.getGlobalMemStoreDataSize()); + assertEquals(0, region.getMemStoreDataSize()); + //All the chunk in the MSLAB should be recycled, otherwise, there might be + //a memory leak. + assertEquals(0, ChunkCreator.getInstance().numberOfMappedChunks()); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + wals.close(); + } + } + + @Test public void testSkipRecoveredEditsReplay() throws Exception { byte[] family = Bytes.toBytes("family"); this.region = initHRegion(tableName, method, CONF, family);