diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ba75a2e..6b03129 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -857,7 +857,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return What the next sequence (edit) id should be.
    * @throws IOException e
    */
-  private long initialize(final CancelableProgressable reporter) throws IOException {
+  @VisibleForTesting
+  public long initialize(final CancelableProgressable reporter) throws IOException {
 
     //Refuse to open the region if there is no column family in the table
     if (htableDescriptor.getColumnFamilyCount() == 0) {
@@ -870,6 +871,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       nextSeqId = initializeRegionInternals(reporter, status);
       return nextSeqId;
+    } catch (Throwable t) {
+      LOG.error("Failed initialize of region=" + getRegionInfo()
+          .getRegionNameAsString() + ", starting to roll back memstore", t);
+      // global memstore size will be decreased when dropping memstore
+      try {
+        //drop the memory used by memstore if open region fails
+        dropMemStoreContents();
+      } catch (IOException ioE) {
+        LOG.error("Failed drop memstore of region= " + getRegionInfo()
+            .getRegionNameAsString()
+            + " if MSLAB is enabled, some chunks may not released forever.", ioE);
+      }
+      throw t;
     } finally {
       // nextSeqid will be -1 if the initialization fails.
       // At least it will be 0 otherwise.
@@ -4425,11 +4439,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
     }
-    // The edits size added into rsAccounting during this replaying will not
-    // be required any more. So just clear it.
-    if (this.rsAccounting != null) {
-      this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
-    }
     if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
       internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
@@ -4611,9 +4620,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             editsCount++;
           }
           MemStoreSize mss = memStoreSizing.getMemStoreSize();
-          if (this.rsAccounting != null) {
-            rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss);
-          }
           incMemStoreSize(mss);
           flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
           if (flush) {
@@ -5056,6 +5062,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
+   * Be careful, this method will drop all data in all MemStores
+   * @return the sum of the sizes dropped from every store's MemStore
+   * @throws IOException if dropping the contents of a store fails
+   */
+  public MemStoreSize dropMemStoreContents() throws IOException {
+    MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
+    this.updatesLock.writeLock().lock();
+    try {
+      for (HStore s : stores.values()) {
+        MemStoreSize memStoreSize = doDropStoreMemStoreContentsForSeqId(s, HConstants.NO_SEQNUM);
+        LOG.info("Drop memstore for Store " + s.getColumnFamilyName() + " in region "
+            + this.getRegionInfo().getRegionNameAsString()
+            + " , dropped memstoresize: [" + memStoreSize + " ]");
+        totalFreedSize.incMemStoreSize(memStoreSize);
+      }
+      return totalFreedSize.getMemStoreSize();
+    } finally {
+      this.updatesLock.writeLock().unlock();
+    }
+  }
+
+  /**
    * Drops the memstore contents after replaying a flush descriptor or region open event replay
    * if the memstore edits have seqNums smaller than the given seq id
    * @throws IOException
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 035496f..728a04b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2311,6 +2311,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     @Override
     public void abort() throws IOException {
       if (snapshot != null) {
+        //We need to close the snapshot when aborting, otherwise, the segment scanner
+        //won't be closed. If we are using MSLAB, the chunk referenced by those scanners
+        //can't be released, thus memory leak
+        snapshot.close();
         HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 4e66fc7..baa9a6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -19,14 +19,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.lang.management.MemoryType;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -43,11 +40,6 @@ public class RegionServerAccounting {
   // memstore off-heap size.
   private final LongAdder globalMemStoreOffHeapSize = new LongAdder();
 
-  // Store the edits size during replaying WAL. Use this to roll back the
-  // global memstore size once a region opening failed.
-  private final ConcurrentMap<byte[], MemStoreSizing> replayEditsPerRegion =
-    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
-
   private long globalMemStoreLimit;
   private final float globalMemStoreLimitLowMarkPercent;
   private long globalMemStoreLimitLowMark;
@@ -216,48 +208,4 @@ public class RegionServerAccounting {
           getGlobalMemStoreHeapSize() * 1.0 / globalOnHeapMemstoreLimitLowMark);
     }
   }
-
-  /***
-   * Add memStoreSize to replayEditsPerRegion.
-   *
-   * @param regionName region name.
-   * @param memStoreSize the Memstore size will be added to replayEditsPerRegion.
-   */
-  public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) {
-    MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName);
-    // All ops on the same MemStoreSize object is going to be done by single thread, sequentially
-    // only. First calls to this method to increment the per region reply edits size and then call
-    // to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of
-    // the region open operation. No need to handle multi thread issues on one region's entry in
-    // this Map.
-    if (replayEdistsSize == null) {
-      replayEdistsSize = new ThreadSafeMemStoreSizing();
-      replayEditsPerRegion.put(regionName, replayEdistsSize);
-    }
-    replayEdistsSize.incMemStoreSize(memStoreSize);
-  }
-
-  /**
-   * Roll back the global MemStore size for a specified region when this region
-   * can't be opened.
-   *
-   * @param regionName the region which could not open.
-   */
-  public void rollbackRegionReplayEditsSize(byte[] regionName) {
-    MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName);
-    if (replayEditsSizing != null) {
-      clearRegionReplayEditsSize(regionName);
-      decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(),
-          replayEditsSizing.getOffHeapSize());
-    }
-  }
-
-  /**
-   * Clear a region from replayEditsPerRegion.
-   *
-   * @param regionName region name.
-   */
-  public void clearRegionReplayEditsSize(byte[] regionName) {
-    replayEditsPerRegion.remove(regionName);
-  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index f408629..970911f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
@@ -300,16 +299,7 @@ public class OpenRegionHandler extends EventHandler {
       // and transition the node back to FAILED_OPEN. If that fails,
       // we rely on the Timeout Monitor in the master to reassign.
       LOG.error(
-          "Failed open of region=" + this.regionInfo.getRegionNameAsString()
-          + ", starting to roll back the global memstore size.", t);
-      // Decrease the global memstore size.
-      if (this.rsServices != null) {
-        RegionServerAccounting rsAccounting =
-          this.rsServices.getRegionServerAccounting();
-        if (rsAccounting != null) {
-          rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName());
-        }
-      }
+          "Failed open of region=" + this.regionInfo.getRegionNameAsString(), t);
     }
     return region;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEidtsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEidtsReplayAndAbort.java
