.../java/org/apache/hadoop/hbase/CellUtil.java | 24 -- .../java/org/apache/hadoop/hbase/ExtendedCell.java | 10 + .../org/apache/hadoop/hbase/master/HMaster.java | 2 + .../hbase/regionserver/ByteBufferChunkCell.java | 45 +++ .../apache/hadoop/hbase/regionserver/Chunk.java | 60 +++- .../hadoop/hbase/regionserver/ChunkCreator.java | 396 +++++++++++++++++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 14 +- .../hbase/regionserver/MemStoreChunkPool.java | 265 -------------- .../hadoop/hbase/regionserver/MemStoreLAB.java | 2 +- .../hadoop/hbase/regionserver/MemStoreLABImpl.java | 171 +++++---- .../regionserver/NoTagByteBufferChunkCell.java | 48 +++ .../hadoop/hbase/regionserver/OffheapChunk.java | 31 +- .../hadoop/hbase/regionserver/OnheapChunk.java | 32 +- .../apache/hadoop/hbase/HBaseTestingUtility.java | 3 + .../hadoop/hbase/master/TestCatalogJanitor.java | 7 + .../hadoop/hbase/regionserver/TestBulkLoad.java | 2 +- .../hadoop/hbase/regionserver/TestCellFlatSet.java | 2 +- .../hbase/regionserver/TestCompactingMemStore.java | 37 +- .../TestCompactingToCellArrayMapMemStore.java | 16 +- .../hbase/regionserver/TestCompactionPolicy.java | 1 + .../hbase/regionserver/TestDefaultMemStore.java | 14 +- .../hbase/regionserver/TestMemStoreChunkPool.java | 48 +-- .../hadoop/hbase/regionserver/TestMemStoreLAB.java | 27 +- .../regionserver/TestMemstoreLABWithoutPool.java | 168 +++++++++ .../regionserver/TestStoreFileRefresherChore.java | 1 + .../TestWALMonotonicallyIncreasingSeqId.java | 1 + 26 files changed, 952 insertions(+), 475 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index bb5197f..8cc57d1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -3139,28 +3139,4 @@ public final class CellUtil { return Type.DeleteFamily.getCode(); } } - - /** - * Clone the passed cell by copying its data into the passed buf. - */ - public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) { - int tagsLen = cell.getTagsLength(); - if (cell instanceof ExtendedCell) { - ((ExtendedCell) cell).write(buf, offset); - } else { - // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the - // other case also. The data fragments within Cell is copied into buf as in KeyValue - // serialization format only. - KeyValueUtil.appendTo(cell, buf, offset, true); - } - if (tagsLen == 0) { - // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class - // which directly return tagsLen as 0. So we avoid parsing many length components in - // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell - // call getTagsLength(). - return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); - } else { - return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId()); - } - } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java index 517873f..a7ca0c6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.HeapSize; public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestamp, HeapSize, Cloneable { + public static int CELL_NOT_BASED_ON_CHUNK = -1; /** * Write this cell to an OutputStream in a {@link KeyValue} format. *
KeyValue format
@@ -73,4 +74,13 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam * @return The deep cloned cell */ Cell deepClone(); + + /** + * Extracts the id of the backing bytebuffer of this cell if it was obtained from fixed sized + * chunks as in case of MemstoreLAB + * @return the chunk id if the cell is backed by fixed sized Chunks, else return -1 + */ + default long getChunkId() { + return CELL_NOT_BASED_ON_CHUNK; + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index a1cbe53..f84fe56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -748,6 +748,8 @@ public class HMaster extends HRegionServer implements MasterServices { this.masterActiveTime = System.currentTimeMillis(); // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring. + // Initialize the chunkCreator + initializeMemStoreChunkCreator(); this.fileSystemManager = new MasterFileSystem(this); this.walManager = new MasterWalManager(this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java new file mode 100644 index 0000000..992ab33 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ByteBufferChunkCell.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.ByteBufferKeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +/** + * ByteBuffer based cell which has the chunkid at the 0th offset + * @see MemStoreLAB + */ +@InterfaceAudience.Private +public class ByteBufferChunkCell extends ByteBufferKeyValue { + public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, seqId); + } + + @Override + public long getChunkId() { + // The chunkId is embedded at the 0th offset of the bytebuffer + return ByteBufferUtils.toLong(buf, 0); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java index 2cbf0a3..cb56d4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java @@ -21,8 +21,10 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * A chunk of memory out of which allocations are sliced. @@ -46,13 +48,41 @@ public abstract class Chunk { /** Size of chunk in bytes */ protected final int size; + // The unique id associated with the chunk. + private final long id; + + // indicates if the chunk is formed by ChunkCreator#MemstorePool + private final boolean fromPool; + + /** + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. + * @param size in bytes + * @param id the chunk id + */ + public Chunk(int size, long id) { + this(size, id, false); + } + /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. - * + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. * @param size in bytes + * @param id the chunk id + * @param fromPool if the chunk is formed by pool */ - Chunk(int size) { + public Chunk(int size, long id, boolean fromPool) { this.size = size; + this.id = id; + this.fromPool = fromPool; + } + + long getId() { + return this.id; + } + + boolean isFromPool() { + return this.fromPool; } /** @@ -60,7 +90,24 @@ public abstract class Chunk { * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block * until the allocation is complete. */ - public abstract void init(); + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + try { + allocateDataBuffer(); + } catch (OutOfMemoryError e) { + boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); + assert failInit; // should be true. + throw e; + } + // Mark that it's ready for use + // Move 8 bytes since the first 8 bytes are having the chunkid in it + boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); + } + + abstract void allocateDataBuffer(); /** * Reset the offset to UNINITIALIZED before before reusing an old chunk @@ -74,7 +121,8 @@ public abstract class Chunk { /** * Try to allocate size bytes from the chunk. - * + * If a chunk is tried to get allocated before init() call, the thread doing the allocation + * will be in busy-wait state as it will keep looping till the nextFreeOffset is set. * @return the offset of the successful allocation, or -1 to indicate not-enough-space */ public int alloc(int size) { @@ -96,7 +144,7 @@ public abstract class Chunk { if (oldOffset + size > data.capacity()) { return -1; // alloc doesn't fit } - + // TODO : If seqID is to be written add 8 bytes here for nextFreeOFfset // Try to atomically claim this chunk if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { // we got the alloc diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java new file mode 100644 index 0000000..5b04fe4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -0,0 +1,396 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.lang.ref.SoftReference; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated + * with every chunk + */ +@InterfaceAudience.Private +public class ChunkCreator { + private final Log LOG = LogFactory.getLog(ChunkCreator.class); + // monotonically increasing chunkid + private AtomicLong chunkID = new AtomicLong(1); + // maps the chunk against the monotonically increasing chunk id. We need to preserve the + // natural ordering of the key + // CellChunkMap creation should convert the soft ref to hard reference + private Map> chunkIdMap = + new ConcurrentHashMap>(); + private final int chunkSize; + private final boolean offheap; + @VisibleForTesting + static ChunkCreator INSTANCE; + @VisibleForTesting + static boolean chunkPoolDisabled = false; + private MemStoreChunkPool pool; + + @VisibleForTesting + ChunkCreator(int chunkSize, boolean offheap, long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage, HeapMemoryManager heapMemoryManager) { + this.chunkSize = chunkSize; + this.offheap = offheap; + this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); + if (heapMemoryManager != null && this.pool != null) { + // Register with Heap Memory manager + heapMemoryManager.registerTuneObserver(this.pool); + } + } + + /** + * Initializes the instance of MSLABChunkCreator + * @param chunkSize the chunkSize + * @param offheap indicates if the chunk is to be created offheap or not + * @param initialCountPercentage the initial count of the chunk pool if any + * @param poolSizePercentage pool size percentage + * @param globalMemStoreSize the global memstore size + * @return singleton MSLABChunkCreator + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", + justification = "Method is called by single thread at the starting of RS") + @VisibleForTesting + public static ChunkCreator initialize(int chunkSize, boolean offheap, long globalMemStoreSize, + float poolSizePercentage, float initialCountPercentage, HeapMemoryManager heapMemoryManager) { + if (INSTANCE != null) return INSTANCE; + INSTANCE = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage, heapMemoryManager); + // Initialize the chunkPool too here and then use that instance + return INSTANCE; + } + + static ChunkCreator getInstance() { + return INSTANCE; + } + + /** + * Creates and inits a chunk. + * @return the chunk that was initialized + */ + Chunk getChunk() { + Chunk chunk = null; + if (pool != null) { + // the pool creates the chunk internally. The chunk#init() call happens here + chunk = this.pool.getChunk(); + // the pool has run out of maxCount + if (chunk == null) { + if (LOG.isTraceEnabled()) { + LOG.trace("The chunk pool is full. Reached maxCount= " + this.pool.getMaxCount() + + ". Creating chunk onheap."); + } + } + } + if (chunk == null) { + chunk = createChunk(); + } + // put this chunk into the chunkIdMap + this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk)); + // now we need to actually do the expensive memory allocation step in case of a new chunk, + // else only the offset is set to the beginning of the chunk to accept allocations + chunk.init(); + return chunk; + } + + private Chunk createChunk() { + return createChunk(false); + } + + /** + * Creates the chunk either onheap or offheap + * @param pool indicates if the chunks have to be created which will be used by the Pool + * @return the chunk + */ + private Chunk createChunk(boolean pool) { + long id = chunkID.getAndIncrement(); + // do not create offheap chunk on demand + if (pool && this.offheap) { + return new OffheapChunk(chunkSize, id, pool); + } else { + return new OnheapChunk(chunkSize, id, pool); + } + } + + @VisibleForTesting + // TODO : To be used by CellChunkMap + Chunk getChunk(long id) { + SoftReference ref = chunkIdMap.get(id); + if (ref != null) { + return ref.get(); + } + return null; + } + + int getChunkSize() { + return this.chunkSize; + } + + boolean isOffheap() { + return this.offheap; + } + + private void removeChunks(Set chunks) { + this.chunkIdMap.keySet().removeAll(chunks); + } + + Chunk removeChunk(long chunkId) { + SoftReference ref = this.chunkIdMap.remove(chunkId); + if (ref != null) { + return ref.get(); + } + return null; + } + + @VisibleForTesting + int size() { + return this.chunkIdMap.size(); + } + + @VisibleForTesting + void clearChunkIds() { + this.chunkIdMap.clear(); + } + + private class MemStoreChunkPool implements HeapMemoryTuneObserver { + private int maxCount; + + // A queue of reclaimed chunks + private final BlockingQueue reclaimedChunks; + private final float poolSizePercentage; + + /** Statistics thread schedule pool */ + private final ScheduledExecutorService scheduleThreadPool; + /** Statistics thread */ + private static final int statThreadPeriod = 60 * 5; + private final AtomicLong chunkCount = new AtomicLong(); + private final AtomicLong reusedChunkCount = new AtomicLong(); + + MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { + this.maxCount = maxCount; + this.poolSizePercentage = poolSizePercentage; + this.reclaimedChunks = new LinkedBlockingQueue<>(); + for (int i = 0; i < initialCount; i++) { + Chunk chunk = createChunk(true); + chunk.init(); + reclaimedChunks.add(chunk); + } + chunkCount.set(initialCount); + final String n = Thread.currentThread().getName(); + scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, + statThreadPeriod, TimeUnit.SECONDS); + } + + /** + * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have + * not yet created max allowed chunks count. When we have already created max allowed chunks and + * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk + * then. + * Note: Chunks returned by this pool must be put back to the pool after its use. + * @return a chunk + * @see #putbackChunks(Set) + */ + Chunk getChunk() { + Chunk chunk = reclaimedChunks.poll(); + if (chunk != null) { + chunk.reset(); + reusedChunkCount.incrementAndGet(); + } else { + // Make a chunk iff we have not yet created the maxCount chunks + while (true) { + long created = this.chunkCount.get(); + if (created < this.maxCount) { + if (this.chunkCount.compareAndSet(created, created + 1)) { + chunk = createChunk(true); + break; + } + } else { + break; + } + } + } + return chunk; + } + + /** + * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining + * chunks + * @param chunks + */ + private void putbackChunks(Set chunks) { + int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); + Iterator iterator = chunks.iterator(); + while (iterator.hasNext()) { + Long chunkId = iterator.next(); + // remove the chunks every time though they are from the pool or not + Chunk chunk = ChunkCreator.this.removeChunk(chunkId); + if (chunk != null) { + if (chunk.isFromPool() && toAdd > 0) { + reclaimedChunks.add(chunk); + } + toAdd--; + } + iterator.remove(); + } + } + + private class StatisticsThread extends Thread { + StatisticsThread() { + super("MemStoreChunkPool.StatisticsThread"); + setDaemon(true); + } + + @Override + public void run() { + logStats(); + } + + private void logStats() { + if (!LOG.isDebugEnabled()) return; + long created = chunkCount.get(); + long reused = reusedChunkCount.get(); + long total = created + reused; + LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + + ",created chunk count=" + created + + ",reused chunk count=" + reused + + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( + (float) reused / (float) total, 2))); + } + } + + private int getMaxCount() { + return this.maxCount; + } + + @Override + public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { + // don't do any tuning in case of offheap memstore + if (isOffheap()) { + LOG.warn("Not tuning the chunk pool as it is offheap"); + return; + } + int newMaxCount = + (int) (newMemstoreSize * poolSizePercentage / getChunkSize()); + if (newMaxCount != this.maxCount) { + // We need an adjustment in the chunks numbers + if (newMaxCount > this.maxCount) { + // Max chunks getting increased. Just change the variable. Later calls to getChunk() would + // create and add them to Q + LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + } else { + // Max chunks getting decreased. We may need to clear off some of the pooled chunks now + // itself. If the extra chunks are serving already, do not pool those when we get them back + LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); + this.maxCount = newMaxCount; + if (this.reclaimedChunks.size() > newMaxCount) { + synchronized (this) { + while (this.reclaimedChunks.size() > newMaxCount) { + this.reclaimedChunks.poll(); + } + } + } + } + } + } + } + + @VisibleForTesting + static void clearDisableFlag() { + chunkPoolDisabled = false; + } + + private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage, + float initialCountPercentage) { + if (poolSizePercentage <= 0) { + LOG.info("PoolSizePercentage is less than 0. So not using pool"); + return null; + } + if (chunkPoolDisabled) { + return null; + } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); + } + int maxCount = (int) (globalMemStoreSize * poolSizePercentage / getChunkSize()); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException( + MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); + } + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + + StringUtils.byteDesc(getChunkSize()) + ", max count " + maxCount + + ", initial count " + initialCount); + return new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage); + } + + @VisibleForTesting + int getMaxCount() { + if (pool != null) { + return pool.getMaxCount(); + } + return 0; + } + + @VisibleForTesting + int getPoolSize() { + if (pool != null) { + return pool.reclaimedChunks.size(); + } + return 0; + } + + /* + * Only used in testing + */ + @VisibleForTesting + void clearChunksInPool() { + if (pool != null) { + pool.reclaimedChunks.clear(); + } + } + + synchronized void putbackChunks(Set chunks) { + if (pool != null) { + pool.putbackChunks(chunks); + } else { + this.removeChunks(chunks); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b3b5113..41eb0a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1472,7 +1472,7 @@ public class HRegionServer extends HasThread implements startServiceThreads(); startHeapMemoryManager(); // Call it after starting HeapMemoryManager. - initializeMemStoreChunkPool(); + initializeMemStoreChunkCreator(); LOG.info("Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x" + @@ -1492,7 +1492,7 @@ public class HRegionServer extends HasThread implements } } - private void initializeMemStoreChunkPool() { + protected void initializeMemStoreChunkCreator() { if (MemStoreLAB.isEnabled(conf)) { // MSLAB is enabled. So initialize MemStoreChunkPool // By this time, the MemstoreFlusher is already initialized. We can get the global limits from @@ -1506,12 +1506,10 @@ public class HRegionServer extends HasThread implements float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); - MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage, - initialCountPercentage, chunkSize, offheap); - if (pool != null && this.hMemManager != null) { - // Register with Heap Memory manager - this.hMemManager.registerTuneObserver(pool); - } + // init the chunkCreator + ChunkCreator chunkCreator = + ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage, this.hMemManager); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java deleted file mode 100644 index b7ac212..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * A pool of {@link Chunk} instances. - * - * MemStoreChunkPool caches a number of retired chunks for reusing, it could - * decrease allocating bytes when writing, thereby optimizing the garbage - * collection on JVM. - * - * The pool instance is globally unique and could be obtained through - * {@link MemStoreChunkPool#initialize(long, float, float, int, boolean)} - * - * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating - * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called - * when MemStore clearing snapshot for flush - */ -@SuppressWarnings("javadoc") -@InterfaceAudience.Private -public class MemStoreChunkPool implements HeapMemoryTuneObserver { - private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); - - // Static reference to the MemStoreChunkPool - static MemStoreChunkPool GLOBAL_INSTANCE; - /** Boolean whether we have disabled the memstore chunk pool entirely. */ - static boolean chunkPoolDisabled = false; - - private int maxCount; - - // A queue of reclaimed chunks - private final BlockingQueue reclaimedChunks; - private final int chunkSize; - private final float poolSizePercentage; - - /** Statistics thread schedule pool */ - private final ScheduledExecutorService scheduleThreadPool; - /** Statistics thread */ - private static final int statThreadPeriod = 60 * 5; - private final AtomicLong chunkCount = new AtomicLong(); - private final AtomicLong reusedChunkCount = new AtomicLong(); - private final boolean offheap; - - MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage, - boolean offheap) { - this.maxCount = maxCount; - this.chunkSize = chunkSize; - this.poolSizePercentage = poolSizePercentage; - this.offheap = offheap; - this.reclaimedChunks = new LinkedBlockingQueue<>(); - for (int i = 0; i < initialCount; i++) { - Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize); - chunk.init(); - reclaimedChunks.add(chunk); - } - chunkCount.set(initialCount); - final String n = Thread.currentThread().getName(); - scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() - .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, - statThreadPeriod, TimeUnit.SECONDS); - } - - /** - * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have - * not yet created max allowed chunks count. When we have already created max allowed chunks and - * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk - * then. - * Note: Chunks returned by this pool must be put back to the pool after its use. - * @return a chunk - * @see #putbackChunk(Chunk) - * @see #putbackChunks(BlockingQueue) - */ - Chunk getChunk() { - Chunk chunk = reclaimedChunks.poll(); - if (chunk != null) { - chunk.reset(); - reusedChunkCount.incrementAndGet(); - } else { - // Make a chunk iff we have not yet created the maxCount chunks - while (true) { - long created = this.chunkCount.get(); - if (created < this.maxCount) { - chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize); - if (this.chunkCount.compareAndSet(created, created + 1)) { - break; - } - } else { - break; - } - } - } - return chunk; - } - - /** - * Add the chunks to the pool, when the pool achieves the max size, it will - * skip the remaining chunks - * @param chunks - */ - synchronized void putbackChunks(BlockingQueue chunks) { - int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); - Chunk chunk = null; - while ((chunk = chunks.poll()) != null && toAdd > 0) { - reclaimedChunks.add(chunk); - toAdd--; - } - } - - /** - * Add the chunk to the pool, if the pool has achieved the max size, it will - * skip it - * @param chunk - */ - synchronized void putbackChunk(Chunk chunk) { - if (reclaimedChunks.size() < this.maxCount) { - reclaimedChunks.add(chunk); - } - } - - int getPoolSize() { - return this.reclaimedChunks.size(); - } - - /* - * Only used in testing - */ - void clearChunks() { - this.reclaimedChunks.clear(); - } - - private class StatisticsThread extends Thread { - StatisticsThread() { - super("MemStoreChunkPool.StatisticsThread"); - setDaemon(true); - } - - @Override - public void run() { - logStats(); - } - - private void logStats() { - if (!LOG.isDebugEnabled()) return; - long created = chunkCount.get(); - long reused = reusedChunkCount.get(); - long total = created + reused; - LOG.debug("Stats: current pool size=" + reclaimedChunks.size() - + ",created chunk count=" + created - + ",reused chunk count=" + reused - + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( - (float) reused / (float) total, 2))); - } - } - - /** - * @return the global MemStoreChunkPool instance - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC", - justification = "Method is called by single thread at the starting of RS") - static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage, int chunkSize, boolean offheap) { - if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE; - if (chunkPoolDisabled) return null; - - if (poolSizePercentage <= 0) { - chunkPoolDisabled = true; - return null; - } - if (poolSizePercentage > 1.0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0"); - } - int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize); - if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { - throw new IllegalArgumentException( - MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0"); - } - int initialCount = (int) (initialCountPercentage * maxCount); - LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize) - + ", max count " + maxCount + ", initial count " + initialCount); - GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage, - offheap); - return GLOBAL_INSTANCE; - } - - /** - * @return The singleton instance of this pool. - */ - static MemStoreChunkPool getPool() { - return GLOBAL_INSTANCE; - } - - int getMaxCount() { - return this.maxCount; - } - - @VisibleForTesting - static void clearDisableFlag() { - chunkPoolDisabled = false; - } - - @Override - public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) { - // don't do any tuning in case of offheap memstore - if (this.offheap) { - LOG.warn("Not tuning the chunk pool as it is offheap"); - return; - } - int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize); - if (newMaxCount != this.maxCount) { - // We need an adjustment in the chunks numbers - if (newMaxCount > this.maxCount) { - // Max chunks getting increased. Just change the variable. Later calls to getChunk() would - // create and add them to Q - LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - } else { - // Max chunks getting decreased. We may need to clear off some of the pooled chunks now - // itself. If the extra chunks are serving already, do not pool those when we get them back - LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount); - this.maxCount = newMaxCount; - if (this.reclaimedChunks.size() > newMaxCount) { - synchronized (this) { - while (this.reclaimedChunks.size() > newMaxCount) { - this.reclaimedChunks.poll(); - } - } - } - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index f6d1607..1a4be79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; * {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this * cell's data and copies into this area and then recreate a Cell over this copied data. *

- * @see MemStoreChunkPool + * @see ChunkCreator */ @InterfaceAudience.Private public interface MemStoreLAB { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 4e87135..50fee89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -18,23 +18,26 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.nio.ByteBuffer; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - /** * A memstore-local allocation buffer. *

@@ -55,8 +58,8 @@ import com.google.common.base.Preconditions; * would provide a performance improvement - probably would speed up the * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached * anyway. - * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}. - * When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks, + * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}. + * When the Chunk comes from pool, it can be either an on heap or an off heap backed chunk. The chunks, * which this MemStoreLAB creates on its own (when no chunk available from pool), those will be * always on heap backed. */ @@ -66,14 +69,15 @@ public class MemStoreLABImpl implements MemStoreLAB { static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class); private AtomicReference curChunk = new AtomicReference<>(); - // A queue of chunks from pool contained by this memstore LAB - // TODO: in the future, it would be better to have List implementation instead of Queue, - // as FIFO order is not so important here + // Lock to manage multiple handlers requesting for a chunk + private ReentrantLock lock = new ReentrantLock(); + + // A set of chunks contained by this memstore LAB @VisibleForTesting - BlockingQueue pooledChunkQueue = null; + Set chunks = new ConcurrentSkipListSet(); private final int chunkSize; private final int maxAlloc; - private final MemStoreChunkPool chunkPool; + private final ChunkCreator chunkCreator; // This flag is for closing this instance, its set when clearing snapshot of // memstore @@ -92,20 +96,12 @@ public class MemStoreLABImpl implements MemStoreLAB { public MemStoreLABImpl(Configuration conf) { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); - this.chunkPool = MemStoreChunkPool.getPool(); - // currently chunkQueue is only used for chunkPool - if (this.chunkPool != null) { - // set queue length to chunk pool max count to avoid keeping reference of - // too many non-reclaimable chunks - pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount()); - } - + this.chunkCreator = ChunkCreator.getInstance(); // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument(maxAlloc <= chunkSize, MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); } - @Override public Cell copyCellInto(Cell cell) { int size = KeyValueUtil.length(cell); @@ -118,19 +114,52 @@ public class MemStoreLABImpl implements MemStoreLAB { Chunk c = null; int allocOffset = 0; while (true) { + // Try to get the chunk c = getOrMakeChunk(); + // we may get null because the some other thread succeeded in getting the lock + // and so the current thread has to try again to make its chunk or grab the chunk + // that the other thread created // Try to allocate from this chunk - allocOffset = c.alloc(size); - if (allocOffset != -1) { - // We succeeded - this is the common case - small alloc - // from a big buffer - break; + if (c != null) { + allocOffset = c.alloc(size); + if (allocOffset != -1) { + // We succeeded - this is the common case - small alloc + // from a big buffer + break; + } + // not enough space! + // try to retire this chunk + tryRetireChunk(c); } - // not enough space! - // try to retire this chunk - tryRetireChunk(c); } - return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size); + return copyToChunkCell(cell, c.getData(), allocOffset, size); + } + + /** + * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid + * out of it + */ + private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) { + int tagsLen = cell.getTagsLength(); + if (cell instanceof ExtendedCell) { + ((ExtendedCell) cell).write(buf, offset); + } else { + // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the + // other case also. The data fragments within Cell is copied into buf as in KeyValue + // serialization format only. + KeyValueUtil.appendTo(cell, buf, offset, true); + } + // TODO : write the seqid here. For writing seqId we should create a new cell type so + // that seqId is not used as the state + if (tagsLen == 0) { + // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class + // which directly return tagsLen as 0. So we avoid parsing many length components in + // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell + // call getTagsLength(). + return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); + } else { + return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId()); + } } /** @@ -142,9 +171,9 @@ public class MemStoreLABImpl implements MemStoreLAB { this.closed = true; // We could put back the chunks to pool for reusing only when there is no // opening scanner which will read their data - if (chunkPool != null && openScannerCount.get() == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.pooledChunkQueue); + int count = openScannerCount.get(); + if(count == 0) { + recycleChunks(); } } @@ -162,9 +191,14 @@ public class MemStoreLABImpl implements MemStoreLAB { @Override public void decScannerCount() { int count = this.openScannerCount.decrementAndGet(); - if (this.closed && chunkPool != null && count == 0 - && reclaimed.compareAndSet(false, true)) { - chunkPool.putbackChunks(this.pooledChunkQueue); + if (this.closed && count == 0) { + recycleChunks(); + } + } + + private void recycleChunks() { + if (reclaimed.compareAndSet(false, true)) { + chunkCreator.putbackChunks(chunks); } } @@ -190,45 +224,35 @@ public class MemStoreLABImpl implements MemStoreLAB { * allocate a new one from the JVM. */ private Chunk getOrMakeChunk() { - while (true) { - // Try to get the chunk - Chunk c = curChunk.get(); - if (c != null) { - return c; - } - - // No current chunk, so we want to allocate one. We race - // against other allocators to CAS in an uninitialized chunk - // (which is cheap to allocate) - if (chunkPool != null) { - c = chunkPool.getChunk(); - } - boolean pooledChunk = false; - if (c != null) { - // This is chunk from pool - pooledChunk = true; - } else { - c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap. - } - if (curChunk.compareAndSet(null, c)) { - // we won race - now we need to actually do the expensive - // allocation step - c.init(); - if (pooledChunk) { - if (!this.closed && !this.pooledChunkQueue.offer(c)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " - + pooledChunkQueue.size()); - } + // Try to get the chunk + Chunk c = curChunk.get(); + if (c != null) { + return c; + } + // No current chunk, so we want to allocate one. We race + // against other allocators to CAS in an uninitialized chunk + // (which is cheap to allocate) + if (lock.tryLock()) { + try { + // once again check inside the lock + c = curChunk.get(); + if (c != null) { + return c; + } + c = this.chunkCreator.getChunk(); + if (c != null) { + // set the curChunk. No need of CAS as only one thread will be here + curChunk.set(c); + if (!this.closed) { + chunks.add(c.getId()); } + return c; } - return c; - } else if (pooledChunk) { - chunkPool.putbackChunk(c); + } finally { + lock.unlock(); } - // someone else won race - that's fine, we'll try to grab theirs - // in the next iteration of the loop. } + return null; } @VisibleForTesting @@ -236,8 +260,15 @@ public class MemStoreLABImpl implements MemStoreLAB { return this.curChunk.get(); } - + @VisibleForTesting BlockingQueue getPooledChunks() { - return this.pooledChunkQueue; + BlockingQueue pooledChunks = new LinkedBlockingQueue<>(); + for (Long id : this.chunks) { + Chunk chunk = chunkCreator.getChunk(id); + if (chunk != null && chunk.isFromPool()) { + pooledChunks.add(chunk); + } + } + return pooledChunks; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java new file mode 100644 index 0000000..1b652fa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + + +/** + * ByteBuffer based cell which has the chunkid at the 0th offset and with no tags + * @see MemStoreLAB + */ +@InterfaceAudience.Private +public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue { + + public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) { + super(buf, offset, length, seqId); + } + + @Override + public long getChunkId() { + // The chunkId is embedded at the 0th offset of the bytebuffer + return ByteBufferUtils.toLong(buf, 0); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java index ed98cfa..802a123 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java @@ -21,34 +21,27 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.base.Preconditions; - /** * An off heap chunk implementation. */ @InterfaceAudience.Private public class OffheapChunk extends Chunk { - OffheapChunk(int size) { - super(size); + OffheapChunk(int size, long id) { + // better if this is always created fromPool. This should not be called + super(size, id); + } + + OffheapChunk(int size, long id, boolean fromPool) { + super(size, id, fromPool); + assert fromPool == true; } @Override - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = ByteBuffer.allocateDirect(this.size); - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; + void allocateDataBuffer() { + if (data == null) { + data = ByteBuffer.allocateDirect(this.size); + data.putLong(0, this.getId()); } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java index bd33cb5..476786e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java @@ -21,33 +21,25 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import com.google.common.base.Preconditions; - /** * An on heap chunk implementation. */ @InterfaceAudience.Private public class OnheapChunk extends Chunk { - OnheapChunk(int size) { - super(size); + OnheapChunk(int size, long id) { + super(size, id); + } + + OnheapChunk(int size, long id, boolean fromPool) { + super(size, id, fromPool); } - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = ByteBuffer.allocate(this.size); - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; + @Override + void allocateDataBuffer() { + if (data == null) { + data = ByteBuffer.allocate(this.size); + data.putLong(0, this.getId()); } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 696ea18..d2e10b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -97,6 +97,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -2428,6 +2430,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize) throws IOException { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); WAL wal = createWal(conf, rootDir, info); return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index cc73d9d..32bce26 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Triple; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -88,6 +91,10 @@ public class TestCatalogJanitor { @Rule public TestName name = new TestName(); + @BeforeClass + public static void setup() throws Exception { + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + } /** * Mock MasterServices for tests below. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 418aadf..096c5ef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -241,7 +241,7 @@ public class TestBulkLoad { for (byte[] family : families) { hTableDescriptor.addFamily(new HColumnDescriptor(family)); } - + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); // TODO We need a way to do this without creating files return HRegion.createHRegion(hRegionInfo, new Path(testFolder.newFolder().toURI()), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java index 3b4d068..09877b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java @@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase { descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true); CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - MemStoreChunkPool.chunkPoolDisabled = false; + ChunkCreator.chunkPoolDisabled = false; } /* Create and test CellSet based on CellArrayMap */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index a888c45..9e90f3e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -50,7 +51,7 @@ import static org.junit.Assert.assertTrue; public class TestCompactingMemStore extends TestDefaultMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class); - protected static MemStoreChunkPool chunkPool; + protected static ChunkCreator chunkCreator; protected HRegion region; protected RegionServicesForStores regionServicesForStores; protected HStore store; @@ -65,7 +66,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { @After public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); } @Override @@ -84,15 +85,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore { conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000); HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf); HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); - this.region = hbaseUtility.createTestRegion("foobar", hcd); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar")); + htd.addFamily(hcd); + HRegionInfo info = + new HRegionInfo(TableName.valueOf("foobar"), null, null, false); + WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); + this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); + //this.region = hbaseUtility.createTestRegion("foobar", hcd); this.regionServicesForStores = region.getRegionServicesForStores(); this.store = new HStore(region, hcd, conf); long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, - MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); - assertTrue(chunkPool != null); + chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + assertTrue(chunkCreator != null); } /** @@ -390,7 +397,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } @@ -434,16 +441,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -464,7 +471,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test @@ -516,16 +523,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore { memstore.add(new KeyValue(row, fam, qf1, 3, val), null); assertEquals(3, memstore.getActive().getCellsCount()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -553,7 +560,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } ////////////////////////////////////////////////////////////////////////////// diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java index 5a48455..66e107a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java @@ -44,17 +44,13 @@ import java.util.List; public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore { private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class); - //private static MemStoreChunkPool chunkPool; - //private HRegion region; - //private RegionServicesForStores regionServicesForStores; - //private HStore store; ////////////////////////////////////////////////////////////////////////////// // Helpers ////////////////////////////////////////////////////////////////////////////// @Override public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); } @Override public void setUp() throws Exception { @@ -408,16 +404,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot @@ -438,7 +434,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test @@ -472,7 +468,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java index 7154511..bff5bec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java @@ -104,6 +104,7 @@ public class TestCompactionPolicy { HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); hlog = new FSHLog(fs, basedir, logName, conf); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); region = HRegion.createHRegion(info, basedir, conf, htd, hlog); region.close(); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 7434eb1..41b304b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -84,6 +85,7 @@ public class TestDefaultMemStore { protected static final byte[] FAMILY = Bytes.toBytes("column"); protected MultiVersionConcurrencyControl mvcc; protected AtomicLong startSeqNum = new AtomicLong(0); + protected ChunkCreator chunkCreator; private String getName() { return this.name.getMethodName(); @@ -92,9 +94,17 @@ public class TestDefaultMemStore { @Before public void setUp() throws Exception { internalSetUp(); + // no pool + this.chunkCreator = + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); this.memstore = new DefaultMemStore(); } + @AfterClass + public static void tearDownClass() throws Exception { + ChunkCreator.getInstance().clearChunkIds(); + } + protected void internalSetUp() throws Exception { this.mvcc = new MultiVersionConcurrencyControl(); } @@ -129,7 +139,9 @@ public class TestDefaultMemStore { assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize()); // make sure chunk size increased even when writing the same cell, if using MSLAB if (msLab instanceof MemStoreLABImpl) { - assertEquals(2 * Segment.getCellLength(kv), + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG, ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset()); } } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 37a7664..1768801 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -48,30 +48,30 @@ import static org.junit.Assert.assertTrue; @Category({RegionServerTests.class, SmallTests.class}) public class TestMemStoreChunkPool { private final static Configuration conf = new Configuration(); - private static MemStoreChunkPool chunkPool; + private static ChunkCreator chunkCreator; private static boolean chunkPoolDisabledBeforeTest; @BeforeClass public static void setUpBeforeClass() throws Exception { conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true); conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f); - chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; - MemStoreChunkPool.chunkPoolDisabled = false; + chunkPoolDisabledBeforeTest = ChunkCreator.chunkPoolDisabled; + ChunkCreator.chunkPoolDisabled = false; long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, - MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); - assertTrue(chunkPool != null); + chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + assertTrue(chunkCreator != null); } @AfterClass public static void tearDownAfterClass() throws Exception { - MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest; + ChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest; } @Before public void tearDown() throws Exception { - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); } @Test @@ -90,7 +90,7 @@ public class TestMemStoreChunkPool { int size = KeyValueUtil.length(kv); ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); if (newKv.getBuffer() != lastBuffer) { - expectedOff = 0; + expectedOff = 8; lastBuffer = newKv.getBuffer(); } assertEquals(expectedOff, newKv.getOffset()); @@ -100,14 +100,14 @@ public class TestMemStoreChunkPool { } // chunks will be put back to pool after close mslab.close(); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); // reconstruct mslab mslab = new MemStoreLABImpl(conf); // chunk should be got from the pool, so we can reuse it. KeyValue kv = new KeyValue(rk, cf, q, new byte[10]); mslab.copyCellInto(kv); - assertEquals(chunkCount - 1, chunkPool.getPoolSize()); + assertEquals(chunkCount - 1, chunkCreator.getPoolSize()); } @Test @@ -143,7 +143,7 @@ public class TestMemStoreChunkPool { } memstore.clearSnapshot(snapshot.getId()); - int chunkCount = chunkPool.getPoolSize(); + int chunkCount = chunkCreator.getPoolSize(); assertTrue(chunkCount > 0); } @@ -189,16 +189,16 @@ public class TestMemStoreChunkPool { } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() == 0); + assertTrue(chunkCreator.getPoolSize() == 0); // Chunks will be put back to pool after close scanners; for (KeyValueScanner scanner : scanners) { scanner.close(); } - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); // clear chunks - chunkPool.clearChunks(); + chunkCreator.clearChunksInPool(); // Creating another snapshot snapshot = memstore.snapshot(); @@ -218,20 +218,20 @@ public class TestMemStoreChunkPool { scanner.close(); } memstore.clearSnapshot(snapshot.getId()); - assertTrue(chunkPool.getPoolSize() > 0); + assertTrue(chunkCreator.getPoolSize() > 0); } @Test public void testPutbackChunksMultiThreaded() throws Exception { - MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE; final int maxCount = 10; final int initialCount = 5; - final int chunkSize = 30; + final int chunkSize = 40; final int valSize = 7; - MemStoreChunkPool pool = new MemStoreChunkPool(chunkSize, maxCount, initialCount, 1, false); - assertEquals(initialCount, pool.getPoolSize()); - assertEquals(maxCount, pool.getMaxCount()); - MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created. + ChunkCreator oldCreator = ChunkCreator.getInstance(); + ChunkCreator newCreator = new ChunkCreator(chunkSize, false, 400, 1, 0.5f, null); + assertEquals(initialCount, newCreator.getPoolSize()); + assertEquals(maxCount, newCreator.getMaxCount()); + ChunkCreator.INSTANCE = newCreator;// Replace the global ref with the new one we created. // Used it for the testing. Later in finally we put // back the original final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), @@ -258,9 +258,9 @@ public class TestMemStoreChunkPool { t1.join(); t2.join(); t3.join(); - assertTrue(pool.getPoolSize() <= maxCount); + assertTrue(newCreator.getPoolSize() <= maxCount); } finally { - MemStoreChunkPool.GLOBAL_INSTANCE = oldPool; + ChunkCreator.INSTANCE = oldCreator; } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 141b802..6696e43 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -63,8 +63,8 @@ public class TestMemStoreLAB { public static void setUpBeforeClass() throws Exception { long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); - MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, - MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, + 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); } /** @@ -76,6 +76,7 @@ public class TestMemStoreLAB { MemStoreLAB mslab = new MemStoreLABImpl(); int expectedOff = 0; ByteBuffer lastBuffer = null; + long lastChunkId = -1; // 100K iterations by 0-1K alloc -> 50MB expected // should be reasonable for unit test and also cover wraparound // behavior @@ -85,8 +86,13 @@ public class TestMemStoreLAB { int size = KeyValueUtil.length(kv); ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); if (newKv.getBuffer() != lastBuffer) { - expectedOff = 0; + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + expectedOff = Bytes.SIZEOF_LONG; lastBuffer = newKv.getBuffer(); + long chunkId = newKv.getBuffer().getLong(0); + assertTrue("chunkid should be different", chunkId != lastChunkId); + lastChunkId = chunkId; } assertEquals(expectedOff, newKv.getOffset()); assertTrue("Allocation overruns buffer", @@ -136,23 +142,21 @@ public class TestMemStoreLAB { }; ctx.addThread(t); } - + ctx.startThreads(); while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) { Thread.sleep(10); } ctx.stop(); - // Partition the allocations by the actual byte[] they point into, // make sure offsets are unique for each chunk Map> mapsByChunk = Maps.newHashMap(); - + int sizeCounted = 0; for (AllocRecord rec : Iterables.concat(allocations)) { sizeCounted += rec.size; if (rec.size == 0) continue; - Map mapForThisByteArray = mapsByChunk.get(rec.alloc); if (mapForThisByteArray == null) { @@ -167,7 +171,9 @@ public class TestMemStoreLAB { // Now check each byte array to make sure allocations don't overlap for (Map allocsInChunk : mapsByChunk.values()) { - int expectedOff = 0; + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + int expectedOff = Bytes.SIZEOF_LONG; for (AllocRecord alloc : allocsInChunk.values()) { assertEquals(expectedOff, alloc.offset); assertTrue("Allocation overruns buffer", @@ -175,7 +181,6 @@ public class TestMemStoreLAB { expectedOff += alloc.size; } } - } /** @@ -194,7 +199,7 @@ public class TestMemStoreLAB { // set chunk size to default max alloc size, so we could easily trigger chunk retirement conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); // reconstruct mslab - MemStoreChunkPool.clearDisableFlag(); + ChunkCreator.clearDisableFlag(); mslab = new MemStoreLABImpl(conf); // launch multiple threads to trigger frequent chunk retirement List threads = new ArrayList<>(); @@ -223,6 +228,8 @@ public class TestMemStoreLAB { } // close the mslab mslab.close(); + // none of the chunkIds would have been returned back + assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0); // make sure all chunks reclaimed or removed from chunk queue int queueLength = mslab.getPooledChunks().size(); assertTrue("All chunks in chunk queue should be reclaimed or removed" diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java new file mode 100644 index 0000000..f38a75e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java @@ -0,0 +1,168 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ByteBufferKeyValue; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, SmallTests.class}) +public class TestMemstoreLABWithoutPool { + private final static Configuration conf = new Configuration(); + + private static final byte[] rk = Bytes.toBytes("r1"); + private static final byte[] cf = Bytes.toBytes("f"); + private static final byte[] q = Bytes.toBytes("q"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax() * 0.8); + // disable pool + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT + Bytes.SIZEOF_LONG, false, globalMemStoreLimit, + 0.0f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + } + + /** + * Test a bunch of random allocations + */ + @Test + public void testLABRandomAllocation() { + Random rand = new Random(); + MemStoreLAB mslab = new MemStoreLABImpl(); + int expectedOff = 0; + ByteBuffer lastBuffer = null; + long lastChunkId = -1; + // 100K iterations by 0-1K alloc -> 50MB expected + // should be reasonable for unit test and also cover wraparound + // behavior + for (int i = 0; i < 100000; i++) { + int valSize = rand.nextInt(1000); + KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); + int size = KeyValueUtil.length(kv); + ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); + if (newKv.getBuffer() != lastBuffer) { + // since we add the chunkID at the 0th offset of the chunk and the + // chunkid is a long we need to account for those 8 bytes + expectedOff = Bytes.SIZEOF_LONG; + lastBuffer = newKv.getBuffer(); + long chunkId = newKv.getBuffer().getLong(0); + assertTrue("chunkid should be different", chunkId != lastChunkId); + lastChunkId = chunkId; + } + assertEquals(expectedOff, newKv.getOffset()); + assertTrue("Allocation overruns buffer", + newKv.getOffset() + size <= newKv.getBuffer().capacity()); + expectedOff += size; + } + } + + /** + * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure + * there's no memory leak (HBASE-16195) + * @throws Exception if any error occurred + */ + @Test + public void testLABChunkQueueWithMultipleMSLABs() throws Exception { + Configuration conf = HBaseConfiguration.create(); + MemStoreLABImpl[] mslab = new MemStoreLABImpl[10]; + for (int i = 0; i < 10; i++) { + mslab[i] = new MemStoreLABImpl(conf); + } + // launch multiple threads to trigger frequent chunk retirement + List threads = new ArrayList<>(); + // create smaller sized kvs + final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), + new byte[0]); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + threads.add(getChunkQueueTestThread(mslab[i], "testLABChunkQueue-" + j, kv)); + } + } + for (Thread thread : threads) { + thread.start(); + } + // let it run for some time + Thread.sleep(3000); + for (Thread thread : threads) { + thread.interrupt(); + } + boolean threadsRunning = true; + boolean alive = false; + while (threadsRunning) { + alive = false; + for (Thread thread : threads) { + if (thread.isAlive()) { + alive = true; + break; + } + } + if (!alive) { + threadsRunning = false; + } + } + // close the mslab + for (int i = 0; i < 10; i++) { + mslab[i].close(); + } + // all of the chunkIds would have been returned back + assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0); + } + + private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, + Cell cellToCopyInto) { + Thread thread = new Thread() { + boolean stopped = false; + + @Override + public void run() { + while (!stopped) { + // keep triggering chunk retirement + mslab.copyCellInto(cellToCopyInto); + } + } + + @Override + public void interrupt() { + this.stopped = true; + } + }; + thread.setName(threadName); + thread.setDaemon(true); + return thread; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java index 3cdb227..99dd00d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java @@ -111,6 +111,7 @@ public class TestStoreFileRefresherChore { final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, tableDir); final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()), conf, htd, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java index 994779f..e63bad9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java @@ -98,6 +98,7 @@ public class TestWALMonotonicallyIncreasingSeqId { FSUtils.setRootDir(walConf, tableDir); this.walConf = walConf; wals = new WALFactory(walConf, null, "log_" + replicaId); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()), conf, htd, null); region.initialize();