chunks) {
- int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
- Chunk chunk = null;
- while ((chunk = chunks.poll()) != null && toAdd > 0) {
- reclaimedChunks.add(chunk);
- toAdd--;
- }
- }
-
- /**
- * Add the chunk to the pool, if the pool has achieved the max size, it will
- * skip it
- * @param chunk
- */
- synchronized void putbackChunk(Chunk chunk) {
- if (reclaimedChunks.size() < this.maxCount) {
- reclaimedChunks.add(chunk);
- }
- }
-
- int getPoolSize() {
- return this.reclaimedChunks.size();
- }
-
- /*
- * Only used in testing
- */
- void clearChunks() {
- this.reclaimedChunks.clear();
- }
-
- private class StatisticsThread extends Thread {
- StatisticsThread() {
- super("MemStoreChunkPool.StatisticsThread");
- setDaemon(true);
- }
-
- @Override
- public void run() {
- logStats();
- }
-
- private void logStats() {
- if (!LOG.isDebugEnabled()) return;
- long created = chunkCount.get();
- long reused = reusedChunkCount.get();
- long total = created + reused;
- LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
- + ",created chunk count=" + created
- + ",reused chunk count=" + reused
- + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
- (float) reused / (float) total, 2)));
- }
- }
-
- /**
- * @return the global MemStoreChunkPool instance
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
- justification = "Method is called by single thread at the starting of RS")
- static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage,
- float initialCountPercentage, int chunkSize, boolean offheap) {
- if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
- if (chunkPoolDisabled) return null;
-
- if (poolSizePercentage <= 0) {
- chunkPoolDisabled = true;
- return null;
- }
- if (poolSizePercentage > 1.0) {
- throw new IllegalArgumentException(
- MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
- }
- int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
- if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
- throw new IllegalArgumentException(
- MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
- }
- int initialCount = (int) (initialCountPercentage * maxCount);
- LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
- + ", max count " + maxCount + ", initial count " + initialCount);
- GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage,
- offheap);
- return GLOBAL_INSTANCE;
- }
-
- /**
- * @return The singleton instance of this pool.
- */
- static MemStoreChunkPool getPool() {
- return GLOBAL_INSTANCE;
- }
-
- int getMaxCount() {
- return this.maxCount;
- }
-
- @VisibleForTesting
- static void clearDisableFlag() {
- chunkPoolDisabled = false;
- }
-
- @Override
- public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
- // don't do any tuning in case of offheap memstore
- if (this.offheap) {
- LOG.warn("Not tuning the chunk pool as it is offheap");
- return;
- }
- int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
- if (newMaxCount != this.maxCount) {
- // We need an adjustment in the chunks numbers
- if (newMaxCount > this.maxCount) {
- // Max chunks getting increased. Just change the variable. Later calls to getChunk() would
- // create and add them to Q
- LOG.info("Max count for chunks increased from " + this.maxCount + " to " + newMaxCount);
- this.maxCount = newMaxCount;
- } else {
- // Max chunks getting decreased. We may need to clear off some of the pooled chunks now
- // itself. If the extra chunks are serving already, do not pool those when we get them back
- LOG.info("Max count for chunks decreased from " + this.maxCount + " to " + newMaxCount);
- this.maxCount = newMaxCount;
- if (this.reclaimedChunks.size() > newMaxCount) {
- synchronized (this) {
- while (this.reclaimedChunks.size() > newMaxCount) {
- this.reclaimedChunks.poll();
- }
- }
- }
- }
- }
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index f6d1607..1a4be79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
* {@link #copyCellInto(Cell)} gets called. This allocates enough size in the chunk to hold this
* cell's data and copies into this area and then recreate a Cell over this copied data.
*
- * @see MemStoreChunkPool
+ * @see ChunkCreator
*/
@InterfaceAudience.Private
public interface MemStoreLAB {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 4e87135..50fee89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -18,23 +18,26 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.nio.ByteBuffer;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-
/**
* A memstore-local allocation buffer.
*
@@ -55,8 +58,8 @@ import com.google.common.base.Preconditions;
* would provide a performance improvement - probably would speed up the
* Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
* anyway.
- * The chunks created by this MemStoreLAB can get pooled at {@link MemStoreChunkPool}.
- * When the Chunk comes pool, it can be either an on heap or an off heap backed chunk. The chunks,
+ * The chunks created by this MemStoreLAB can get pooled at {@link ChunkCreator}.
+ * When the Chunk comes from the pool, it can be either an on heap or an off heap backed chunk. The chunks,
* which this MemStoreLAB creates on its own (when no chunk available from pool), those will be
* always on heap backed.
*/
@@ -66,14 +69,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
static final Log LOG = LogFactory.getLog(MemStoreLABImpl.class);
private AtomicReference<Chunk> curChunk = new AtomicReference<>();
- // A queue of chunks from pool contained by this memstore LAB
- // TODO: in the future, it would be better to have List implementation instead of Queue,
- // as FIFO order is not so important here
+ // Lock to manage multiple handlers requesting a chunk
+ private ReentrantLock lock = new ReentrantLock();
+
+ // A set of chunks (by their IDs) contained by this memstore LAB
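+ // Only the chunk IDs are kept here; the Chunk instances themselves are tracked by the
+ // ChunkCreator and are looked up by ID when chunks are put back or inspected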
@VisibleForTesting
- BlockingQueue<Chunk> pooledChunkQueue = null;
+ Set<Long> chunks = new ConcurrentSkipListSet<Long>();
private final int chunkSize;
private final int maxAlloc;
- private final MemStoreChunkPool chunkPool;
+ private final ChunkCreator chunkCreator;
// This flag is for closing this instance, its set when clearing snapshot of
// memstore
@@ -92,20 +96,12 @@ public class MemStoreLABImpl implements MemStoreLAB {
public MemStoreLABImpl(Configuration conf) {
chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
- this.chunkPool = MemStoreChunkPool.getPool();
- // currently chunkQueue is only used for chunkPool
- if (this.chunkPool != null) {
- // set queue length to chunk pool max count to avoid keeping reference of
- // too many non-reclaimable chunks
- pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount());
- }
-
+ this.chunkCreator = ChunkCreator.getInstance();
// if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument(maxAlloc <= chunkSize,
MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
}
-
@Override
public Cell copyCellInto(Cell cell) {
int size = KeyValueUtil.length(cell);
@@ -118,19 +114,52 @@ public class MemStoreLABImpl implements MemStoreLAB {
Chunk c = null;
int allocOffset = 0;
while (true) {
+ // Try to get the chunk
c = getOrMakeChunk();
+ // we may get null because some other thread succeeded in getting the lock
+ // and so the current thread has to try again to make its chunk or grab the chunk
+ // that the other thread created
// Try to allocate from this chunk
- allocOffset = c.alloc(size);
- if (allocOffset != -1) {
- // We succeeded - this is the common case - small alloc
- // from a big buffer
- break;
+ if (c != null) {
+ allocOffset = c.alloc(size);
+ if (allocOffset != -1) {
+ // We succeeded - this is the common case - small alloc
+ // from a big buffer
+ break;
+ }
+ // not enough space!
+ // try to retire this chunk
+ tryRetireChunk(c);
}
- // not enough space!
- // try to retire this chunk
- tryRetireChunk(c);
}
- return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size);
+ return copyToChunkCell(cell, c.getData(), allocOffset, size);
+ }
+
+ /**
+ * Clone the passed cell by copying its data into the passed buf and create a new cell
+ * over the copied data, carrying the chunk id
+ */
+ private Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
+ int tagsLen = cell.getTagsLength();
+ if (cell instanceof ExtendedCell) {
+ ((ExtendedCell) cell).write(buf, offset);
+ } else {
+ // Normally all Cell impls within Server will be of type ExtendedCell. Just considering the
+ // other case also. The data fragments within the Cell are copied into buf in the KeyValue
+ // serialization format only.
+ KeyValueUtil.appendTo(cell, buf, offset, true);
+ }
+ // TODO: write the seqId here. For writing the seqId we should create a new cell type so
+ // that seqId is not used as the state
+ if (tagsLen == 0) {
+ // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
+ // which directly returns tagsLen as 0, so we avoid parsing many length components when
+ // reading the tagsLength stored in the backing buffer. The memstore addition of every
+ // Cell calls getTagsLength().
+ return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
+ } else {
+ return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
+ }
}
/**
@@ -142,9 +171,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
this.closed = true;
// We could put back the chunks to pool for reusing only when there is no
// opening scanner which will read their data
- if (chunkPool != null && openScannerCount.get() == 0
- && reclaimed.compareAndSet(false, true)) {
- chunkPool.putbackChunks(this.pooledChunkQueue);
+ int count = openScannerCount.get();
+ if (count == 0) {
+ recycleChunks();
}
}
@@ -162,9 +191,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
- if (this.closed && chunkPool != null && count == 0
- && reclaimed.compareAndSet(false, true)) {
- chunkPool.putbackChunks(this.pooledChunkQueue);
+ if (this.closed && count == 0) {
+ recycleChunks();
+ }
+ }
+
+ private void recycleChunks() {
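+ // compareAndSet ensures the chunks are put back exactly once, even if close() and the
+ // last scanner's close race to call this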
+ if (reclaimed.compareAndSet(false, true)) {
+ chunkCreator.putbackChunks(chunks);
}
}
@@ -190,45 +224,35 @@ public class MemStoreLABImpl implements MemStoreLAB {
* allocate a new one from the JVM.
*/
private Chunk getOrMakeChunk() {
- while (true) {
- // Try to get the chunk
- Chunk c = curChunk.get();
- if (c != null) {
- return c;
- }
-
- // No current chunk, so we want to allocate one. We race
- // against other allocators to CAS in an uninitialized chunk
- // (which is cheap to allocate)
- if (chunkPool != null) {
- c = chunkPool.getChunk();
- }
- boolean pooledChunk = false;
- if (c != null) {
- // This is chunk from pool
- pooledChunk = true;
- } else {
- c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap.
- }
- if (curChunk.compareAndSet(null, c)) {
- // we won race - now we need to actually do the expensive
- // allocation step
- c.init();
- if (pooledChunk) {
- if (!this.closed && !this.pooledChunkQueue.offer(c)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: "
- + pooledChunkQueue.size());
- }
+ // Try to get the chunk
+ Chunk c = curChunk.get();
+ if (c != null) {
+ return c;
+ }
+ // No current chunk, so we want to allocate one. We race
+ // against other allocators to CAS in an uninitialized chunk
+ // (which is cheap to allocate)
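+ // tryLock is deliberately non-blocking: a thread that loses the race returns null from
+ // this method and retries from the loop in copyCellInto()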
+ if (lock.tryLock()) {
+ try {
+ // once again check inside the lock
+ c = curChunk.get();
+ if (c != null) {
+ return c;
+ }
+ c = this.chunkCreator.getChunk();
+ if (c != null) {
+ // set the curChunk. No need of CAS as only one thread will be here
+ curChunk.set(c);
+ if (!this.closed) {
+ chunks.add(c.getId());
}
+ return c;
}
- return c;
- } else if (pooledChunk) {
- chunkPool.putbackChunk(c);
+ } finally {
+ lock.unlock();
}
- // someone else won race - that's fine, we'll try to grab theirs
- // in the next iteration of the loop.
}
+ return null;
}
@VisibleForTesting
@@ -236,8 +260,15 @@ public class MemStoreLABImpl implements MemStoreLAB {
return this.curChunk.get();
}
-
+ @VisibleForTesting
BlockingQueue<Chunk> getPooledChunks() {
- return this.pooledChunkQueue;
+ BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
+ for (Long id : this.chunks) {
+ Chunk chunk = chunkCreator.getChunk(id);
+ if (chunk != null && chunk.isFromPool()) {
+ pooledChunks.add(chunk);
+ }
+ }
+ return pooledChunks;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
new file mode 100644
index 0000000..1b652fa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoTagByteBufferChunkCell.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.NoTagsByteBufferKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+
+
+/**
+ * ByteBuffer based cell which has the chunkid at the 0th offset and no tags
+ * @see MemStoreLAB
+ */
+@InterfaceAudience.Private
+public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue {
+
+ public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
+ super(buf, offset, length, seqId);
+ }
+
+ @Override
+ public long getChunkId() {
+ // The chunkId is embedded at the 0th offset of the bytebuffer
+ return ByteBufferUtils.toLong(buf, 0);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
index ed98cfa..802a123 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
@@ -21,34 +21,27 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import com.google.common.base.Preconditions;
-
/**
* An off heap chunk implementation.
*/
@InterfaceAudience.Private
public class OffheapChunk extends Chunk {
- OffheapChunk(int size) {
- super(size);
+ OffheapChunk(int size, long id) {
+ // better if this is always created fromPool; this constructor should not normally be called
+ super(size, id);
+ }
+
+ OffheapChunk(int size, long id, boolean fromPool) {
+ super(size, id, fromPool);
+ assert fromPool;
}
@Override
- public void init() {
- assert nextFreeOffset.get() == UNINITIALIZED;
- try {
- if (data == null) {
- data = ByteBuffer.allocateDirect(this.size);
- }
- } catch (OutOfMemoryError e) {
- boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
- assert failInit; // should be true.
- throw e;
+ void allocateDataBuffer() {
+ if (data == null) {
+ data = ByteBuffer.allocateDirect(this.size);
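+ // stamp the chunk id into the first 8 bytes of the buffer; cells created over this
+ // buffer read it back via getChunkId()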
+ data.putLong(0, this.getId());
}
- // Mark that it's ready for use
- boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
- // We should always succeed the above CAS since only one thread
- // calls init()!
- Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
index bd33cb5..476786e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
@@ -21,33 +21,25 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import com.google.common.base.Preconditions;
-
/**
* An on heap chunk implementation.
*/
@InterfaceAudience.Private
public class OnheapChunk extends Chunk {
- OnheapChunk(int size) {
- super(size);
+ OnheapChunk(int size, long id) {
+ super(size, id);
+ }
+
+ OnheapChunk(int size, long id, boolean fromPool) {
+ super(size, id, fromPool);
}
- public void init() {
- assert nextFreeOffset.get() == UNINITIALIZED;
- try {
- if (data == null) {
- data = ByteBuffer.allocate(this.size);
- }
- } catch (OutOfMemoryError e) {
- boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
- assert failInit; // should be true.
- throw e;
+ @Override
+ void allocateDataBuffer() {
+ if (data == null) {
+ data = ByteBuffer.allocate(this.size);
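+ // as with OffheapChunk, the first 8 bytes of the buffer hold the chunk id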
+ data.putLong(0, this.getId());
}
- // Mark that it's ready for use
- boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
- // We should always succeed the above CAS since only one thread
- // calls init()!
- Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
}
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 696ea18..d2e10b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -97,6 +97,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -2428,6 +2430,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
final Configuration conf, final HTableDescriptor htd, boolean initialize)
throws IOException {
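+ // Regions created outside a region server need the ChunkCreator singleton initialized
+ // explicitly; the zeroed pool parameters mean no chunk pool is used here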
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
WAL wal = createWal(conf, rootDir, info);
return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index cc73d9d..32bce26 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActi
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -73,6 +75,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Triple;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -88,6 +91,10 @@ public class TestCatalogJanitor {
@Rule
public TestName name = new TestName();
+ @BeforeClass
+ public static void setup() throws Exception {
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+ }
/**
* Mock MasterServices for tests below.
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 418aadf..096c5ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -241,7 +241,7 @@ public class TestBulkLoad {
for (byte[] family : families) {
hTableDescriptor.addFamily(new HColumnDescriptor(family));
}
-
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
// TODO We need a way to do this without creating files
return HRegion.createHRegion(hRegionInfo,
new Path(testFolder.newFolder().toURI()),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
index 3b4d068..09877b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -73,7 +73,7 @@ public class TestCellFlatSet extends TestCase {
descCbOnHeap = new CellArrayMap(CellComparator.COMPARATOR,descCells,0,NUM_OF_CELLS,true);
CONF.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
CONF.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
- MemStoreChunkPool.chunkPoolDisabled = false;
+ ChunkCreator.chunkPoolDisabled = false;
}
/* Create and test CellSet based on CellArrayMap */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index a888c45..9e90f3e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +51,7 @@ import static org.junit.Assert.assertTrue;
public class TestCompactingMemStore extends TestDefaultMemStore {
private static final Log LOG = LogFactory.getLog(TestCompactingMemStore.class);
- protected static MemStoreChunkPool chunkPool;
+ protected static ChunkCreator chunkCreator;
protected HRegion region;
protected RegionServicesForStores regionServicesForStores;
protected HStore store;
@@ -65,7 +66,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
@After
public void tearDown() throws Exception {
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
}
@Override
@@ -84,15 +85,21 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
- this.region = hbaseUtility.createTestRegion("foobar", hcd);
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("foobar"));
+ htd.addFamily(hcd);
+ HRegionInfo info =
+ new HRegionInfo(TableName.valueOf("foobar"), null, null, false);
+ WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info);
+ this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true);
this.regionServicesForStores = region.getRegionServicesForStores();
this.store = new HStore(region, hcd, conf);
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
- chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
- MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
- assertTrue(chunkPool != null);
+ chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+ globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+ assertTrue(chunkCreator != null);
}
/**
@@ -390,7 +397,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
memstore.clearSnapshot(snapshot.getId());
- int chunkCount = chunkPool.getPoolSize();
+ int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}
@@ -434,16 +441,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() == 0);
+ assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
// Creating another snapshot
@@ -464,7 +471,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
@@ -516,16 +523,16 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
assertEquals(3, memstore.getActive().getCellsCount());
- assertTrue(chunkPool.getPoolSize() == 0);
+ assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
// Creating another snapshot
@@ -553,7 +560,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
}
//////////////////////////////////////////////////////////////////////////////
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 5a48455..66e107a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -44,17 +44,13 @@ import java.util.List;
public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
- //private static MemStoreChunkPool chunkPool;
- //private HRegion region;
- //private RegionServicesForStores regionServicesForStores;
- //private HStore store;
//////////////////////////////////////////////////////////////////////////////
// Helpers
//////////////////////////////////////////////////////////////////////////////
@Override public void tearDown() throws Exception {
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
}
@Override public void setUp() throws Exception {
@@ -408,16 +404,16 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() == 0);
+ assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
// Creating another snapshot
@@ -438,7 +434,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
@@ -472,7 +468,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
memstore.clearSnapshot(snapshot.getId());
- int chunkCount = chunkPool.getPoolSize();
+ int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
index 7154511..bff5bec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionPolicy.java
@@ -104,6 +104,7 @@ public class TestCompactionPolicy {
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
hlog = new FSHLog(fs, basedir, logName, conf);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
region = HRegion.createHRegion(info, basedir, conf, htd, hlog);
region.close();
Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 7434eb1..41b304b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -84,6 +85,7 @@ public class TestDefaultMemStore {
protected static final byte[] FAMILY = Bytes.toBytes("column");
protected MultiVersionConcurrencyControl mvcc;
protected AtomicLong startSeqNum = new AtomicLong(0);
+ protected ChunkCreator chunkCreator;
private String getName() {
return this.name.getMethodName();
@@ -92,9 +94,17 @@ public class TestDefaultMemStore {
@Before
public void setUp() throws Exception {
internalSetUp();
+ // no pool
+ this.chunkCreator =
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
this.memstore = new DefaultMemStore();
}
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ ChunkCreator.getInstance().clearChunkIds();
+ }
+
protected void internalSetUp() throws Exception {
this.mvcc = new MultiVersionConcurrencyControl();
}
@@ -129,7 +139,9 @@ public class TestDefaultMemStore {
assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize());
// make sure chunk size increased even when writing the same cell, if using MSLAB
if (msLab instanceof MemStoreLABImpl) {
- assertEquals(2 * Segment.getCellLength(kv),
+ // since we add the chunkID at the 0th offset of the chunk and the
+ // chunkid is a long we need to account for those 8 bytes
+ assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG,
((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset());
}
} else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 37a7664..1768801 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -48,30 +48,30 @@ import static org.junit.Assert.assertTrue;
@Category({RegionServerTests.class, SmallTests.class})
public class TestMemStoreChunkPool {
private final static Configuration conf = new Configuration();
- private static MemStoreChunkPool chunkPool;
+ private static ChunkCreator chunkCreator;
private static boolean chunkPoolDisabledBeforeTest;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
- chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled;
- MemStoreChunkPool.chunkPoolDisabled = false;
+ chunkPoolDisabledBeforeTest = ChunkCreator.chunkPoolDisabled;
+ ChunkCreator.chunkPoolDisabled = false;
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
- chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
- MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
- assertTrue(chunkPool != null);
+ chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+ globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+ assertTrue(chunkCreator != null);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
- MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
+ ChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
}
@Before
public void tearDown() throws Exception {
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
}
@Test
@@ -90,7 +90,7 @@ public class TestMemStoreChunkPool {
int size = KeyValueUtil.length(kv);
ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
if (newKv.getBuffer() != lastBuffer) {
- expectedOff = 0;
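+ // the first 8 bytes of each chunk now hold the chunk id, so allocations start at offset 8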
+ expectedOff = 8;
lastBuffer = newKv.getBuffer();
}
assertEquals(expectedOff, newKv.getOffset());
@@ -100,14 +100,14 @@ public class TestMemStoreChunkPool {
}
// chunks will be put back to pool after close
mslab.close();
- int chunkCount = chunkPool.getPoolSize();
+ int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
// reconstruct mslab
mslab = new MemStoreLABImpl(conf);
// chunk should be got from the pool, so we can reuse it.
KeyValue kv = new KeyValue(rk, cf, q, new byte[10]);
mslab.copyCellInto(kv);
- assertEquals(chunkCount - 1, chunkPool.getPoolSize());
+ assertEquals(chunkCount - 1, chunkCreator.getPoolSize());
}
@Test
@@ -143,7 +143,7 @@ public class TestMemStoreChunkPool {
}
memstore.clearSnapshot(snapshot.getId());
- int chunkCount = chunkPool.getPoolSize();
+ int chunkCount = chunkCreator.getPoolSize();
assertTrue(chunkCount > 0);
}
@@ -189,16 +189,16 @@ public class TestMemStoreChunkPool {
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() == 0);
+ assertTrue(chunkCreator.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
// clear chunks
- chunkPool.clearChunks();
+ chunkCreator.clearChunksInPool();
// Creating another snapshot
snapshot = memstore.snapshot();
@@ -218,20 +218,20 @@ public class TestMemStoreChunkPool {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
- assertTrue(chunkPool.getPoolSize() > 0);
+ assertTrue(chunkCreator.getPoolSize() > 0);
}
@Test
public void testPutbackChunksMultiThreaded() throws Exception {
- MemStoreChunkPool oldPool = MemStoreChunkPool.GLOBAL_INSTANCE;
final int maxCount = 10;
final int initialCount = 5;
- final int chunkSize = 30;
+ final int chunkSize = 40;
final int valSize = 7;
- MemStoreChunkPool pool = new MemStoreChunkPool(chunkSize, maxCount, initialCount, 1, false);
- assertEquals(initialCount, pool.getPoolSize());
- assertEquals(maxCount, pool.getMaxCount());
- MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.
+ ChunkCreator oldCreator = ChunkCreator.getInstance();
+ ChunkCreator newCreator = new ChunkCreator(chunkSize, false, 400, 1, 0.5f, null);
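+ // maxCount = 400 * 1 / 40 = 10 and initial count = 0.5 * 10 = 5, matching the asserts below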
+ assertEquals(initialCount, newCreator.getPoolSize());
+ assertEquals(maxCount, newCreator.getMaxCount());
+ ChunkCreator.INSTANCE = newCreator;// Replace the global ref with the new one we created.
// Used it for the testing. Later in finally we put
// back the original
final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
@@ -258,9 +258,9 @@ public class TestMemStoreChunkPool {
t1.join();
t2.join();
t3.join();
- assertTrue(pool.getPoolSize() <= maxCount);
+ assertTrue(newCreator.getPoolSize() <= maxCount);
} finally {
- MemStoreChunkPool.GLOBAL_INSTANCE = oldPool;
+ ChunkCreator.INSTANCE = oldCreator;
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index 141b802..6696e43 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -63,8 +63,8 @@ public class TestMemStoreLAB {
public static void setUpBeforeClass() throws Exception {
long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
.getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
- MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
- MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit,
+ 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
}
/**
@@ -76,6 +76,7 @@ public class TestMemStoreLAB {
MemStoreLAB mslab = new MemStoreLABImpl();
int expectedOff = 0;
ByteBuffer lastBuffer = null;
+ long lastChunkId = -1;
// 100K iterations by 0-1K alloc -> 50MB expected
// should be reasonable for unit test and also cover wraparound
// behavior
@@ -85,8 +86,13 @@ public class TestMemStoreLAB {
int size = KeyValueUtil.length(kv);
ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
if (newKv.getBuffer() != lastBuffer) {
- expectedOff = 0;
+ // since we add the chunkID at the 0th offset of the chunk and the
+ // chunkid is a long we need to account for those 8 bytes
+ expectedOff = Bytes.SIZEOF_LONG;
lastBuffer = newKv.getBuffer();
+ long chunkId = newKv.getBuffer().getLong(0);
+ assertTrue("chunkid should be different", chunkId != lastChunkId);
+ lastChunkId = chunkId;
}
assertEquals(expectedOff, newKv.getOffset());
assertTrue("Allocation overruns buffer",
@@ -136,23 +142,21 @@ public class TestMemStoreLAB {
};
ctx.addThread(t);
}
-
+
ctx.startThreads();
while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) {
Thread.sleep(10);
}
ctx.stop();
-
// Partition the allocations by the actual byte[] they point into,
// make sure offsets are unique for each chunk
Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk =
Maps.newHashMap();
-
+
int sizeCounted = 0;
for (AllocRecord rec : Iterables.concat(allocations)) {
sizeCounted += rec.size;
if (rec.size == 0) continue;
-
Map<Integer, AllocRecord> mapForThisByteArray =
mapsByChunk.get(rec.alloc);
if (mapForThisByteArray == null) {
@@ -167,7 +171,9 @@ public class TestMemStoreLAB {
// Now check each byte array to make sure allocations don't overlap
for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
- int expectedOff = 0;
+ // since we add the chunkID at the 0th offset of the chunk and the
+ // chunkid is a long we need to account for those 8 bytes
+ int expectedOff = Bytes.SIZEOF_LONG;
for (AllocRecord alloc : allocsInChunk.values()) {
assertEquals(expectedOff, alloc.offset);
assertTrue("Allocation overruns buffer",
@@ -175,7 +181,6 @@ public class TestMemStoreLAB {
expectedOff += alloc.size;
}
}
-
}
/**
@@ -194,7 +199,7 @@ public class TestMemStoreLAB {
// set chunk size to default max alloc size, so we could easily trigger chunk retirement
conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
// reconstruct mslab
- MemStoreChunkPool.clearDisableFlag();
+ ChunkCreator.clearDisableFlag();
mslab = new MemStoreLABImpl(conf);
// launch multiple threads to trigger frequent chunk retirement
List<Thread> threads = new ArrayList<>();
@@ -223,6 +228,8 @@ public class TestMemStoreLAB {
}
// close the mslab
mslab.close();
+ // none of the chunk IDs would have been cleared yet, since the pooled chunks are still
+ // tracked by the ChunkCreator after the mslab is closed
+ assertTrue("The chunk IDs must not have been cleared", ChunkCreator.INSTANCE.size() != 0);
// make sure all chunks reclaimed or removed from chunk queue
int queueLength = mslab.getPooledChunks().size();
assertTrue("All chunks in chunk queue should be reclaimed or removed"
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
new file mode 100644
index 0000000..f38a75e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
@@ -0,0 +1,168 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestMemstoreLABWithoutPool {
+ private final static Configuration conf = new Configuration();
+
+ private static final byte[] rk = Bytes.toBytes("r1");
+ private static final byte[] cf = Bytes.toBytes("f");
+ private static final byte[] q = Bytes.toBytes("q");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
+ .getMax() * 0.8);
+ // disable pool
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT + Bytes.SIZEOF_LONG, false, globalMemStoreLimit,
+ 0.0f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+ }
+
+ /**
+ * Test a bunch of random allocations
+ */
+ @Test
+ public void testLABRandomAllocation() {
+ Random rand = new Random();
+ MemStoreLAB mslab = new MemStoreLABImpl();
+ int expectedOff = 0;
+ ByteBuffer lastBuffer = null;
+ long lastChunkId = -1;
+ // 100K iterations by 0-1K alloc -> 50MB expected
+ // should be reasonable for unit test and also cover wraparound
+ // behavior
+ for (int i = 0; i < 100000; i++) {
+ int valSize = rand.nextInt(1000);
+ KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
+ int size = KeyValueUtil.length(kv);
+ ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
+ if (newKv.getBuffer() != lastBuffer) {
+ // since we add the chunkID at the 0th offset of the chunk and the
+ // chunkid is a long we need to account for those 8 bytes
+ expectedOff = Bytes.SIZEOF_LONG;
+ lastBuffer = newKv.getBuffer();
+ long chunkId = newKv.getBuffer().getLong(0);
+ assertTrue("chunkid should be different", chunkId != lastChunkId);
+ lastChunkId = chunkId;
+ }
+ assertEquals(expectedOff, newKv.getOffset());
+ assertTrue("Allocation overruns buffer",
+ newKv.getOffset() + size <= newKv.getBuffer().capacity());
+ expectedOff += size;
+ }
+ }
+
+ /**
+ * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
+ * there's no memory leak (HBASE-16195)
+ * @throws Exception if any error occurred
+ */
+ @Test
+ public void testLABChunkQueueWithMultipleMSLABs() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ MemStoreLABImpl[] mslab = new MemStoreLABImpl[10];
+ for (int i = 0; i < 10; i++) {
+ mslab[i] = new MemStoreLABImpl(conf);
+ }
+ // launch multiple threads to trigger frequent chunk retirement
+ List<Thread> threads = new ArrayList<>();
+ // create smaller sized kvs
+ final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
+ new byte[0]);
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ threads.add(getChunkQueueTestThread(mslab[i], "testLABChunkQueue-" + j, kv));
+ }
+ }
+ for (Thread thread : threads) {
+ thread.start();
+ }
+ // let it run for some time
+ Thread.sleep(3000);
+ for (Thread thread : threads) {
+ thread.interrupt();
+ }
+ boolean threadsRunning = true;
+ boolean alive = false;
+ while (threadsRunning) {
+ alive = false;
+ for (Thread thread : threads) {
+ if (thread.isAlive()) {
+ alive = true;
+ break;
+ }
+ }
+ if (!alive) {
+ threadsRunning = false;
+ }
+ }
+ // close the mslab
+ for (int i = 0; i < 10; i++) {
+ mslab[i].close();
+ }
+ // all of the chunkIds would have been returned back
+ assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0);
+ }
+
+ private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
+ Cell cellToCopyInto) {
+ Thread thread = new Thread() {
+ boolean stopped = false;
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ // keep triggering chunk retirement
+ mslab.copyCellInto(cellToCopyInto);
+ }
+ }
+
+ @Override
+ public void interrupt() {
+ this.stopped = true;
+ }
+ };
+ thread.setName(threadName);
+ thread.setDaemon(true);
+ return thread;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
index 3cdb227..99dd00d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
@@ -111,6 +111,7 @@ public class TestStoreFileRefresherChore {
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, null, "log_" + replicaId);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegion region =
new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()),
conf, htd, null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
index 994779f..e63bad9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
@@ -98,6 +98,7 @@ public class TestWALMonotonicallyIncreasingSeqId {
FSUtils.setRootDir(walConf, tableDir);
this.walConf = walConf;
wals = new WALFactory(walConf, null, "log_" + replicaId);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
HRegion region = new HRegion(fs, wals.getWAL(info.getEncodedNameAsBytes(),
info.getTable().getNamespace()), conf, htd, null);
region.initialize();