new file mode 100644
index 0000000..6ee63e5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEidtsReplayAndAbort.java
@@ -0,0 +1,209 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * HBASE-21031
+ * If replay edits fails, we need to make sure memstore is rollbacked
+ * And if MSLAB is used, all chunk is released too.
+ */
+@Category({RegionServerTests.class, SmallTests.class })
+public class TestRecoveredEidtsReplayAndAbort {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRecoveredEidtsReplayAndAbort.class);
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestRecoveredEidtsReplayAndAbort.class);
+
+  protected final byte[] row = Bytes.toBytes("rowA");
+
+  @Rule
+  public TestName name = new TestName();
+
+  // Test names
+  protected TableName tableName;
+  protected String method;
+
+  protected static HBaseTestingUtility TEST_UTIL;
+  public static Configuration CONF ;
+  private static FileSystem FILESYSTEM;
+  private HRegion region = null;
+
+  private final Random random = new Random();
+
+  @Before
+  public void setup() throws IOException {
+    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
+    FILESYSTEM = TEST_UTIL.getTestFileSystem();
+    CONF = TEST_UTIL.getConfiguration();
+    method = name.getMethodName();
+    tableName = TableName.valueOf(method);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentEdgeManagerTestHelper.reset();
+    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void test() throws Exception {
+    //set flush size to 10MB
+    CONF.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024 * 10);
+    //set the report interval to a very small value
+    CONF.setInt("hbase.hstore.report.interval.edits", 1);
+    CONF.setInt("hbase.hstore.report.period", 0);
+    //mock a RegionServerServices
+    final RegionServerAccounting rsAccounting = new RegionServerAccounting(CONF);
+    RegionServerServices rs = Mockito.mock(RegionServerServices.class);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+    Mockito.when(rs.getRegionServerAccounting()).thenReturn(rsAccounting);
+    Mockito.when(rs.isAborted()).thenReturn(false);
+    Mockito.when(rs.getNonceManager()).thenReturn(null);
+    Mockito.when(rs.getServerName()).thenReturn(ServerName
+        .valueOf("test", 000, 111));
+    //create a region
+    TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort");
+    HTableDescriptor htd = new HTableDescriptor(testTable);
+    htd.addFamily(new HColumnDescriptor(fam1));
+    HRegionInfo info = new HRegionInfo(htd.getTableName(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false);
+    Path logDir = TEST_UTIL
+        .getDataTestDirOnTestFS("TestRecoveredEidtsReplayAndAbort.log");
+    final WAL wal = HBaseTestingUtility.createWal(CONF, logDir, info);
+    Path rootDir = TEST_UTIL.getDataTestDir();
+    Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+    HRegionFileSystem
+        .createRegionOnFileSystem(CONF, TEST_UTIL.getTestFileSystem(), tableDir, info);
+    region = HRegion.newHRegion(tableDir, wal, TEST_UTIL.getTestFileSystem(), CONF, info,
+        htd, rs);
+    //create some recovered.edits
+    final WALFactory wals = new WALFactory(CONF, method);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      Path recoveredEditsDir = WALSplitter
+          .getRegionDirRecoveredEditsDir(regiondir);
+      long maxSeqId = 1200;
+      long minSeqId = 1000;
+      long totalEdits = maxSeqId - minSeqId;
+      for (long i = minSeqId; i <= maxSeqId; i += 100) {
+        Path recoveredEdits = new Path(recoveredEditsDir,
+            String.format("%019d", i));
+        LOG.info("Begin to write recovered.eidt : " + recoveredEdits);
+        fs.create(recoveredEdits).close();
+        WALProvider.Writer writer = wals
+            .createRecoveredEditsWriter(fs, recoveredEdits);
+        for (long j = i; j < i + 100; j++) {
+          long time = System.nanoTime();
+          WALEdit edit = new WALEdit();
+          // 200KB kv
+          byte[] value = new byte[200 * 1024];
+          random.nextBytes(value);
+          edit.add(
+              new KeyValue(row, fam1, Bytes.toBytes(j), time, KeyValue.Type.Put,
+                  value));
+          writer.append(new WAL.Entry(
+              new WALKeyImpl(regionName, tableName, j, time,
+                  HConstants.DEFAULT_CLUSTER_ID), edit));
+        }
+        writer.close();
+      }
+      MonitoredTask status = TaskMonitor.get().createStatus(method);
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(
+          Bytes.BYTES_COMPARATOR);
+      for (HStore store : region.getStores()) {
+        maxSeqIdInStores
+            .put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
+      }
+      //try to replay the edits
+      try {
+        region.initialize(new CancelableProgressable() {
+          private long replayedEdits = 0;
+
+          @Override
+          public boolean progress() {
+            replayedEdits++;
+            //during replay, rsAccounting should align with global memstore, because
+            //there is only one memstore here
+            assertEquals(rsAccounting.getGlobalMemStoreDataSize(),
+                region.getMemStoreDataSize());
+            assertEquals(rsAccounting.getGlobalMemStoreHeapSize(),
+                region.getMemStoreHeapSize());
+            assertEquals(rsAccounting.getGlobalMemStoreOffHeapSize(),
+                region.getMemStoreOffHeapSize());
+            // abort the replay before finishing, leaving some edits in the memory
+            if (replayedEdits > totalEdits - 10) {
+              return false;
+            } else {
+              return true;
+            }
+          }
+        });
+      } catch (Throwable t) {
+        //drop the memstore
+        LOG.error("replay failed, that's expected", t);
+        LOG.info("Current memstore: " + region.getMemStoreDataSize() + ", " + region
+            .getMemStoreHeapSize() + ", " + region
+            .getMemStoreOffHeapSize());
+      }
+      //After aborting replay, there should be no data in the memory
+      assertEquals(0, rsAccounting.getGlobalMemStoreDataSize());
+      assertEquals(0, region.getMemStoreDataSize());
+      //All the chunk in the MSLAB should be recycled, otherwise, there might be
+      //a memory leak.
+      assertEquals(0, ChunkCreator.getInstance().numberOfMappedChunks());
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+      wals.close();
+    }
+  }
+}