From 102b570bae900bc71df75364a73f9c79f86a4fe5 Mon Sep 17 00:00:00 2001 From: anastas Date: Thu, 10 Aug 2017 13:05:28 +0300 Subject: [PATCH] HBASE-18375: fixing bug of chunks being not returned to the chunk pool --- .../regionserver/CellChunkImmutableSegment.java | 5 +- .../hadoop/hbase/regionserver/ChunkCreator.java | 112 +++++++++------------ .../hbase/regionserver/CompactionPipeline.java | 5 +- .../hadoop/hbase/regionserver/MemStoreLABImpl.java | 18 +++- .../hadoop/hbase/regionserver/TestCellFlatSet.java | 8 +- .../hadoop/hbase/regionserver/TestMemStoreLAB.java | 10 +- .../regionserver/TestMemstoreLABWithoutPool.java | 3 +- 7 files changed, 79 insertions(+), 82 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java index cdda279..3653166 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java @@ -176,10 +176,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { private int createCellReference(ByteBufferKeyValue cell, ByteBuffer idxBuffer, int idxOffset) { int offset = idxOffset; int dataChunkID = cell.getChunkId(); - // ensure strong pointer to data chunk, as index is no longer directly points to it - Chunk c = ChunkCreator.getInstance().saveChunkFromGC(dataChunkID); - // if c is null, it means that this cell chunks was already released shouldn't happen - assert (c!=null); + offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID); // write data chunk id offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset()); // offset offset = ByteBufferUtils.putInt(idxBuffer, offset, KeyValueUtil.length(cell)); // length diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index 7e5395c..8321f46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -58,21 +58,8 @@ public class ChunkCreator { // the header size need to be changed in case chunk id size is changed public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT; - // An object pointed by a weak reference can be garbage collected, in opposite to an object - // referenced by a strong (regular) reference. Every chunk created via ChunkCreator is referenced - // from either weakChunkIdMap or strongChunkIdMap. - // Upon chunk C creation, C's ID is mapped into weak reference to C, in order not to disturb C's - // GC in case all other reference to C are going to be removed. - // When chunk C is referenced from CellChunkMap (via C's ID) it is possible to GC the chunk C. - // To avoid that upon inserting C into CellChunkMap, C's ID is mapped into strong (regular) - // reference to C. - - // map that doesn't influence GC - private Map> weakChunkIdMap = - new ConcurrentHashMap>(); - - // map that keeps chunks from garbage collection - private Map strongChunkIdMap = new ConcurrentHashMap(); + // mapping from chunk IDs to chunks + private Map chunkIdMap = new ConcurrentHashMap(); private final int chunkSize; private final boolean offheap; @@ -95,7 +82,7 @@ public class ChunkCreator { } /** - * Initializes the instance of MSLABChunkCreator + * Initializes the instance of ChunkCreator * @param chunkSize the chunkSize * @param offheap indicates if the chunk is to be created offheap or not * @param globalMemStoreSize the global memstore size @@ -122,8 +109,9 @@ public class ChunkCreator { /** * Creates and inits a chunk. * @return the chunk that was initialized + * @param forCellChunkMap */ - Chunk getChunk() { + Chunk getChunk(boolean forCellChunkMap) { Chunk chunk = null; if (pool != null) { // the pool creates the chunk internally. The chunk#init() call happens here @@ -138,9 +126,12 @@ public class ChunkCreator { } if (chunk == null) { chunk = createChunk(); + // if CellChunkMap index is requested, put allocated on demand chunk mapping into chunkIdMap + if (forCellChunkMap) { + this.chunkIdMap.put(chunk.getId(), chunk); + } } - // put this chunk initially into the weakChunkIdMap - this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk)); + // now we need to actually do the expensive memory allocation step in case of a new chunk, // else only the offset is set to the beginning of the chunk to accept allocations chunk.init(); @@ -157,48 +148,27 @@ public class ChunkCreator { * @return the chunk */ private Chunk createChunk(boolean pool) { + Chunk chunk = null; int id = chunkID.getAndIncrement(); assert id > 0; // do not create offheap chunk on demand if (pool && this.offheap) { - return new OffheapChunk(chunkSize, id, pool); + chunk = new OffheapChunk(chunkSize, id, pool); } else { - return new OnheapChunk(chunkSize, id, pool); + chunk = new OnheapChunk(chunkSize, id, pool); } + if (pool) { + // put the pool chunk forever into the chunkIdMap so it is not GC-ed + this.chunkIdMap.put(chunk.getId(), chunk); + } + return chunk; } @VisibleForTesting // Used to translate the ChunkID into a chunk ref Chunk getChunk(int id) { - WeakReference ref = weakChunkIdMap.get(id); - if (ref != null) { - return ref.get(); - } - // check also the strong mapping - return strongChunkIdMap.get(id); - } - - // transfer the weak pointer to be a strong chunk pointer - Chunk saveChunkFromGC(int chunkID) { - Chunk c = strongChunkIdMap.get(chunkID); // check whether the chunk is already protected - if (c != null) // with strong pointer - return c; - WeakReference ref = weakChunkIdMap.get(chunkID); - if (ref != null) { - c = ref.get(); - } - if (c != null) { - // put this strong reference to chunk into the strongChunkIdMap - // the read of the weakMap is always happening before the read of the strongMap - // so no synchronization issues here - this.strongChunkIdMap.put(chunkID, c); - this.weakChunkIdMap.remove(chunkID); - return c; - } - // we should actually never return null as someone should not ask to save from GC a chunk, - // which is already released. However, we are not asserting it here and we let the caller - // to deal with the return value an assert if needed - return null; + // can return null if chunk was never mapped + return chunkIdMap.get(id); } int getChunkSize() { @@ -210,30 +180,23 @@ public class ChunkCreator { } private void removeChunks(Set chunkIDs) { - this.weakChunkIdMap.keySet().removeAll(chunkIDs); - this.strongChunkIdMap.keySet().removeAll(chunkIDs); + this.chunkIdMap.keySet().removeAll(chunkIDs); } Chunk removeChunk(int chunkId) { - WeakReference weak = this.weakChunkIdMap.remove(chunkId); - Chunk strong = this.strongChunkIdMap.remove(chunkId); - if (weak != null) { - return weak.get(); - } - return strong; + return this.chunkIdMap.remove(chunkId); } @VisibleForTesting - // the chunks in the weakChunkIdMap may already be released so we shouldn't relay + // the chunks in the chunkIdMap may already be released so we shouldn't relay // on this counting for strong correctness. This method is used only in testing. - int size() { - return this.weakChunkIdMap.size()+this.strongChunkIdMap.size(); + int numberOfMappedChunks() { + return this.chunkIdMap.size(); } @VisibleForTesting void clearChunkIds() { - this.strongChunkIdMap.clear(); - this.weakChunkIdMap.clear(); + this.chunkIdMap.clear(); } /** @@ -315,11 +278,16 @@ public class ChunkCreator { Iterator iterator = chunks.iterator(); while (iterator.hasNext()) { Integer chunkId = iterator.next(); - // remove the chunks every time though they are from the pool or not - Chunk chunk = ChunkCreator.this.removeChunk(chunkId); + // translate chunk ID to chunk, + // if chunk initially wasn't in pool this translation will (most likely) return null + Chunk chunk = ChunkCreator.this.getChunk(chunkId); if (chunk != null) { if (chunk.isFromPool() && toAdd > 0) { reclaimedChunks.add(chunk); + } else { + // remove the chunks (that are not going to pool) every time + // though they are initially from the pool or not + ChunkCreator.this.removeChunk(chunkId); } toAdd--; } @@ -433,6 +401,20 @@ public class ChunkCreator { return 0; } + @VisibleForTesting + boolean isChunkInPool(int chunkId) { + if (pool != null) { + // chunks that are from pool will return true chunk reference not null + Chunk c = getChunk(chunkId); + if (c==null) { + return false; + } + return pool.reclaimedChunks.contains(c); + } + + return false; + } + /* * Only used in testing */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 5136f24..a0d97e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -261,6 +261,8 @@ public class CompactionPipeline { private void swapSuffix(List suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { + pipeline.removeAll(suffix); + if(segment != null) pipeline.addLast(segment); // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data @@ -272,8 +274,6 @@ public class CompactionPipeline { itemInSuffix.close(); } } - pipeline.removeAll(suffix); - if(segment != null) pipeline.addLast(segment); } // replacing one segment in the pipeline with a new one exactly at the same index @@ -281,6 +281,7 @@ public class CompactionPipeline { private void replaceAtIndex(int idx, ImmutableSegment newSegment) { pipeline.set(idx, newSegment); readOnlyCopy = new LinkedList<>(pipeline); + version++; } public Segment getTail() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 85e2abe..354d44d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -78,6 +78,7 @@ public class MemStoreLABImpl implements MemStoreLAB { private final int chunkSize; private final int maxAlloc; private final ChunkCreator chunkCreator; + private final CompactingMemStore.IndexType idxType; // what index is used for corresponding segment // This flag is for closing this instance, its set when clearing snapshot of // memstore @@ -100,6 +101,9 @@ public class MemStoreLABImpl implements MemStoreLAB { // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument(maxAlloc <= chunkSize, MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); + idxType = CompactingMemStore.IndexType.valueOf(conf.get( + CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY, + CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT)); } @Override @@ -239,7 +243,7 @@ public class MemStoreLABImpl implements MemStoreLAB { if (c != null) { return c; } - c = this.chunkCreator.getChunk(); + c = this.chunkCreator.getChunk(idxType==CompactingMemStore.IndexType.CHUNK_MAP); if (c != null) { // set the curChunk. No need of CAS as only one thread will be here curChunk.set(c); @@ -259,7 +263,7 @@ public class MemStoreLABImpl implements MemStoreLAB { // The interface is only for external callers @Override public Chunk getNewExternalChunk() { - Chunk c = this.chunkCreator.getChunk(); + Chunk c = this.chunkCreator.getChunk(false); chunks.add(c.getId()); return c; } @@ -280,4 +284,14 @@ public class MemStoreLABImpl implements MemStoreLAB { } return pooledChunks; } + + @VisibleForTesting BlockingQueue getChunksReturnedToPool() { + BlockingQueue pooledChunks = new LinkedBlockingQueue<>(); + for (Integer id : this.chunks) { + if (chunkCreator.isChunkInPool(id)) { + pooledChunks.add(id); + } + } + return pooledChunks; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java index 6307d32..07ec891 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -278,8 +278,8 @@ public class TestCellFlatSet extends TestCase { // allocate new chunks and use the data chunk to hold the full data of the cells // and the index chunk to hold the cell-representations - Chunk dataChunk = chunkCreator.getChunk(); - Chunk idxChunk = chunkCreator.getChunk(); + Chunk dataChunk = chunkCreator.getChunk(false); + Chunk idxChunk = chunkCreator.getChunk(false); // the array of index chunks to be used as a basis for CellChunkMap Chunk chunkArray[] = new Chunk[8]; // according to test currently written 8 is way enough int chunkArrayIdx = 0; @@ -295,7 +295,7 @@ public class TestCellFlatSet extends TestCase { for (Cell kv: cellArray) { // do we have enough space to write the cell data on the data chunk? if (dataOffset + KeyValueUtil.length(kv) > chunkCreator.getChunkSize()) { - dataChunk = chunkCreator.getChunk(); // allocate more data chunks if needed + dataChunk = chunkCreator.getChunk(false); // allocate more data chunks if needed dataBuffer = dataChunk.getData(); dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; } @@ -304,7 +304,7 @@ public class TestCellFlatSet extends TestCase { // do we have enough space to write the cell-representation on the index chunk? if (idxOffset + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkCreator.getChunkSize()) { - idxChunk = chunkCreator.getChunk(); // allocate more index chunks if needed + idxChunk = chunkCreator.getChunk(false); // allocate more index chunks if needed idxBuffer = idxChunk.getData(); idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER; chunkArray[chunkArrayIdx++] = idxChunk; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index f171dd0..c9607e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -247,14 +247,16 @@ public class TestMemStoreLAB { } } // none of the chunkIds would have been returned back - assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0); + assertTrue("All the chunks must have been cleared", + ChunkCreator.INSTANCE.numberOfMappedChunks() != 0); + int pooledChunksNum = mslab.getPooledChunks().size(); // close the mslab mslab.close(); // make sure all chunks reclaimed or removed from chunk queue - int queueLength = mslab.getPooledChunks().size(); + int queueLength = mslab.getChunksReturnedToPool().size(); assertTrue("All chunks in chunk queue should be reclaimed or removed" - + " after mslab closed but actually: " + queueLength, - queueLength == 0); + + " after mslab closed but actually: " + (pooledChunksNum-queueLength), + pooledChunksNum-queueLength == 0); } finally { ChunkCreator.INSTANCE = oldInstance; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java index 96be8ec..d3f9bc1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java @@ -140,7 +140,8 @@ public class TestMemstoreLABWithoutPool { mslab[i].close(); } // all of the chunkIds would have been returned back - assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0); + assertTrue("All the chunks must have been cleared", + ChunkCreator.INSTANCE.numberOfMappedChunks() == 0); } private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, -- 1.8.5.2 (Apple Git-48)