diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index f22a6e5..970abc0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -24,6 +24,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.ByteRange; @@ -60,10 +64,14 @@ public class HeapMemStoreLAB implements MemStoreLAB { static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation"; static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through // allocator + // capacity of chunk queue + static final String CHUNK_QUEUE_LENGTH = "hbase.hregion.memstore.mslab.chunkqueue.length"; + + static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class); private AtomicReference curChunk = new AtomicReference(); - // A queue of chunks contained by this memstore - private BlockingQueue chunkQueue = new LinkedBlockingQueue(); + // A queue of chunks contained by this memstore, used with chunk pool + private BlockingQueue chunkQueue = null; final int chunkSize; final int maxAlloc; private final MemStoreChunkPool chunkPool; @@ -86,6 +94,14 @@ public class HeapMemStoreLAB implements MemStoreLAB { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); this.chunkPool = MemStoreChunkPool.getPool(conf); + // currently chunkQueue is only used for chunkPool + if (this.chunkPool != null) { + long memstoreFlushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); + // by default max heap size of chunk queue won't exceed memstoreFlushSize + int queueLength = conf.getInt(CHUNK_QUEUE_LENGTH, (int) (memstoreFlushSize / chunkSize)); + chunkQueue = new LinkedBlockingQueue(queueLength); + } // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument( @@ -123,6 +139,13 @@ public class HeapMemStoreLAB implements MemStoreLAB { // not enough space! // try to retire this chunk tryRetireChunk(c); + + // chunk already initialized, so if chunk queue not null, we must remove the chunk from queue + // after retiring it or else it might cause memory leak, especially when there're lots of + // cells with same key + if (chunkQueue != null) { + chunkQueue.remove(c); + } } } @@ -196,7 +219,9 @@ public class HeapMemStoreLAB implements MemStoreLAB { // we won race - now we need to actually do the expensive // allocation step c.init(); - this.chunkQueue.add(c); + if (chunkQueue != null && !this.chunkQueue.offer(c)) { + LOG.debug("Chunk queue is full, won't reuse this new chunk"); + } return c; } else if (chunkPool != null) { chunkPool.putbackChunk(c);