 .../apache/hadoop/hbase/ByteBufferChunkCell.java   |  42 ++++++++
 .../java/org/apache/hadoop/hbase/CellUtil.java     |  11 +-
 .../java/org/apache/hadoop/hbase/ChunkCell.java    |  29 +++++
 .../hadoop/hbase/NoTagByteBufferChunkCell.java     |  45 ++++++++
 .../apache/hadoop/hbase/regionserver/Chunk.java    |  38 ++++++-
 .../hadoop/hbase/regionserver/HRegionServer.java   |   5 +-
 .../hbase/regionserver/MSLABChunkCreator.java      | 120 +++++++++++++++++++++
 .../hbase/regionserver/MemStoreChunkPool.java      |  32 +++---
 .../hadoop/hbase/regionserver/MemStoreLABImpl.java |  27 +++--
 .../hadoop/hbase/regionserver/OffheapChunk.java    |  25 ++---
 .../hadoop/hbase/regionserver/OnheapChunk.java     |  24 ++---
 .../hbase/regionserver/TestCompactingMemStore.java |   3 +-
 .../hbase/regionserver/TestDefaultMemStore.java    |  13 ++-
 .../hbase/regionserver/TestMemStoreChunkPool.java  |  10 +-
 .../hadoop/hbase/regionserver/TestMemStoreLAB.java |  16 +--
 15 files changed, 359 insertions(+), 81 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferChunkCell.java
new file mode 100644
index 0000000..87db80f
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ByteBufferChunkCell.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * ByteBuffer based ChunkCell with tags
+ */
+@InterfaceAudience.Private
+public class ByteBufferChunkCell extends ByteBufferKeyValue implements ChunkCell {
+  public ByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  public ByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
+    super(buf, offset, length, seqId);
+  }
+
+  @Override
+  public long getChunkId() {
+    // The chunkId is embedded at the 0th offset of the bytebuffer
+    return this.buf.getLong(0);
+  }
+}
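The convention this new cell type relies on is that every chunk's backing ByteBuffer reserves its first 8 bytes for the owning chunk's id. A minimal standalone sketch of that layout (plain Java; the class name and the sample values are hypothetical, not part of the patch):

    import java.nio.ByteBuffer;

    public class ChunkIdLayoutDemo {
      public static void main(String[] args) {
        ByteBuffer chunk = ByteBuffer.allocate(4096); // one chunk's backing buffer
        chunk.putLong(0, 42L);                        // chunk id lives at offset 0
        // Cells are copied in starting at offset 8; any cell sharing this
        // buffer can recover the owning chunk's id from offset 0.
        long chunkId = chunk.getLong(0);
        System.out.println(chunkId);                  // prints 42
      }
    }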
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 5930928..0b32e4f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -3177,9 +3177,10 @@ public final class CellUtil {
   }
 
   /**
-   * Clone the passed cell by copying its data into the passed buf.
+   * Clone the passed cell by copying its data into the passed buf and create a memstore
+   * chunk cell out of it
    */
-  public static Cell copyCellTo(Cell cell, ByteBuffer buf, int offset, int len) {
+  public static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
     int tagsLen = cell.getTagsLength();
     if (cell instanceof ExtendedCell) {
       ((ExtendedCell) cell).write(buf, offset);
@@ -3189,14 +3190,16 @@ public final class CellUtil {
       // serialization format only.
       KeyValueUtil.appendTo(cell, buf, offset, true);
     }
+    // TODO : write the seqid here. For writing seqId we should create a new cell type so
+    // that seqId is not used as the state
     if (tagsLen == 0) {
       // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
       // which directly return tagsLen as 0. So we avoid parsing many length components in
       // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
       // call getTagsLength().
-      return new NoTagsByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
+      return new NoTagByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
     } else {
-      return new ByteBufferKeyValue(buf, offset, len, cell.getSequenceId());
+      return new ByteBufferChunkCell(buf, offset, len, cell.getSequenceId());
     }
   }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChunkCell.java
new file mode 100644
index 0000000..0e41a93
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChunkCell.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A cell created out of the MSLAB pool's chunk. The chunkId can be extracted from the backing
+ * ByteBuffer
+ */
+@InterfaceAudience.Private
+public interface ChunkCell {
+  long getChunkId();
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagByteBufferChunkCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagByteBufferChunkCell.java
new file mode 100644
index 0000000..33f2e71
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagByteBufferChunkCell.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+
+/**
+ * ByteBuffer based ChunkCell with no tags
+ */
+public class NoTagByteBufferChunkCell extends NoTagsByteBufferKeyValue implements ChunkCell {
+
+  public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length) {
+    super(buf, offset, length);
+  }
+
+  public NoTagByteBufferChunkCell(ByteBuffer buf, int offset, int length, long seqId) {
+    super(buf, offset, length, seqId);
+  }
+
+  @Override
+  public long getChunkId() {
+    // The chunkId is embedded at the 0th offset of the bytebuffer
+    return this.buf.getLong(0);
+  }
+
+}
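A test-style fragment showing how the renamed helper would be driven (a hypothetical caller, not in the patch; it assumes the classes above plus HBase's KeyValue, KeyValueUtil and Bytes, and a destination buffer that already carries a chunk id):

    // Copy a cell into a chunk buffer whose first 8 bytes hold the chunk id.
    KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    int len = KeyValueUtil.length(kv);
    ByteBuffer chunkBuf = ByteBuffer.allocate(Bytes.SIZEOF_LONG + len);
    chunkBuf.putLong(0, 7L); // pretend this buffer belongs to chunk 7
    Cell copied = CellUtil.copyToChunkCell(kv, chunkBuf, Bytes.SIZEOF_LONG, len);
    // kv carries no tags, so we get the NoTagByteBufferChunkCell variant
    assertEquals(7L, ((ChunkCell) copied).getChunkId());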
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
index 2cbf0a3..ff03655 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Chunk.java
@@ -21,8 +21,10 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * A chunk of memory out of which allocations are sliced.
@@ -46,13 +48,22 @@ public abstract class Chunk {
   /** Size of chunk in bytes */
   protected final int size;
 
+  // The unique id associated with the chunk. -1 indicates Chunk not from pool
+  private long id = -1;
+
   /**
-   * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap.
-   *
+   * Create an uninitialized chunk. Note that memory is not allocated yet, so
+   * this is cheap.
    * @param size in bytes
+   * @param id the chunk id
    */
-  Chunk(int size) {
+  public Chunk(int size, long id) {
     this.size = size;
+    this.id = id;
+  }
+
+  long getId() {
+    return this.id;
   }
 
   /**
@@ -60,7 +71,24 @@ public abstract class Chunk {
    * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block
    * until the allocation is complete.
    */
-  public abstract void init();
+  public void init() {
+    assert nextFreeOffset.get() == UNINITIALIZED;
+    try {
+      allocateDataBuffer();
+    } catch (OutOfMemoryError e) {
+      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+      assert failInit; // should be true.
+      throw e;
+    }
+    // Mark that it's ready for use
+    // Move 8 bytes since the first 8 bytes hold the chunk id
+    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, Bytes.SIZEOF_LONG);
+    // We should always succeed the above CAS since only one thread
+    // calls init()!
+    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
+  }
+
+  abstract void allocateDataBuffer();
 
   /**
    * Reset the offset to UNINITIALIZED before before reusing an old chunk
@@ -96,7 +124,7 @@ public abstract class Chunk {
       if (oldOffset + size > data.capacity()) {
         return -1; // alloc doesn't fit
       }
-
+      // TODO : If seqID is to be written add 8 bytes here for nextFreeOffset
      // Try to atomically claim this chunk
      if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
        // we got the alloc
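The alloc() method touched in the hunk above is a compare-and-swap bump allocator; the only behavioural change in this patch is that the free offset now starts at 8 (Bytes.SIZEOF_LONG) instead of 0. A self-contained sketch of the same allocation pattern, independent of the HBase classes (names hypothetical):

    import java.util.concurrent.atomic.AtomicInteger;

    // Thread-safe bump allocator over a fixed-size region, mirroring the
    // pattern in Chunk.alloc(); offsets 0..7 stay reserved for the chunk id.
    class BumpAllocator {
      private final int capacity;
      private final AtomicInteger nextFree = new AtomicInteger(8);

      BumpAllocator(int capacity) { this.capacity = capacity; }

      /** Returns the start offset of the allocation, or -1 if it does not fit. */
      int alloc(int size) {
        while (true) {
          int old = nextFree.get();
          if (old + size > capacity) {
            return -1; // does not fit
          }
          if (nextFree.compareAndSet(old, old + size)) {
            return old; // we won the race for this range
          }
          // lost the race; retry with the updated offset
        }
      }
    }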
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index be4cca0..42d47af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1507,8 +1507,11 @@ public class HRegionServer extends HasThread implements
     float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
         MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
     int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
+    // init the chunkCreator
+    MSLABChunkCreator chunkCreator =
+        MSLABChunkCreator.initialize(chunkSize, offheap);
     MemStoreChunkPool pool = MemStoreChunkPool.initialize(globalMemStoreSize, poolSizePercentage,
-        initialCountPercentage, chunkSize, offheap);
+        initialCountPercentage, chunkCreator);
     if (pool != null && this.hMemManager != null) {
       // Register with Heap Memory manager
       this.hMemManager.registerTuneObserver(pool);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MSLABChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MSLABChunkCreator.java
new file mode 100644
index 0000000..9f1792f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MSLABChunkCreator.java
@@ -0,0 +1,120 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Does the management of memstoreLAB chunk creations. A monotonically increasing id is
+ * associated with every chunk
+ */
+@InterfaceAudience.Private
+public class MSLABChunkCreator {
+  // monotonically increasing chunkid
+  private AtomicLong chunkID = new AtomicLong(1);
+  // maps the chunk against the monotonically increasing chunk id. We need to preserve the
+  // natural ordering of the key
+  private ConcurrentHashMap<Long, Chunk> chunkIdMap =
+      new ConcurrentHashMap<Long, Chunk>();
+  private int chunkSize;
+  private boolean offheap;
+  @VisibleForTesting
+  static MSLABChunkCreator INSTANCE;
+
+  @VisibleForTesting
+  MSLABChunkCreator(int chunkSize, boolean offheap) {
+    this.chunkSize = chunkSize;
+    this.offheap = offheap;
+  }
+
+  /**
+   * Return the instance of MSLABChunkCreator
+   * @param chunkSize the chunkSize
+   * @param offheap indicates if the chunk is to be created offheap or not
+   * @return singleton MSLABChunkCreator
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
+      justification = "Method is called by single thread at the starting of RS")
+  static MSLABChunkCreator initialize(int chunkSize, boolean offheap) {
+    if (INSTANCE != null) return INSTANCE;
+    INSTANCE = new MSLABChunkCreator(chunkSize, offheap);
+    return INSTANCE;
+  }
+
+  static MSLABChunkCreator getChunkCreator() {
+    return INSTANCE;
+  }
+
+  /**
+   * Creates the chunk either onheap or offheap
+   * @return the chunk
+   */
+  Chunk createChunk() {
+    return createChunk(false);
+  }
+
+  /**
+   * Creates the chunk either onheap or offheap
+   * @param forceOnheapOnly indicates if the chunk has to be onheap even though the
+   *          chunkCreator is built for offheap
+   * @return the chunk
+   */
+  Chunk createChunk(boolean forceOnheapOnly) {
+    Chunk memstoreLABChunk;
+    long id = chunkID.getAndIncrement();
+    if (this.offheap && !forceOnheapOnly) {
+      memstoreLABChunk = new OffheapChunk(chunkSize, id);
+    } else {
+      memstoreLABChunk = new OnheapChunk(chunkSize, id);
+    }
+    chunkIdMap.put(id, memstoreLABChunk);
+    return memstoreLABChunk;
+  }
+
+  Chunk getChunk(long id) {
+    return this.chunkIdMap.get(id);
+  }
+
+  int getChunkSize() {
+    return this.chunkSize;
+  }
+
+  boolean isOffheap() {
+    return this.offheap;
+  }
+
+  void removeChunks(Collection<Long> chunkIds) {
+    for (long chunkId : chunkIds) {
+      this.chunkIdMap.remove(chunkId);
+    }
+  }
+
+  @VisibleForTesting
+  void clearChunks() {
+    this.chunkIdMap.clear();
+  }
+}
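Putting the new creator's API together, a test-style sketch of its lifecycle (assumed to run inside the org.apache.hadoop.hbase.regionserver package, since these methods are package-private; the sizes and assertions are illustrative only):

    MSLABChunkCreator creator = MSLABChunkCreator.initialize(2048, false);
    Chunk c = creator.createChunk(); // onheap here; receives the next id
    c.init();                        // embeds the id at offset 0 of the buffer
    assert creator.getChunk(c.getId()) == c;        // id -> chunk lookup
    creator.removeChunks(java.util.Collections.singleton(c.getId()));
    assert creator.getChunk(c.getId()) == null;     // mapping released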
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
index b7ac212..0a9162f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
@@ -62,7 +62,6 @@ public class MemStoreChunkPool implements HeapMemoryTuneObserver {
 
   // A queue of reclaimed chunks
   private final BlockingQueue<Chunk> reclaimedChunks;
-  private final int chunkSize;
   private final float poolSizePercentage;
 
   /** Statistics thread schedule pool */
@@ -71,17 +70,16 @@
   private static final int statThreadPeriod = 60 * 5;
   private final AtomicLong chunkCount = new AtomicLong();
   private final AtomicLong reusedChunkCount = new AtomicLong();
-  private final boolean offheap;
+  private MSLABChunkCreator chunkCreator;
 
-  MemStoreChunkPool(int chunkSize, int maxCount, int initialCount, float poolSizePercentage,
-      boolean offheap) {
+  MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage,
+      MSLABChunkCreator chunkCreator) {
     this.maxCount = maxCount;
-    this.chunkSize = chunkSize;
     this.poolSizePercentage = poolSizePercentage;
-    this.offheap = offheap;
+    this.chunkCreator = chunkCreator;
     this.reclaimedChunks = new LinkedBlockingQueue<>();
     for (int i = 0; i < initialCount; i++) {
-      Chunk chunk = this.offheap ? new OffheapChunk(chunkSize) : new OnheapChunk(chunkSize);
+      Chunk chunk = this.chunkCreator.createChunk();
       chunk.init();
       reclaimedChunks.add(chunk);
     }
@@ -113,7 +111,7 @@
     while (true) {
       long created = this.chunkCount.get();
       if (created < this.maxCount) {
-        chunk = this.offheap ? new OffheapChunk(this.chunkSize) : new OnheapChunk(this.chunkSize);
+        chunk = this.chunkCreator.createChunk();
         if (this.chunkCount.compareAndSet(created, created + 1)) {
           break;
         }
@@ -191,7 +189,7 @@
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "LI_LAZY_INIT_STATIC",
       justification = "Method is called by single thread at the starting of RS")
   static MemStoreChunkPool initialize(long globalMemStoreSize, float poolSizePercentage,
-      float initialCountPercentage, int chunkSize, boolean offheap) {
+      float initialCountPercentage, MSLABChunkCreator chunkCreator) {
     if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
     if (chunkPoolDisabled) return null;
 
@@ -203,16 +201,17 @@
       throw new IllegalArgumentException(
           MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
     }
-    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
+    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkCreator.getChunkSize());
     if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
       throw new IllegalArgumentException(
           MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY + " must be between 0.0 and 1.0");
     }
     int initialCount = (int) (initialCountPercentage * maxCount);
-    LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
-        + ", max count " + maxCount + ", initial count " + initialCount);
-    GLOBAL_INSTANCE = new MemStoreChunkPool(chunkSize, maxCount, initialCount, poolSizePercentage,
-        offheap);
+    LOG.info("Allocating MemStoreChunkPool with chunk size "
+        + StringUtils.byteDesc(chunkCreator.getChunkSize()) + ", max count " + maxCount
+        + ", initial count " + initialCount);
+    GLOBAL_INSTANCE =
+        new MemStoreChunkPool(maxCount, initialCount, poolSizePercentage, chunkCreator);
     return GLOBAL_INSTANCE;
   }
 
@@ -235,11 +234,12 @@
   @Override
   public void onHeapMemoryTune(long newMemstoreSize, long newBlockCacheSize) {
     // don't do any tuning in case of offheap memstore
-    if (this.offheap) {
+    if (this.chunkCreator.isOffheap()) {
       LOG.warn("Not tuning the chunk pool as it is offheap");
       return;
     }
-    int newMaxCount = (int) (newMemstoreSize * poolSizePercentage / chunkSize);
+    int newMaxCount =
+        (int) (newMemstoreSize * poolSizePercentage / this.chunkCreator.getChunkSize());
     if (newMaxCount != this.maxCount) {
       // We need an adjustment in the chunks numbers
       if (newMaxCount > this.maxCount) {
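The sizing math in initialize() is unchanged apart from where the chunk size comes from. A worked example with made-up numbers:

    // Worked example of the pool-sizing arithmetic in initialize():
    long globalMemStoreSize = 1024L * 1024 * 1024;   // 1 GB of global memstore
    float poolSizePercentage = 0.2f;                 // 20% kept in the pool
    int chunkSize = 2 * 1024 * 1024;                 // 2 MB chunks
    int maxCount = (int) (globalMemStoreSize * poolSizePercentage / chunkSize);
    // maxCount == 102; with initialCountPercentage = 0.1f,
    int initialCount = (int) (0.1f * maxCount);      // 10 chunks pre-created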
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 4e87135..7614acb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -74,6 +76,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
   private final int chunkSize;
   private final int maxAlloc;
   private final MemStoreChunkPool chunkPool;
+  private final MSLABChunkCreator chunkCreator;
 
   // This flag is for closing this instance, its set when clearing snapshot of
   // memstore
@@ -83,6 +86,8 @@ public class MemStoreLABImpl implements MemStoreLAB {
   private AtomicBoolean reclaimed = new AtomicBoolean(false);
   // Current count of open scanners which reading data from this MemStoreLAB
   private final AtomicInteger openScannerCount = new AtomicInteger();
+  // Stores all the chunkIds that are used by this MSLAB when there is no chunkPool
+  private final Map<Long, Boolean> chunkIds = new ConcurrentHashMap<Long, Boolean>();
 
   // Used in testing
   public MemStoreLABImpl() {
@@ -93,19 +98,18 @@ public class MemStoreLABImpl implements MemStoreLAB {
     chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
     maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
     this.chunkPool = MemStoreChunkPool.getPool();
+    this.chunkCreator = MSLABChunkCreator.getChunkCreator();
     // currently chunkQueue is only used for chunkPool
     if (this.chunkPool != null) {
       // set queue length to chunk pool max count to avoid keeping reference of
       // too many non-reclaimable chunks
       pooledChunkQueue = new LinkedBlockingQueue<>(chunkPool.getMaxCount());
     }
-
     // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
     Preconditions.checkArgument(maxAlloc <= chunkSize,
         MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
   }
-
   @Override
   public Cell copyCellInto(Cell cell) {
     int size = KeyValueUtil.length(cell);
@@ -130,7 +134,7 @@ public class MemStoreLABImpl implements MemStoreLAB {
       // try to retire this chunk
       tryRetireChunk(c);
     }
-    return CellUtil.copyCellTo(cell, c.getData(), allocOffset, size);
+    return CellUtil.copyToChunkCell(cell, c.getData(), allocOffset, size);
   }
 
   /**
@@ -142,8 +146,10 @@ public class MemStoreLABImpl implements MemStoreLAB {
     this.closed = true;
     // We could put back the chunks to pool for reusing only when there is no
     // opening scanner which will read their data
-    if (chunkPool != null && openScannerCount.get() == 0
-        && reclaimed.compareAndSet(false, true)) {
+    int count = openScannerCount.get();
+    if (this.chunkPool == null && count == 0) {
+      this.chunkCreator.removeChunks(this.chunkIds.keySet());
+    } else if (chunkPool != null && count == 0 && reclaimed.compareAndSet(false, true)) {
       chunkPool.putbackChunks(this.pooledChunkQueue);
     }
   }
@@ -162,7 +168,9 @@ public class MemStoreLABImpl implements MemStoreLAB {
   @Override
   public void decScannerCount() {
     int count = this.openScannerCount.decrementAndGet();
-    if (this.closed && chunkPool != null && count == 0
+    if (this.closed && this.chunkPool == null && count == 0) {
+      this.chunkCreator.removeChunks(this.chunkIds.keySet());
+    } else if (this.closed && chunkPool != null && count == 0
         && reclaimed.compareAndSet(false, true)) {
       chunkPool.putbackChunks(this.pooledChunkQueue);
     }
@@ -208,7 +216,12 @@ public class MemStoreLABImpl implements MemStoreLAB {
       // This is chunk from pool
      pooledChunk = true;
    } else {
-      c = new OnheapChunk(chunkSize);// When chunk is not from pool, always make it as on heap.
+      // When chunk is not from pool, always make it as on heap.
+      c = this.chunkCreator.createChunk(true);
+      if (chunkPool == null) {
+        // better to store the chunkid even though it is not yet the curChunk
+        chunkIds.put(c.getId(), true);
+      }
     }
     if (curChunk.compareAndSet(null, c)) {
       // we won race - now we need to actually do the expensive
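close() and decScannerCount() now cooperate on two reclamation paths: with a pool, pooled chunks are put back exactly once; without a pool, the chunk ids this MSLAB registered are dropped from the creator's map so the chunks become garbage-collectable. The gating pattern itself can be shown in isolation (a simplified sketch, not the HBase class):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    // Resources are released exactly once, by whichever of close() or the
    // last decScannerCount() runs second.
    class ReclaimGate {
      private volatile boolean closed = false;
      private final AtomicInteger openScanners = new AtomicInteger();
      private final AtomicBoolean reclaimed = new AtomicBoolean(false);

      void close() {
        closed = true;
        tryReclaim();
      }

      void incScannerCount() { openScanners.incrementAndGet(); }

      void decScannerCount() {
        openScanners.decrementAndGet();
        tryReclaim();
      }

      private void tryReclaim() {
        if (closed && openScanners.get() == 0 && reclaimed.compareAndSet(false, true)) {
          System.out.println("reclaiming chunks"); // put chunks back / drop ids
        }
      }
    }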
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
index ed98cfa..484d28d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OffheapChunk.java
@@ -21,34 +21,21 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
-import com.google.common.base.Preconditions;
-
 /**
  * An off heap chunk implementation.
 */
 @InterfaceAudience.Private
 public class OffheapChunk extends Chunk {
 
-  OffheapChunk(int size) {
-    super(size);
+  OffheapChunk(int size, long id) {
+    super(size, id);
   }
 
   @Override
-  public void init() {
-    assert nextFreeOffset.get() == UNINITIALIZED;
-    try {
-      if (data == null) {
-        data = ByteBuffer.allocateDirect(this.size);
-      }
-    } catch (OutOfMemoryError e) {
-      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
-      assert failInit; // should be true.
-      throw e;
+  void allocateDataBuffer() {
+    if (data == null) {
+      data = ByteBuffer.allocateDirect(this.size);
+      data.putLong(0, this.getId());
     }
-    // Mark that it's ready for use
-    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
-    // We should always succeed the above CAS since only one thread
-    // calls init()!
-    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
index bd33cb5..d94a1d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnheapChunk.java
@@ -29,25 +29,15 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class OnheapChunk extends Chunk {
 
-  OnheapChunk(int size) {
-    super(size);
+  OnheapChunk(int size, long id) {
+    super(size, id);
   }
 
-  public void init() {
-    assert nextFreeOffset.get() == UNINITIALIZED;
-    try {
-      if (data == null) {
-        data = ByteBuffer.allocate(this.size);
-      }
-    } catch (OutOfMemoryError e) {
-      boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
-      assert failInit; // should be true.
-      throw e;
+  @Override
+  void allocateDataBuffer() {
+    if (data == null) {
+      data = ByteBuffer.allocate(this.size);
+      data.putLong(0, this.getId());
     }
-    // Mark that it's ready for use
-    boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0);
-    // We should always succeed the above CAS since only one thread
-    // calls init()!
-    Preconditions.checkState(initted, "Multiple threads tried to init same chunk");
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 63bbe65..a294e02 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -92,7 +92,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
         .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
     chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
-        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
+        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
+        MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false));
     assertTrue(chunkPool != null);
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index e6d3147..10b2517 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -84,6 +85,7 @@ public class TestDefaultMemStore {
   protected static final byte[] FAMILY = Bytes.toBytes("column");
   protected MultiVersionConcurrencyControl mvcc;
   protected AtomicLong startSeqNum = new AtomicLong(0);
+  protected MSLABChunkCreator chunkCreator;
 
   private String getName() {
     return this.name.getMethodName();
@@ -92,9 +94,16 @@ public class TestDefaultMemStore {
   @Before
   public void setUp() throws Exception {
     internalSetUp();
+    this.chunkCreator =
+        MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
     this.memstore = new DefaultMemStore();
   }
 
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    MSLABChunkCreator.getChunkCreator().clearChunks();
+  }
+
   protected void internalSetUp() throws Exception {
     this.mvcc = new MultiVersionConcurrencyControl();
   }
@@ -131,7 +140,9 @@ public class TestDefaultMemStore {
       assertEquals(Segment.getCellLength(kv), sizeChangeForSecondCell.getDataSize());
       // make sure chunk size increased even when writing the same cell, if using MSLAB
       if (msLab instanceof MemStoreLABImpl) {
-        assertEquals(2 * Segment.getCellLength(kv),
+        // since we add the chunkID at the 0th offset of the chunk and the
+        // chunkid is a long we need to account for those 8 bytes
+        assertEquals(2 * Segment.getCellLength(kv) + Bytes.SIZEOF_LONG,
            ((MemStoreLABImpl) msLab).getCurrentChunk().getNextFreeOffset());
      }
    } else {
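The offset assertions adjusted in these test files (here and below) all follow from the same arithmetic: the first 8 bytes of every chunk are no longer allocatable. A worked example with a made-up cell length:

    // Offsets inside one chunk after this patch, for two copied cells:
    //   [0..7]             chunk id (long)
    //   [8..8+len-1]       first cell  -> offset 8 == Bytes.SIZEOF_LONG
    //   [8+len..8+2*len-1] second cell -> offset 8 + len
    int cellLength = 26;                      // hypothetical serialized size
    int firstOff = 8;                         // Bytes.SIZEOF_LONG
    int secondOff = firstOff + cellLength;    // 34
    int nextFreeOffset = secondOff + cellLength; // 60 == 2 * 26 + 8,
    // which is exactly what TestDefaultMemStore now asserts.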
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 42aad5c..bdf2fa6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -60,7 +60,8 @@ public class TestMemStoreChunkPool {
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
         .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
     chunkPool = MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f,
-        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
+        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
+        MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false));
     assertTrue(chunkPool != null);
   }
 
@@ -90,7 +91,7 @@ public class TestMemStoreChunkPool {
       int size = KeyValueUtil.length(kv);
       ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
       if (newKv.getBuffer() != lastBuffer) {
-        expectedOff = 0;
+        expectedOff = 8;
         lastBuffer = newKv.getBuffer();
       }
       assertEquals(expectedOff, newKv.getOffset());
@@ -222,7 +223,9 @@ public class TestMemStoreChunkPool {
     final int initialCount = 5;
     final int chunkSize = 30;
     final int valSize = 7;
-    MemStoreChunkPool pool = new MemStoreChunkPool(chunkSize, maxCount, initialCount, 1, false);
+    MSLABChunkCreator oldCreator = MSLABChunkCreator.getChunkCreator();
+    MSLABChunkCreator newCreator = new MSLABChunkCreator(chunkSize, false);
+    MemStoreChunkPool pool = new MemStoreChunkPool(maxCount, initialCount, 1, newCreator);
     assertEquals(initialCount, pool.getPoolSize());
     assertEquals(maxCount, pool.getMaxCount());
     MemStoreChunkPool.GLOBAL_INSTANCE = pool;// Replace the global ref with the new one we created.
@@ -255,6 +258,7 @@ public class TestMemStoreChunkPool {
       assertTrue(pool.getPoolSize() <= maxCount);
     } finally {
       MemStoreChunkPool.GLOBAL_INSTANCE = oldPool;
+      MSLABChunkCreator.INSTANCE = oldCreator;
     }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index 141b802..315d604 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -64,7 +64,7 @@ public class TestMemStoreLAB {
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
         .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
     MemStoreChunkPool.initialize(globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
-        MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false);
+        MSLABChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false));
   }
 
 /**
@@ -85,7 +85,9 @@ public class TestMemStoreLAB {
       int size = KeyValueUtil.length(kv);
       ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
       if (newKv.getBuffer() != lastBuffer) {
-        expectedOff = 0;
+        // since we add the chunkID at the 0th offset of the chunk and the
+        // chunkid is a long we need to account for those 8 bytes
+        expectedOff = Bytes.SIZEOF_LONG;
         lastBuffer = newKv.getBuffer();
       }
       assertEquals(expectedOff, newKv.getOffset());
@@ -136,23 +138,21 @@ public class TestMemStoreLAB {
       };
       ctx.addThread(t);
     }
-
+
     ctx.startThreads();
     while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) {
       Thread.sleep(10);
     }
     ctx.stop();
-
     // Partition the allocations by the actual byte[] they point into,
     // make sure offsets are unique for each chunk
     Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = Maps.newHashMap();
-
+
     int sizeCounted = 0;
     for (AllocRecord rec : Iterables.concat(allocations)) {
       sizeCounted += rec.size;
       if (rec.size == 0) continue;
-
       Map<Integer, AllocRecord> mapForThisByteArray = mapsByChunk.get(rec.alloc);
       if (mapForThisByteArray == null) {
@@ -167,7 +167,9 @@ public class TestMemStoreLAB {
     // Now check each byte array to make sure allocations don't overlap
     for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
-      int expectedOff = 0;
+      // since we add the chunkID at the 0th offset of the chunk and the
+      // chunkid is a long we need to account for those 8 bytes
+      int expectedOff = Bytes.SIZEOF_LONG;
       for (AllocRecord alloc : allocsInChunk.values()) {
         assertEquals(expectedOff, alloc.offset);
         assertTrue("Allocation overruns buffer",