.../apache/hadoop/hbase/ByteBufferChunkCell.java | 42 +++ .../java/org/apache/hadoop/hbase/CellUtil.java | 11 +- .../java/org/apache/hadoop/hbase/ChunkCell.java | 29 ++ .../hadoop/hbase/NoTagByteBufferChunkCell.java | 45 +++ .../apache/hadoop/hbase/regionserver/Chunk.java | 42 ++- .../hadoop/hbase/regionserver/HRegionServer.java | 14 +- .../hbase/regionserver/MSLABChunkCreator.java | 395 +++++++++++++++++++++ .../hbase/regionserver/MemStoreChunkPool.java | 265 -------------- .../hadoop/hbase/regionserver/MemStoreLABImpl.java | 127 ++++--- .../hadoop/hbase/regionserver/OffheapChunk.java | 35 +- .../hadoop/hbase/regionserver/OnheapChunk.java | 37 +- .../apache/hadoop/hbase/HBaseTestingUtility.java | 3 + .../hadoop/hbase/regionserver/TestCellFlatSet.java | 2 +- .../hbase/regionserver/TestCompactingMemStore.java | 28 +- .../TestCompactingToCellArrayMapMemStore.java | 12 +- .../hbase/regionserver/TestDefaultMemStore.java | 14 +- .../hbase/regionserver/TestMemStoreChunkPool.java | 48 +-- .../hadoop/hbase/regionserver/TestMemStoreLAB.java | 27 +- .../regionserver/TestMemstoreLABWithoutPool.java | 169 +++++++++ .../regionserver/TestStoreFileRefresherChore.java | 1 + 20 files changed, 917 insertions(+), 429 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferChunkCell.java new file mode 100644 index 0000000..87db80f --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferChunkCell.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * ByteBuffer based ChunkCell with tags + */ +@InterfaceAudience.Private +public class ByteBufferChunkCell extends ByteBufferKeyValue implements ChunkCell { + public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, seqId); + } + + @Override + public long getChunkId() { + // The chunkId is embedded at the 0th offset of the bytebuffer + return this.buf.getLong(0); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index bb5197f..bbd28e3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -3141,9 +3141,10 @@ public final class CellUtil { } /** - * Clone the passed cell by copying its data into the passed buf. + * Clone the passed cell by copying its data into the passed buf and create a memstore + * chunk cell out of it */ - public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) { + public static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { int tagsLen = cell.getTagsLength(); if (cell instanceof ExtendedCell) { ((ExtendedCell) cell).write(buf, offset); @@ -3153,14 +3154,16 @@ public final class CellUtil { // serialization format only. KeyValueUtil.appendTo(cell, buf, offset, true); } + // TODO : write the seqid here. For writing seqId we should create a new cell type so + // that seqId is not used as the state if (tagsLen == 0) { // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class // which directly return tagsLen as 0. So we avoid parsing many length components in // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell // call getTagsLength(). - return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); + return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); } else { - return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); + return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); } } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChunkCell.java new file mode 100644 index 0000000..0e41a93 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChunkCell.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A cell created out of the MSLAB pool's chunk. The chunkId can be extracted from the backing + * ByteBuffer + */ +@InterfaceAudience.Private +public interface ChunkCell { + long getChunkId(); +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagByteBufferChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagByteBufferChunkCell.java new file mode 100644 index 0000000..33f2e71 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagByteBufferChunkCell.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private + +/** + * ByteBuffer based ChunkCell with no tags + */ +public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue implements ChunkCell { + + public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, seqId); + } + + @Override + public long getChunkId() { + // The chunkId is embedded at the 0th offset of the bytebuffer + return this.buf.getLong(0); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java index 2cbf0a3..0c0a6f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java @@ -21,8 +21,10 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * A chunk of memory out of which allocations are sliced. @@ -46,13 +48,22 @@ public abstract class Chunk { /** Size of chunk in bytes */ protected final int size; + // The unique id associated with the chunk. + private final long id; + /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. - * + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. * @param size in bytes + * @param id the chunk id */ - Chunk(int size) { + public Chunk(int size, long id) { this.size = size; + this.id = id; + } + + long getId() { + return this.id; } /** @@ -60,7 +71,24 @@ public abstract class Chunk { * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block * until the allocation is complete. */ - public abstract void init(); + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + try { + allocateDataBuffer(); + } catch (OutOfMemoryError e) { + boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); + assert failInit; // should be true. + throw e; + } + // Mark that it's ready for use + // Move 8 bytes since the first 8 bytes are having the chunkid in it + boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); + } + + abstract void allocateDataBuffer(); /** * Reset the offset to UNINITIALIZED before before reusing an old chunk @@ -96,7 +124,7 @@ public abstract class Chunk { if (oldOffset + size > data.capacity()) { return -1; // alloc doesn't fit } - + // TODO : If seqID is to be written add 8 bytes here for nextFreeOFfset // Try to atomically claim this chunk if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { // we got the alloc @@ -125,3 +153,7 @@ public abstract class Chunk { return this.nextFreeOffset.get(); } } + +// Marker interface to say the chunk came out of a pool +interface PooledChunk { +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index be4cca0..044a4e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1473,7 +1473,7 @@ public class HRegionServer extends HasThread implements startServiceThreads(); startHeapMemoryManager(); // Call it after starting HeapMemoryManager. - initializeMemStoreChunkPool(); + initializeMemStoreChunkCreator(); LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x" + @@ -1493,7 +1493,7 @@ public class HRegionServer extends HasThread implements } } - private void initializeMemStoreChunkPool() { + private void initializeMemStoreChunkCreator() { if (MemStoreLAB.isEnabled(conf)) { // MSLAB is enabled. So initialize MemStoreChunkPool // By this time, the MemstoreFlusher is already initialized. We can get the global limits from @@ -1507,11 +1507,13 @@ public class HRegionServer extends HasThread implements float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); - MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage, - initialCountPercentage, chunkSize, offheap); - if (pool != null && this.hMemManager != null) { + // init the chunkCreator + MSLABChunkCreator chunkCreator = + MSLABChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage); + if (chunkCreator.getPool() != null && this.hMemManager != null) { // Register with Heap Memory manager - this.hMemManager.registerTuneObserver(pool); + this.hMemManager.registerTuneObserver(chunkCreator.getPool()); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MSLABChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MSLABChunkCreator.java new file mode 100644 index 0000000..e183caa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MSLABChunkCreator.java @@ -0,0 +1,395 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated + * with every chunk + */ +@InterfaceAudience.Private +public class MSLABChunkCreator { + private final Log LOG = LogFactory.getLog(MSLABChunkCreator.class); + // monotonically increasing chunkid + private AtomicLong chunkID = new AtomicLong(1); + // maps the chunk against the monotonically increasing chunk id. We need to preserver the + // natural ordering of the key + private ConcurrentHashMap chunkIdMap = + new ConcurrentHashMap(); + private final int chunkSize; + private final boolean offheap; + @VisibleForTesting + static MSLABChunkCreator INSTANCE; + static boolean chunkPoolDisabled; + private MemStoreChunkPool pool; + + @VisibleForTesting + MSLABChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, + float poolSizePercentage, float initialCountPercentage) { + this.chunkSize = chunkSize; + this.offheap = offheap; + this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); + } + + /** + * Initializes the instance of MSLABChunkCreator + * @param chunkSize the chunkSize + * @param offheap indicates if the chunk is to be created offheap or not + * @param initialCountPercentage the initial count of the chunk pool if any + * @param poolSizePercentage pool size percentage + * @param globalMemStoreSize the global memstore size + * @return singleton MSLABChunkCreator + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", + justification = "Method is called by single thread at the starting of RS") + @VisibleForTesting + public static MSLABChunkCreator initialize(int chunkSize, boolean offheap, + long globalMemStoreSize, float poolSizePercentage, float initialCountPercentage) { + if (INSTANCE != null) return INSTANCE; + INSTANCE = new MSLABChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage); + // Initialize the chunkPool too here and then use that instance + return INSTANCE; + } + + static MSLABChunkCreator getChunkCreator() { + return INSTANCE; + } + + // We need this to pass it to the heapMemoryTuner + public MemStoreChunkPool getPool() { + return this.pool; + } + + /** + * Creates the chunk either onheap or offheap + * @return the chunk + */ + private Chunk createChunk() { + return createChunk(false); + } + + /** + * Creates and inits a chunk. + * @return the chunk that was initialized + */ + Chunk getChunk() { + Chunk chunk; + if (pool != null) { + chunk = this.pool.getChunk(); + // the pool has run out of maxCount + if (chunk == null) { + chunk = createChunk(); + } + } else { + chunk = createChunk(); + } + // now we need to actually do the expensive allocation step + chunk.init(); + return chunk; + } + + /** + * Creates the chunk either onheap or offheap + * @param forPool indicates if the chunks have to be created which will be used by the Pool + * @return the chunk + */ + private Chunk createChunk(boolean forPool) { + Chunk memstoreLABChunk; + long id = chunkID.getAndIncrement(); + if (forPool) { + if (this.offheap) { + memstoreLABChunk = new PooledOffheapChunk(chunkSize, id); + } else { + memstoreLABChunk = new PooledOnheapChunk(chunkSize, id); + } + } else { + if (this.offheap) { + memstoreLABChunk = new OffheapChunk(chunkSize, id); + } else { + memstoreLABChunk = new OnheapChunk(chunkSize, id); + } + } + chunkIdMap.put(id, memstoreLABChunk); + return memstoreLABChunk; + } + + Chunk getChunk(long id) { + return this.chunkIdMap.get(id); + } + + int getChunkSize() { + return this.chunkSize; + } + + boolean isOffheap() { + return this.offheap; + } + + void removeChunks(Collection chunkIds) { + if (pool == null) { + for (long chunkId : chunkIds) { + this.chunkIdMap.remove(chunkId); + } + } + } + + @VisibleForTesting + // Used only in tests + int size() { + return this.chunkIdMap.size(); + } + + @VisibleForTesting + void clearChunkIds() { + this.chunkIdMap.clear(); + } + + private class MemStoreChunkPool implements HeapMemoryTuneObserver { + private int maxCount; + + // A queue of reclaimed chunks + private final BlockingQueue reclaimedChunks; + private final float poolSizePercentage; + + /** Statistics thread schedule pool */ + private final ScheduledExecutorService scheduleThreadPool; + /** Statistics thread */ + private static final int statThreadPeriod = 60 * 5; + private final AtomicLong chunkCount = new AtomicLong(); + private final AtomicLong reusedChunkCount = new AtomicLong(); + + MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { + this.maxCount = maxCount; + this.poolSizePercentage = poolSizePercentage; + this.reclaimedChunks = new LinkedBlockingQueue<>(); + for (int i = 0; i < initialCount; i++) { + Chunk chunk = createChunk(true); + chunk.init(); + reclaimedChunks.add(chunk); + } + chunkCount.set(initialCount); + final String n = Thread.currentThread().getName(); + scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, + statThreadPeriod, TimeUnit.SECONDS); + } + + /** + * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have + * not yet created max allowed chunks count. When we have already created max allowed chunks and + * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk + * then. + * Note: Chunks returned by this pool must be put back to the pool after its use. + * @return a chunk + * @see #putbackChunk(Chunk) + * @see #putbackChunks(BlockingQueue) + */ + Chunk getChunk() { + Chunk chunk = reclaimedChunks.poll(); + if (chunk != null) { + chunk.reset(); + reusedChunkCount.incrementAndGet(); + } else { + // Make a chunk iff we have not yet created the maxCount chunks + while (true) { + long created = this.chunkCount.get(); + if (created < this.maxCount) { + chunk = createChunk(true); + if (this.chunkCount.compareAndSet(created, created + 1)) { + break; + } + } else { + break; + } + } + } + return chunk; + } + + /** + * Add the chunks to the pool, when the pool achieves the max size, it will + * skip the remaining chunks + * @param chunks + */ + private void putbackChunks(BlockingQueue chunks) { + int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); + Chunk chunk = null; + while ((chunk = chunks.poll()) != null && toAdd > 0) { + reclaimedChunks.add(chunk); + toAdd--; + } + } + + /** + * Add the chunk to the pool, if the pool has achieved the max size, it will + * skip it + * @param chunk + */ + private void putbackChunk(Chunk chunk) { + if (reclaimedChunks.size() < this.maxCount) { + reclaimedChunks.add(chunk); + } + } + + private class StatisticsThread extends Thread { + StatisticsThread() { + super("MemStoreChunkPool.StatisticsThread"); + setDaemon(true); + } + + @Override + public void run() { + logStats(); + } + + private void logStats() { + if (!LOG.isDebugEnabled()) return; + long created = chunkCount.get(); + long reused = reusedChunkCount.get(); + long total = created + reused; + LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + + ",created chunk count=" + created + + ",reused chunk count=" + reused + + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( + (float) reused / (float) total, 2))); + } + } + + private int getMaxCount() { + return this.maxCount; + } + + @Override + public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { + // don't do any tuning in case of offheap memstore + if (isOffheap()) { + LOG.warn("Not tuning the chunk pool as it is offheap"); + return; + } + int newMaxCount = + (int) (newMemstoreSize * poolSizePercentage / getChunkSize()); + if (newMaxCount != this.maxCount) { + // We need an adjustment in the chunks numbers + if (newMaxCount > this.maxCount) { + // Max chunks getting increased. Just change the variable. Later calls to getChunk() would + // create and add them to Q + LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + } else { + // Max chunks getting decreased. We may need to clear off some of the pooled chunks now + // itself. If the extra chunks are serving already, do not pool those when we get them back + LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + if (this.reclaimedChunks.size() > newMaxCount) { + synchronized (this) { + while (this.reclaimedChunks.size() > newMaxCount) { + this.reclaimedChunks.poll(); + } + } + } + } + } + } + } + + @VisibleForTesting + static void clearDisableFlag() { + chunkPoolDisabled = false; + } + + private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage) { + if (poolSizePercentage <= 0) { + LOG.info("PoolSizePercentage is less than 0. So not using pool"); + return null; + } + if (chunkPoolDisabled) { + return null; + } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); + } + int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize()); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); + } + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + + StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount + + ", initial count " + initialCount); + return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage); + } + + synchronized void putbackChunk(Chunk c) { + if (pool != null) { + pool.putbackChunk(c); + } + } + + synchronized void putbackChunks(BlockingQueue chunks) { + if (pool != null) { + pool.putbackChunks(chunks); + } + } + + public int getMaxCount() { + if (pool != null) { + return pool.getMaxCount(); + } + return 0; + } + + int getPoolSize() { + if (pool != null) { + return pool.reclaimedChunks.size(); + } + return 0; + } + + /* + * Only used in testing + */ + void clearChunks() { + if (pool != null) { + pool.reclaimedChunks.clear(); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java deleted file mode 100644 index b7ac212..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * A pool of {@link Chunk} instances. - * - * MemStoreChunkPool caches a number of retired chunks for reusing, it could - * decrease allocating bytes when writing, thereby optimizing the garbage - * collection on JVM. - * - * The pool instance is globally unique and could be obtained through - * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)} - * - * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating - * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called - * when MemStore clearing snapshot for flush - */ -@SuppressWarnings("javadoc") -@InterfaceAudience.Private -public class MemStoreChunkPool implements HeapMemoryTuneObserver { - private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); - - // Static reference to the MemStoreChunkPool - static MemStoreChunkPool GLOBAL_INSTANCE; - /** Boolean whether we have disabled the memstore chunk pool entirely. */ - static boolean chunkPoolDisabled = false; - - private int maxCount; - - // A queue of reclaimed chunks - private final BlockingQueue reclaimedChunks; - private final int chunkSize; - private final float poolSizePercentage; - - /** Statistics thread schedule pool */ - private final ScheduledExecutorService scheduleThreadPool; - /** Statistics thread */ - private static final int statThreadPeriod = 60 * 5; - private final AtomicLong chunkCount = new AtomicLong(); - private final AtomicLong reusedChunkCount = new AtomicLong(); - private final boolean offheap; - - MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage, - boolean offheap) { - this.maxCount = maxCount; - this.chunkSize = chunkSize; - this.poolSizePercentage = poolSizePercentage; - this.offheap = offheap; - this.reclaimedChunks = new LinkedBlockingQueue<>(); - for (int i = 0; i < initialCount; i++) { - Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize); - chunk.init(); - reclaimedChunks.add(chunk); - } - chunkCount.set(initialCount); - final String n = Thread.currentThread().getName(); - scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() - .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, - statThreadPeriod, TimeUnit.SECONDS); - } - - /** - * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have - * not yet created max allowed chunks count. When we have already created max allowed chunks and - * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk - * then. - * Note: Chunks returned by this pool must be put back to the pool after its use. - * @return a chunk - * @see #putbackChunk(Chunk) - * @see #putbackChunks(BlockingQueue) - */ - Chunk getChunk() { - Chunk chunk = reclaimedChunks.poll(); - if (chunk != null) { - chunk.reset(); - reusedChunkCount.incrementAndGet(); - } else { - // Make a chunk iff we have not yet created the maxCount chunks - while (true) { - long created = this.chunkCount.get(); - if (created < this.maxCount) { - chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize); - if (this.chunkCount.compareAndSet(created, created + 1)) { - break; - } - } else { - break; - } - } - } - return chunk; - } - - /** - * Add the chunks to the pool, when the pool achieves the max size, it will - * skip the remaining chunks - * @param chunks - */ - synchronized void putbackChunks(BlockingQueue chunks) { - int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); - Chunk chunk = null; - while ((chunk = chunks.poll()) != null && toAdd > 0) { - reclaimedChunks.add(chunk); - toAdd--; - } - } - - /** - * Add the chunk to the pool, if the pool has achieved the max size, it will - * skip it - * @param chunk - */ - synchronized void putbackChunk(Chunk chunk) { - if (reclaimedChunks.size() < this.maxCount) { - reclaimedChunks.add(chunk); - } - } - - int getPoolSize() { - return this.reclaimedChunks.size(); - } - - /* - * Only used in testing - */ - void clearChunks() { - this.reclaimedChunks.clear(); - } - - private class StatisticsThread extends Thread { - StatisticsThread() { - super("MemStoreChunkPool.StatisticsThread"); - setDaemon(true); - } - - @Override - public void run() { - logStats(); - } - - private void logStats() { - if (!LOG.isDebugEnabled()) return; - long created = chunkCount.get(); - long reused = reusedChunkCount.get(); - long total = created + reused; - LOG.debug("Stats: current pool size=" + reclaimedChunks.size() - + ",created chunk count=" + created - + ",reused chunk count=" + reused - + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( - (float) reused / (float) total, 2))); - } - } - - /** - * @return the global MemStoreChunkPool instance - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", - justification = "Method is called by single thread at the starting of RS") - static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage, int chunkSize, boolean offheap) { - if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; - if (chunkPoolDisabled) return null; - - if (poolSizePercentage <= 0) { - chunkPoolDisabled = true; - return null; - } - if (poolSizePercentage > 1.0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); - } - int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize); - if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); - } - int initialCount = (int) (initialCountPercentage * maxCount); - LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) - + ", max count " + maxCount + ", initial count " + initialCount); - GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage, - offheap); - return GLOBAL_INSTANCE; - } - - /** - * @return The singleton instance of this pool. - */ - static MemStoreChunkPool getPool() { - return GLOBAL_INSTANCE; - } - - int getMaxCount() { - return this.maxCount; - } - - @VisibleForTesting - static void clearDisableFlag() { - chunkPoolDisabled = false; - } - - @Override - public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { - // don't do any tuning in case of offheap memstore - if (this.offheap) { - LOG.warn("Not tuning the chunk pool as it is offheap"); - return; - } - int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); - if (newMaxCount != this.maxCount) { - // We need an adjustment in the chunks numbers - if (newMaxCount > this.maxCount) { - // Max chunks getting increased. Just change the variable. Later calls to getChunk() would - // create and add them to Q - LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - } else { - // Max chunks getting decreased. We may need to clear off some of the pooled chunks now - // itself. If the extra chunks are serving already, do not pool those when we get them back - LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - if (this.reclaimedChunks.size() > newMaxCount) { - synchronized (this) { - while (this.reclaimedChunks.size() > newMaxCount) { - this.reclaimedChunks.poll(); - } - } - } - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 4e87135..b5fe661 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -18,11 +18,14 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,6 +69,9 @@ public class MemStoreLABImpl implements MemStoreLAB { static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class); private AtomicReference curChunk = new AtomicReference<>(); + // Lock to manage multiple handlers requesting for a chunk + private ReentrantLock lock = new ReentrantLock(); + // A queue of chunks from pool contained by this memstore LAB // TODO: in the future, it would be better to have List implementation instead of Queue, // as FIFO order is not so important here @@ -73,7 +79,8 @@ public class MemStoreLABImpl implements MemStoreLAB { BlockingQueue pooledChunkQueue = null; private final int chunkSize; private final int maxAlloc; - private final MemStoreChunkPool chunkPool; + //private final MemStoreChunkPool chunkPool; + private final MSLABChunkCreator chunkCreator; // This flag is for closing this instance, its set when clearing snapshot of // memstore @@ -83,6 +90,8 @@ public class MemStoreLABImpl implements MemStoreLAB { private AtomicBoolean reclaimed = new AtomicBoolean(false); // Current count of open scanners which reading data from this MemStoreLAB private final AtomicInteger openScannerCount = new AtomicInteger(); + // Stores all the chunkIds that are used by this MSLAB when there is no chunkPool + private final Map chunkIds = new ConcurrentHashMap(); // Used in testing public MemStoreLABImpl() { @@ -92,20 +101,18 @@ public class MemStoreLABImpl implements MemStoreLAB { public MemStoreLABImpl(Configuration conf) { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); - this.chunkPool = MemStoreChunkPool.getPool(); + this.chunkCreator = MSLABChunkCreator.getChunkCreator(); // currently chunkQueue is only used for chunkPool - if (this.chunkPool != null) { + if (this.chunkCreator.getPool() != null) { // set queue length to chunk pool max count to avoid keeping reference of // too many non-reclaimable chunks - pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount()); + pooledChunkQueue = new LinkedBlockingQueue<>(chunkCreator.getMaxCount()); } - // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument(maxAlloc <= chunkSize, MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); } - @Override public Cell copyCellInto(Cell cell) { int size = KeyValueUtil.length(cell); @@ -118,19 +125,22 @@ public class MemStoreLABImpl implements MemStoreLAB { Chunk c = null; int allocOffset = 0; while (true) { + // Try to get the chunk c = getOrMakeChunk(); // Try to allocate from this chunk - allocOffset = c.alloc(size); - if (allocOffset != -1) { - // We succeeded - this is the common case - small alloc - // from a big buffer - break; + if (c != null) { + allocOffset = c.alloc(size); + if (allocOffset != -1) { + // We succeeded - this is the common case - small alloc + // from a big buffer + break; + } + // not enough space! + // try to retire this chunk + tryRetireChunk(c); } - // not enough space! - // try to retire this chunk - tryRetireChunk(c); } - return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size); + return CellUtil.copyToChunkCell(cell, c.getData(), allocOffset, size); } /** @@ -142,9 +152,9 @@ public class MemStoreLABImpl implements MemStoreLAB { this.closed = true; // We could put back the chunks to pool for reusing only when there is no // opening scanner which will read their data - if (chunkPool != null && openScannerCount.get() == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.pooledChunkQueue); + int count = openScannerCount.get(); + if(count == 0) { + recycleChunks(); } } @@ -162,9 +172,16 @@ public class MemStoreLABImpl implements MemStoreLAB { @Override public void decScannerCount() { int count = this.openScannerCount.decrementAndGet(); - if (this.closed && chunkPool != null && count == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.pooledChunkQueue); + if (this.closed && count == 0) { + recycleChunks(); + } + } + + private void recycleChunks() { + if (chunkCreator.getPool() != null && reclaimed.compareAndSet(false, true)) { + chunkCreator.putbackChunks(this.pooledChunkQueue); + } else { + this.chunkCreator.removeChunks(this.chunkIds.keySet()); } } @@ -188,47 +205,47 @@ public class MemStoreLABImpl implements MemStoreLAB { /** * Get the current chunk, or, if there is no current chunk, * allocate a new one from the JVM. + * @param size */ private Chunk getOrMakeChunk() { - while (true) { - // Try to get the chunk - Chunk c = curChunk.get(); - if (c != null) { - return c; - } - - // No current chunk, so we want to allocate one. We race - // against other allocators to CAS in an uninitialized chunk - // (which is cheap to allocate) - if (chunkPool != null) { - c = chunkPool.getChunk(); - } - boolean pooledChunk = false; - if (c != null) { - // This is chunk from pool - pooledChunk = true; - } else { - c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap. - } - if (curChunk.compareAndSet(null, c)) { - // we won race - now we need to actually do the expensive - // allocation step - c.init(); - if (pooledChunk) { - if (!this.closed && !this.pooledChunkQueue.offer(c)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " - + pooledChunkQueue.size()); + // Try to get the chunk + Chunk c = curChunk.get(); + if (c != null) { + return c; + } + // No current chunk, so we want to allocate one. We race + // against other allocators to CAS in an uninitialized chunk + // (which is cheap to allocate) + if (lock.tryLock()) { + try { + // once again check inside the lock + c = curChunk.get(); + if (c != null) { + return c; + } + c = this.chunkCreator.getChunk(); + if (c != null) { + // set the curChunk. No need of CAS as only one thread will be here + curChunk.set(c); + if (chunkCreator.getPool() == null) { + // better to store chunkid though it is still no the curChunk + chunkIds.put(c.getId(), true); + } + if (c instanceof PooledChunk) { + if (!this.closed && !this.pooledChunkQueue.offer(c)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " + + pooledChunkQueue.size()); + } } } + return c; } - return c; - } else if (pooledChunk) { - chunkPool.putbackChunk(c); + } finally { + lock.unlock(); } - // someone else won race - that's fine, we'll try to grab theirs - // in the next iteration of the loop. } + return null; } @VisibleForTesting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java index ed98cfa..1efee45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java @@ -21,34 +21,31 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.base.Preconditions; - /** * An off heap chunk implementation. */ @InterfaceAudience.Private public class OffheapChunk extends Chunk { - OffheapChunk(int size) { - super(size); + OffheapChunk(int size, long id) { + super(size, id); } @Override - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = ByteBuffer.allocateDirect(this.size); - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; + void allocateDataBuffer() { + if (data == null) { + data = ByteBuffer.allocateDirect(this.size); + data.putLong(0, this.getId()); } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); + } +} + +/** + * Indicates that this offheap chunk is coming out of the ChunkPool + */ +class PooledOffheapChunk extends OffheapChunk implements PooledChunk { + + PooledOffheapChunk(int size, long id) { + super(size, id); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java index bd33cb5..389d30f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java @@ -21,33 +21,32 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.base.Preconditions; - /** * An on heap chunk implementation. */ @InterfaceAudience.Private public class OnheapChunk extends Chunk { - OnheapChunk(int size) { - super(size); + OnheapChunk(int size, long id) { + super(size, id); } - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = ByteBuffer.allocate(this.size); - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; + @Override + void allocateDataBuffer() { + if (data == null) { + data = ByteBuffer.allocate(this.size); + data.putLong(0, this.getId()); } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); } } + +/** + * Indicates that this onheap chunk is coming out of the ChunkPool + */ +class PooledOnheapChunk extends OnheapChunk implements PooledChunk { + + PooledOnheapChunk(int size, long id) { + super(size, id); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index a6c7f68..5649e81 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -98,6 +98,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MSLABChunkCreator; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -2414,6 +2416,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize) throws IOException { + MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0); WAL wal = createWal(conf, rootDir, info); return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java index 3b4d068..ddb9ae6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase { descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true); CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - MemStoreChunkPool.chunkPoolDisabled = false; + MSLABChunkCreator.chunkPoolDisabled = false; } /* Create and test CellSet based on CellArrayMap */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 09ddd6f..990d11c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -50,7 +50,7 @@ import static org.junit.Assert.assertTrue; public class TestCompactingMemStore extends TestDefaultMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class); - protected static MemStoreChunkPool chunkPool; + protected static MSLABChunkCreator chunkCreator; protected HRegion region; protected RegionServicesForStores regionServicesForStores; protected HStore store; @@ -65,7 +65,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @After public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunks(); } @Override @@ -90,9 +90,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, - MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); - assertTrue(chunkPool != null); + chunkCreator = MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); + assertTrue(chunkCreator != null); } /** @@ -388,7 +388,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } @@ -430,16 +430,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunks(); // Creating another snapshot @@ -458,7 +458,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // close the scanner snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test @@ -510,16 +510,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf1, 3, val), null); assertEquals(3, memstore.getActive().getCellsCount()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunks(); // Creating another snapshot @@ -543,7 +543,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { // close the scanner snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } ////////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index a9f8a97..9cadafd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -54,7 +54,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore ////////////////////////////////////////////////////////////////////////////// @Override public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunks(); } @Override public void setUp() throws Exception { @@ -402,16 +402,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunks(); // Creating another snapshot @@ -430,7 +430,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore // close the scanner snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test @@ -462,7 +462,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index e76da5a..679fbbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -84,6 +85,7 @@ public class TestDefaultMemStore { protected static final byte[] FAMILY = Bytes.toBytes("column"); protected MultiVersionConcurrencyControl mvcc; protected AtomicLong startSeqNum = new AtomicLong(0); + protected MSLABChunkCreator chunkCreator; private String getName() { return this.name.getMethodName(); @@ -92,9 +94,17 @@ public class TestDefaultMemStore { @Before public void setUp() throws Exception { internalSetUp(); + // no pool + this.chunkCreator = + MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0); this.memstore = new DefaultMemStore(); } + @AfterClass + public static void tearDownClass() throws Exception { + MSLABChunkCreator.getChunkCreator().clearChunkIds(); + } + protected void internalSetUp() throws Exception { this.mvcc = new MultiVersionConcurrencyControl(); } @@ -129,7 +139,9 @@ public class TestDefaultMemStore { assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize()); // make sure chunk size increased even when writing the same cell, if using MSLAB if (msLab instanceof MemStoreLABImpl) { - assertEquals(2 * Segment.getCellLength(kv), + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG, ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset()); } } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 42aad5c..b5e8bbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -48,30 +48,30 @@ import static org.junit.Assert.assertTrue; @Category({RegionServerTests.class, SmallTests.class}) public class TestMemStoreChunkPool { private final static Configuration conf = new Configuration(); - private static MemStoreChunkPool chunkPool; + private static MSLABChunkCreator chunkCreator; private static boolean chunkPoolDisabledBeforeTest; @BeforeClass public static void setUpBeforeClass() throws Exception { conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; - MemStoreChunkPool.chunkPoolDisabled = false; + chunkPoolDisabledBeforeTest = MSLABChunkCreator.chunkPoolDisabled; + MSLABChunkCreator.chunkPoolDisabled = false; long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, - MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); - assertTrue(chunkPool != null); + chunkCreator = MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); + assertTrue(chunkCreator != null); } @AfterClass public static void tearDownAfterClass() throws Exception { - MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest; + MSLABChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest; } @Before public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunks(); } @Test @@ -90,7 +90,7 @@ public class TestMemStoreChunkPool { int size = KeyValueUtil.length(kv); ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); if (newKv.getBuffer() != lastBuffer) { - expectedOff = 0; + expectedOff = 8; lastBuffer = newKv.getBuffer(); } assertEquals(expectedOff, newKv.getOffset()); @@ -100,14 +100,14 @@ public class TestMemStoreChunkPool { } // chunks will be put back to pool after close mslab.close(); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); // reconstruct mslab mslab = new MemStoreLABImpl(conf); // chunk should be got from the pool, so we can reuse it. KeyValue kv = new KeyValue(rk, cf, q, new byte[10]); mslab.copyCellInto(kv); - assertEquals(chunkCount - 1, chunkPool.getPoolSize()); + assertEquals(chunkCount - 1, chunkCreator.getPoolSize()); } @Test @@ -141,7 +141,7 @@ public class TestMemStoreChunkPool { snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } @@ -185,16 +185,16 @@ public class TestMemStoreChunkPool { snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunks(); // Creating another snapshot snapshot = memstore.snapshot(); @@ -212,20 +212,20 @@ public class TestMemStoreChunkPool { // close the snapshot scanner snapshot.getScanner().close(); memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test public void testPutbackChunksMultiThreaded() throws Exception { - MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE; final int maxCount = 10; final int initialCount = 5; - final int chunkSize = 30; + final int chunkSize = 40; final int valSize = 7; - MemStoreChunkPool pool = new MemStoreChunkPool(chunkSize, maxCount, initialCount, 1, false); - assertEquals(initialCount, pool.getPoolSize()); - assertEquals(maxCount, pool.getMaxCount()); - MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created. + MSLABChunkCreator oldCreator = MSLABChunkCreator.getChunkCreator(); + MSLABChunkCreator newCreator = new MSLABChunkCreator(chunkSize, false, 400, 1, 0.5f); + assertEquals(initialCount, newCreator.getPoolSize()); + assertEquals(maxCount, newCreator.getMaxCount()); + MSLABChunkCreator.INSTANCE = newCreator;// Replace the global ref with the new one we created. // Used it for the testing. Later in finally we put // back the original final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), @@ -252,9 +252,9 @@ public class TestMemStoreChunkPool { t1.join(); t2.join(); t3.join(); - assertTrue(pool.getPoolSize() <= maxCount); + assertTrue(newCreator.getPoolSize() <= maxCount); } finally { - MemStoreChunkPool.GLOBAL_INSTANCE = oldPool; + MSLABChunkCreator.INSTANCE = oldCreator; } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 141b802..d47571a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -63,8 +63,8 @@ public class TestMemStoreLAB { public static void setUpBeforeClass() throws Exception { long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, - MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); + MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, + 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); } /** @@ -76,6 +76,7 @@ public class TestMemStoreLAB { MemStoreLAB mslab = new MemStoreLABImpl(); int expectedOff = 0; ByteBuffer lastBuffer = null; + long lastChunkId = -1; // 100K iterations by 0-1K alloc -> 50MB expected // should be reasonable for unit test and also cover wraparound // behavior @@ -85,8 +86,13 @@ public class TestMemStoreLAB { int size = KeyValueUtil.length(kv); ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); if (newKv.getBuffer() != lastBuffer) { - expectedOff = 0; + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + expectedOff = Bytes.SIZEOF_LONG; lastBuffer = newKv.getBuffer(); + long chunkId = newKv.getBuffer().getLong(0); + assertTrue("chunkid should be different", chunkId != lastChunkId); + lastChunkId = chunkId; } assertEquals(expectedOff, newKv.getOffset()); assertTrue("Allocation overruns buffer", @@ -136,23 +142,21 @@ public class TestMemStoreLAB { }; ctx.addThread(t); } - + ctx.startThreads(); while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) { Thread.sleep(10); } ctx.stop(); - // Partition the allocations by the actual byte[] they point into, // make sure offsets are unique for each chunk Map> mapsByChunk = Maps.newHashMap(); - + int sizeCounted = 0; for (AllocRecord rec : Iterables.concat(allocations)) { sizeCounted += rec.size; if (rec.size == 0) continue; - Map mapForThisByteArray = mapsByChunk.get(rec.alloc); if (mapForThisByteArray == null) { @@ -167,7 +171,9 @@ public class TestMemStoreLAB { // Now check each byte array to make sure allocations don't overlap for (Map allocsInChunk : mapsByChunk.values()) { - int expectedOff = 0; + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + int expectedOff = Bytes.SIZEOF_LONG; for (AllocRecord alloc : allocsInChunk.values()) { assertEquals(expectedOff, alloc.offset); assertTrue("Allocation overruns buffer", @@ -175,7 +181,6 @@ public class TestMemStoreLAB { expectedOff += alloc.size; } } - } /** @@ -194,7 +199,7 @@ public class TestMemStoreLAB { // set chunk size to default max alloc size, so we could easily trigger chunk retirement conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); // reconstruct mslab - MemStoreChunkPool.clearDisableFlag(); + MSLABChunkCreator.clearDisableFlag(); mslab = new MemStoreLABImpl(conf); // launch multiple threads to trigger frequent chunk retirement List threads = new ArrayList<>(); @@ -223,6 +228,8 @@ public class TestMemStoreLAB { } // close the mslab mslab.close(); + // none of the chunkIds would have been returned back + assertTrue("All the chunks must have been cleared", MSLABChunkCreator.INSTANCE.size() != 0); // make sure all chunks reclaimed or removed from chunk queue int queueLength = mslab.getPooledChunks().size(); assertTrue("All chunks in chunk queue should be reclaimed or removed" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java new file mode 100644 index 0000000..4bb6de7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java @@ -0,0 +1,169 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ByteBufferKeyValue; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestMemstoreLABWithoutPool { + private final static Configuration conf = new Configuration(); + + private static final byte[] rk = Bytes.toBytes("r1"); + private static final byte[] cf = Bytes.toBytes("f"); + private static final byte[] q = Bytes.toBytes("q"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); + // disable pool + MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, + 0.0f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); + } + + /** + * Test a bunch of random allocations + */ + @Test + public void testLABRandomAllocation() { + Random rand = new Random(); + MemStoreLAB mslab = new MemStoreLABImpl(); + int expectedOff = 0; + ByteBuffer lastBuffer = null; + long lastChunkId = -1; + // 100K iterations by 0-1K alloc -> 50MB expected + // should be reasonable for unit test and also cover wraparound + // behavior + for (int i = 0; i < 100000; i++) { + int valSize = rand.nextInt(1000); + KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); + int size = KeyValueUtil.length(kv); + ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); + if (newKv.getBuffer() != lastBuffer) { + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + expectedOff = Bytes.SIZEOF_LONG; + lastBuffer = newKv.getBuffer(); + long chunkId = newKv.getBuffer().getLong(0); + assertTrue("chunkid should be different", chunkId != lastChunkId); + lastChunkId = chunkId; + } + assertEquals(expectedOff, newKv.getOffset()); + assertTrue("Allocation overruns buffer", + newKv.getOffset() + size <= newKv.getBuffer().capacity()); + expectedOff += size; + } + } + + /** + * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure + * there's no memory leak (HBASE-16195) + * @throws Exception if any error occurred + */ + @Test + public void testLABChunkQueueWithMultipleMSLABs() throws Exception { + Configuration conf = HBaseConfiguration.create(); + MemStoreLABImpl[] mslab = new MemStoreLABImpl[10]; + for (int i = 0; i < 10; i++) { + mslab[i] = new MemStoreLABImpl(conf); + } + // launch multiple threads to trigger frequent chunk retirement + List threads = new ArrayList<>(); + // create smaller sized kvs + final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), + new byte[0]); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + threads.add(getChunkQueueTestThread(mslab[i], "testLABChunkQueue-" + j, kv)); + } + } + for (Thread thread : threads) { + thread.start(); + } + // let it run for some time + Thread.sleep(3000); + for (Thread thread : threads) { + thread.interrupt(); + } + boolean threadsRunning = true; + boolean alive = false; + while (threadsRunning) { + alive = false; + for (Thread thread : threads) { + if (thread.isAlive()) { + alive = true; + break; + } + } + if (!alive) { + threadsRunning = false; + } + } + // close the mslab + for (int i = 0; i < 10; i++) { + mslab[i].close(); + } + // all of the chunkIds would have been returned back + assertTrue("All the chunks must have been cleared", MSLABChunkCreator.INSTANCE.size() == 0); + } + + private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, + Cell cellToCopyInto) { + Thread thread = new Thread() { + boolean stopped = false; + + @Override + public void run() { + while (!stopped) { + // keep triggering chunk retirement + mslab.copyCellInto(cellToCopyInto); + } + } + + @Override + public void interrupt() { + this.stopped = true; + } + }; + thread.setName(threadName); + thread.setDaemon(true); + return thread; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java index 3cdb227..b5e16e4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java @@ -111,6 +111,7 @@ public class TestStoreFileRefresherChore { final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId); + MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0); HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()), conf, htd, null);