From d79170a6f1e10193334394085c4d0ce0d9a36f9c Mon Sep 17 00:00:00 2001 From: anastas Date: Mon, 17 Jul 2017 17:32:55 +0300 Subject: [PATCH] HBASE-18375: correct the protection of chunks, against GCing the chunks from pool, by keeping those chunks being always pointed by strong map --- .../hadoop/hbase/regionserver/ChunkCreator.java | 35 +++++++++++++++++----- .../hbase/regionserver/CompactionPipeline.java | 4 +-- .../hadoop/hbase/regionserver/MemStoreLABImpl.java | 10 +++++++ .../hadoop/hbase/regionserver/TestMemStoreLAB.java | 7 +++-- 4 files changed, 44 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index baf9a7b..3ad525a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -95,7 +95,7 @@ public class ChunkCreator { } /** - * Initializes the instance of MSLABChunkCreator + * Initializes the instance of ChunkCreator * @param chunkSize the chunkSize * @param offheap indicates if the chunk is to be created offheap or not * @param globalMemStoreSize the global memstore size @@ -138,9 +138,10 @@ public class ChunkCreator { } if (chunk == null) { chunk = createChunk(); + // put allocated on demand chunk initially into the weakChunkIdMap + this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk)); } - // put this chunk initially into the weakChunkIdMap - this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk)); + // now we need to actually do the expensive memory allocation step in case of a new chunk, // else only the offset is set to the beginning of the chunk to accept allocations chunk.init(); @@ -157,14 +158,20 @@ public class ChunkCreator { * @return the chunk */ private Chunk createChunk(boolean pool) { + Chunk chunk = null; int id = chunkID.getAndIncrement(); assert id > 0; // do not create offheap chunk on demand if (pool && this.offheap) { - return new OffheapChunk(chunkSize, id, pool); + chunk = new OffheapChunk(chunkSize, id, pool); } else { - return new OnheapChunk(chunkSize, id, pool); + chunk = new OnheapChunk(chunkSize, id, pool); + } + if (pool) { + // put the pool chunk forever into the strongChunkIdMap + this.strongChunkIdMap.put(chunk.getId(), chunk); } + return chunk; } @VisibleForTesting @@ -315,11 +322,15 @@ public class ChunkCreator { Iterator iterator = chunks.iterator(); while (iterator.hasNext()) { Integer chunkId = iterator.next(); - // remove the chunks every time though they are from the pool or not - Chunk chunk = ChunkCreator.this.removeChunk(chunkId); + // translate chunk ID to chunk + Chunk chunk = ChunkCreator.this.getChunk(chunkId); if (chunk != null) { if (chunk.isFromPool() && toAdd > 0) { reclaimedChunks.add(chunk); + } else { + // remove the chunks (that are not going to pool) every time + // though they are initially from the pool or not + ChunkCreator.this.removeChunk(chunkId); } toAdd--; } @@ -433,6 +444,16 @@ public class ChunkCreator { return 0; } + @VisibleForTesting + boolean isChunkInPool(int chunkId) { + if (pool != null) { + Chunk c = getChunk(chunkId); + return pool.reclaimedChunks.contains(c); + } + + return false; + } + /* * Only used in testing */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 5136f24..3da8f31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -261,6 +261,8 @@ public class CompactionPipeline { private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { + pipeline.removeAll(suffix); + if(segment != null) pipeline.addLast(segment); // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data @@ -272,8 +274,6 @@ public class CompactionPipeline { itemInSuffix.close(); } } - pipeline.removeAll(suffix); - if(segment != null) pipeline.addLast(segment); } // replacing one segment in the pipeline with a new one exactly at the same index diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 21b3cc9..7ffa841 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -280,4 +280,14 @@ public class MemStoreLABImpl implements MemStoreLAB { } return pooledChunks; } + + @VisibleForTesting BlockingQueue getChunksReturnedToPool() { + BlockingQueue pooledChunks = new LinkedBlockingQueue<>(); + for (Integer id : this.chunks) { + if (chunkCreator.isChunkInPool(id)) { + pooledChunks.add(id); + } + } + return pooledChunks; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index e27d7c2..c9a5dae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -248,13 +248,14 @@ public class TestMemStoreLAB { } // none of the chunkIds would have been returned back assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0); + int pooledChunksNum = mslab.getPooledChunks().size(); // close the mslab mslab.close(); // make sure all chunks reclaimed or removed from chunk queue - int queueLength = mslab.getPooledChunks().size(); + int queueLength = mslab.getChunksReturnedToPool().size(); assertTrue("All chunks in chunk queue should be reclaimed or removed" - + " after mslab closed but actually: " + queueLength, - queueLength == 0); + + " after mslab closed but actually: " + (pooledChunksNum-queueLength), + pooledChunksNum-queueLength == 0); } finally { ChunkCreator.INSTANCE = oldInstance; } -- 1.8.5.2 (Apple Git-48)