diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index d8fa5c3..c1c5a1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -66,9 +66,9 @@ public class HeapMemStoreLAB implements MemStoreLAB { static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class); - private AtomicReference curChunk = new AtomicReference(); + private AtomicReference curChunk = new AtomicReference(); // A queue of chunks contained by this memstore, used with chunk pool - private BlockingQueue chunkQueue = null; + private BlockingQueue chunkQueue = null; final int chunkSize; final int maxAlloc; private final MemStoreChunkPool chunkPool; @@ -95,7 +95,7 @@ public class HeapMemStoreLAB implements MemStoreLAB { if (this.chunkPool != null) { // set queue length to chunk pool max count to avoid keeping reference of // too many non-reclaimable chunks - chunkQueue = new LinkedBlockingQueue(chunkPool.getMaxCount()); + chunkQueue = new LinkedBlockingQueue(chunkPool.getMaxCount()); } // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! @@ -121,14 +121,14 @@ public class HeapMemStoreLAB implements MemStoreLAB { } while (true) { - Chunk c = getOrMakeChunk(); + MemstoreLABChunk c = getOrMakeChunk(); // Try to allocate from this chunk int allocOffset = c.alloc(size); if (allocOffset != -1) { // We succeeded - this is the common case - small alloc // from a big buffer - return new SimpleMutableByteRange(c.data, allocOffset, size); + return new SimpleMutableByteRange(c.getData(), allocOffset, size); } // not enough space! @@ -166,7 +166,7 @@ public class HeapMemStoreLAB implements MemStoreLAB { @Override public void decScannerCount() { int count = this.openScannerCount.decrementAndGet(); - if (chunkPool != null && count == 0 && this.closed + if (this.closed && chunkPool != null && count == 0 && reclaimed.compareAndSet(false, true)) { chunkPool.putbackChunks(this.chunkQueue); } @@ -179,7 +179,7 @@ public class HeapMemStoreLAB implements MemStoreLAB { * @param c the chunk to retire * @return true if we won the race to retire the chunk */ - private void tryRetireChunk(Chunk c) { + private void tryRetireChunk(MemstoreLABChunk c) { curChunk.compareAndSet(c, null); // If the CAS succeeds, that means that we won the race // to retire the chunk. We could use this opportunity to @@ -193,10 +193,10 @@ public class HeapMemStoreLAB implements MemStoreLAB { * Get the current chunk, or, if there is no current chunk, * allocate a new one from the JVM. */ - private Chunk getOrMakeChunk() { + private MemstoreLABChunk getOrMakeChunk() { while (true) { // Try to get the chunk - Chunk c = curChunk.get(); + MemstoreLABChunk c = curChunk.get(); if (c != null) { return c; } @@ -204,19 +204,27 @@ public class HeapMemStoreLAB implements MemStoreLAB { // No current chunk, so we want to allocate one. We race // against other allocators to CAS in an uninitialized chunk // (which is cheap to allocate) - c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize); + if (chunkPool != null) { + c = chunkPool.getChunk(); + } + if (c == null) { + c = new MemstoreLABChunk(chunkSize, false); + } if (curChunk.compareAndSet(null, c)) { // we won race - now we need to actually do the expensive // allocation step c.init(); - if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " - + chunkQueue.size()); + if (c.isPooled()) { + // Its already pooled chunk. No need to init again. + if (!this.closed && !this.chunkQueue.offer(c)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " + + chunkQueue.size()); + } } } return c; - } else if (chunkPool != null) { + } else if (c.isPooled()) { chunkPool.putbackChunk(c); } // someone else won race - that's fine, we'll try to grab theirs @@ -225,124 +233,12 @@ public class HeapMemStoreLAB implements MemStoreLAB { } @VisibleForTesting - Chunk getCurrentChunk() { + MemstoreLABChunk getCurrentChunk() { return this.curChunk.get(); } @VisibleForTesting - BlockingQueue getChunkQueue() { + BlockingQueue getChunkQueue() { return this.chunkQueue; } - - /** - * A chunk of memory out of which allocations are sliced. - */ - static class Chunk { - /** Actual underlying data */ - private byte[] data; - - private static final int UNINITIALIZED = -1; - private static final int OOM = -2; - /** - * Offset for the next allocation, or the sentinel value -1 - * which implies that the chunk is still uninitialized. - * */ - private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED); - - /** Total number of allocations satisfied from this buffer */ - private AtomicInteger allocCount = new AtomicInteger(); - - /** Size of chunk in bytes */ - private final int size; - - /** - * Create an uninitialized chunk. Note that memory is not allocated yet, so - * this is cheap. - * @param size in bytes - */ - Chunk(int size) { - this.size = size; - } - - /** - * Actually claim the memory for this chunk. This should only be called from - * the thread that constructed the chunk. It is thread-safe against other - * threads calling alloc(), who will block until the allocation is complete. - */ - public void init() { - assert nextFreeOffset.get() == UNINITIALIZED; - try { - if (data == null) { - data = new byte[size]; - } - } catch (OutOfMemoryError e) { - boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); - assert failInit; // should be true. - throw e; - } - // Mark that it's ready for use - boolean initted = nextFreeOffset.compareAndSet( - UNINITIALIZED, 0); - // We should always succeed the above CAS since only one thread - // calls init()! - Preconditions.checkState(initted, - "Multiple threads tried to init same chunk"); - } - - /** - * Reset the offset to UNINITIALIZED before before reusing an old chunk - */ - void reset() { - if (nextFreeOffset.get() != UNINITIALIZED) { - nextFreeOffset.set(UNINITIALIZED); - allocCount.set(0); - } - } - - /** - * Try to allocate size bytes from the chunk. - * @return the offset of the successful allocation, or -1 to indicate not-enough-space - */ - public int alloc(int size) { - while (true) { - int oldOffset = nextFreeOffset.get(); - if (oldOffset == UNINITIALIZED) { - // The chunk doesn't have its data allocated yet. - // Since we found this in curChunk, we know that whoever - // CAS-ed it there is allocating it right now. So spin-loop - // shouldn't spin long! - Thread.yield(); - continue; - } - if (oldOffset == OOM) { - // doh we ran out of ram. return -1 to chuck this away. - return -1; - } - - if (oldOffset + size > data.length) { - return -1; // alloc doesn't fit - } - - // Try to atomically claim this chunk - if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { - // we got the alloc - allocCount.incrementAndGet(); - return oldOffset; - } - // we raced and lost alloc, try again - } - } - - @Override - public String toString() { - return "Chunk@" + System.identityHashCode(this) + - " allocs=" + allocCount.get() + "waste=" + - (data.length - nextFreeOffset.get()); - } - - @VisibleForTesting - int getNextFreeOffset() { - return this.nextFreeOffset.get(); - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index 81b6046..166b2b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -31,7 +31,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; -import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; @@ -68,49 +67,69 @@ public class MemStoreChunkPool { private final int maxCount; // A queue of reclaimed chunks - private final BlockingQueue reclaimedChunks; + private final BlockingQueue reclaimedChunks; private final int chunkSize; /** Statistics thread schedule pool */ private final ScheduledExecutorService scheduleThreadPool; /** Statistics thread */ private static final int statThreadPeriod = 60 * 5; - private AtomicLong createdChunkCount = new AtomicLong(); - private AtomicLong reusedChunkCount = new AtomicLong(); + private final AtomicLong createdChunkCount = new AtomicLong(); + private final AtomicLong reusedChunkCount = new AtomicLong(); MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount, int initialCount) { this.maxCount = maxCount; this.chunkSize = chunkSize; - this.reclaimedChunks = new LinkedBlockingQueue(); + this.reclaimedChunks = new LinkedBlockingQueue(); for (int i = 0; i < initialCount; i++) { - Chunk chunk = new Chunk(chunkSize); + MemstoreLABChunk chunk = new MemstoreLABChunk(chunkSize, true); chunk.init(); reclaimedChunks.add(chunk); } - final String n = Thread.currentThread().getName(); - scheduleThreadPool = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics") - .setDaemon(true).build()); - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), - statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); + createdChunkCount.set(initialCount); + // StatisticsThread log statistics at Debug level. There is no point in starting this thread if + // log level is above debug. + if (LOG.isDebugEnabled()) { + final String n = Thread.currentThread().getName(); + scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() + .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(), statThreadPeriod, + statThreadPeriod, TimeUnit.SECONDS); + } else { + scheduleThreadPool = null; + } } /** - * Poll a chunk from the pool, reset it if not null, else create a new chunk - * to return + * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have + * not yet created max allowed chunks count. When we have already created max allowed chunks and + * no free chunks as of now, return null. It is the responsibility of the caller to make a chunk + * then. + * Note: Chunks returned by this pool must be put back to the pool after its use. * @return a chunk + * @see #putbackChunk(MemstoreLABChunk) + * @see #putbackChunks(BlockingQueue) */ - Chunk getChunk() { - Chunk chunk = reclaimedChunks.poll(); - if (chunk == null) { - chunk = new Chunk(chunkSize); - createdChunkCount.incrementAndGet(); - } else { + MemstoreLABChunk getChunk() { + MemstoreLABChunk chunk = reclaimedChunks.poll(); + if (chunk != null) { chunk.reset(); reusedChunkCount.incrementAndGet(); + return chunk; + } + // Make a chunk iff we have not yet created the maxCount chunks + while (true) { + long created = this.createdChunkCount.get(); + if (created < this.maxCount) { + chunk = new MemstoreLABChunk(chunkSize, true); + if (this.createdChunkCount.compareAndSet(created, created + 1)) { + return chunk; + } + } else { + return null; + } } - return chunk; } /** @@ -118,18 +137,13 @@ public class MemStoreChunkPool { * skip the remaining chunks * @param chunks */ - void putbackChunks(BlockingQueue chunks) { - int maxNumToPutback = this.maxCount - reclaimedChunks.size(); - if (maxNumToPutback <= 0) { - return; - } - chunks.drainTo(reclaimedChunks, maxNumToPutback); - // clear reference of any non-reclaimable chunks - if (chunks.size() > 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue"); + void putbackChunks(BlockingQueue chunks) { + assert reclaimedChunks.size() < this.maxCount; + MemstoreLABChunk chunk = null; + while ((chunk = chunks.poll()) != null) { + if (chunk.isPooled()) { + reclaimedChunks.add(chunk); } - chunks.clear(); } } @@ -138,11 +152,11 @@ public class MemStoreChunkPool { * skip it * @param chunk */ - void putbackChunk(Chunk chunk) { - if (reclaimedChunks.size() >= this.maxCount) { - return; + void putbackChunk(MemstoreLABChunk chunk) { + assert reclaimedChunks.size() < this.maxCount; + if (chunk.isPooled()) { + reclaimedChunks.add(chunk); } - reclaimedChunks.add(chunk); } int getPoolSize() { @@ -156,31 +170,28 @@ public class MemStoreChunkPool { this.reclaimedChunks.clear(); } - private static class StatisticsThread extends Thread { - MemStoreChunkPool mcp; - - public StatisticsThread(MemStoreChunkPool mcp) { + private class StatisticsThread extends Thread { + StatisticsThread() { super("MemStoreChunkPool.StatisticsThread"); setDaemon(true); - this.mcp = mcp; } @Override public void run() { - mcp.logStats(); + logStats(); } - } - private void logStats() { - if (!LOG.isDebugEnabled()) return; - long created = createdChunkCount.get(); - long reused = reusedChunkCount.get(); - long total = created + reused; - LOG.debug("Stats: current pool size=" + reclaimedChunks.size() - + ",created chunk count=" + created - + ",reused chunk count=" + reused - + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( - (float) reused / (float) total, 2))); + private void logStats() { + if (!LOG.isDebugEnabled()) return; + long created = createdChunkCount.get(); + long reused = reusedChunkCount.get(); + long total = created + reused; + LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + + ",created chunk count=" + created + + ",reused chunk count=" + reused + + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent( + (float) reused / (float) total, 2))); + } } /** @@ -233,5 +244,4 @@ public class MemStoreChunkPool { static void clearDisableFlag() { chunkPoolDisabled = false; } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunk.java new file mode 100644 index 0000000..982ba22 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreLABChunk.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * A chunk of memory out of which allocations are sliced. + */ +@InterfaceAudience.Private +public class MemstoreLABChunk { + /** Actual underlying data */ + private byte[] data; + + private static final int UNINITIALIZED = -1; + private static final int OOM = -2; + /** + * Offset for the next allocation, or the sentinel value -1 which implies that the chunk is still + * uninitialized. + */ + private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED); + + /** Total number of allocations satisfied from this buffer */ + private AtomicInteger allocCount = new AtomicInteger(); + + /** Size of chunk in bytes */ + private final int size; + + private final boolean pooled; // Whether this chunk comes from a pool or not + + /** + * Create an uninitialized chunk. Note that memory is not allocated yet, so this is cheap. + * + * @param size in bytes + * @param pooled Whether this chunk is part of a pool or not + */ + MemstoreLABChunk(int size, boolean pooled) { + this.size = size; + this.pooled = pooled; + } + + /** + * Actually claim the memory for this chunk. This should only be called from the thread that + * constructed the chunk. It is thread-safe against other threads calling alloc(), who will block + * until the allocation is complete. + */ + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + try { + if (data == null) { + data = new byte[size]; + } + } catch (OutOfMemoryError e) { + boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); + assert failInit; // should be true. + throw e; + } + // Mark that it's ready for use + boolean initted = nextFreeOffset.compareAndSet(UNINITIALIZED, 0); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, "Multiple threads tried to init same chunk"); + } + + /** + * Reset the offset to UNINITIALIZED before before reusing an old chunk + */ + void reset() { + if (nextFreeOffset.get() != UNINITIALIZED) { + nextFreeOffset.set(UNINITIALIZED); + allocCount.set(0); + } + } + + /** + * Try to allocate size bytes from the chunk. + * + * @return the offset of the successful allocation, or -1 to indicate not-enough-space + */ + public int alloc(int size) { + while (true) { + int oldOffset = nextFreeOffset.get(); + if (oldOffset == UNINITIALIZED) { + // The chunk doesn't have its data allocated yet. + // Since we found this in curChunk, we know that whoever + // CAS-ed it there is allocating it right now. So spin-loop + // shouldn't spin long! + Thread.yield(); + continue; + } + if (oldOffset == OOM) { + // doh we ran out of ram. return -1 to chuck this away. + return -1; + } + + if (oldOffset + size > data.length) { + return -1; // alloc doesn't fit + } + + // Try to atomically claim this chunk + if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { + // we got the alloc + allocCount.incrementAndGet(); + return oldOffset; + } + // we raced and lost alloc, try again + } + } + + /** + * @return This chunk's backing data. + */ + byte[] getData() { + return this.data; + } + + /** + * @return Whether this chunk is from a pool or not. + */ + boolean isPooled(){ + return this.pooled; + } + + @Override + public String toString() { + return "Chunk@" + System.identityHashCode(this) + " allocs=" + allocCount.get() + "waste=" + + (data.length - nextFreeOffset.get()); + } + + @VisibleForTesting + int getNextFreeOffset() { + return this.nextFreeOffset.get(); + } +}