diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index 625811a..d8fa5c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.ByteRange; @@ -62,9 +64,11 @@ public class HeapMemStoreLAB implements MemStoreLAB { static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through // allocator + static final Log LOG = LogFactory.getLog(HeapMemStoreLAB.class); + private AtomicReference curChunk = new AtomicReference(); - // A queue of chunks contained by this memstore - private BlockingQueue chunkQueue = new LinkedBlockingQueue(); + // A queue of chunks contained by this memstore; stays null unless a chunk pool is in use + private BlockingQueue chunkQueue = null; final int chunkSize; final int maxAlloc; private final MemStoreChunkPool chunkPool; @@ -87,6 +91,12 @@ public class HeapMemStoreLAB implements MemStoreLAB { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); this.chunkPool = MemStoreChunkPool.getPool(conf); + // currently chunkQueue is only used for chunkPool + if (this.chunkPool != null) { + // cap the queue at the chunk pool's max count to avoid keeping references to + // too many non-reclaimable chunks + chunkQueue = new LinkedBlockingQueue(chunkPool.getMaxCount()); + } // if we don't exclude allocations >CHUNK_SIZE, we'd 
infiniteloop on one! Preconditions.checkArgument( @@ -166,6 +176,8 @@ public class HeapMemStoreLAB implements MemStoreLAB { * Try to retire the current chunk if it is still * c. Postcondition is that curChunk.get() * != c + * @param c the chunk to retire; nothing is returned, so the caller is not + * told whether its compareAndSet won the race */ private void tryRetireChunk(Chunk c) { curChunk.compareAndSet(c, null); @@ -197,7 +209,12 @@ public class HeapMemStoreLAB implements MemStoreLAB { // we won race - now we need to actually do the expensive // allocation step c.init(); - this.chunkQueue.add(c); + if (chunkQueue != null && !this.closed && !this.chunkQueue.offer(c)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Chunk queue is full, won't reuse this new chunk. Current queue size: " + + chunkQueue.size()); + } + } return c; } else if (chunkPool != null) { chunkPool.putbackChunk(c); @@ -212,6 +229,11 @@ public class HeapMemStoreLAB implements MemStoreLAB { return this.curChunk.get(); } + @VisibleForTesting + BlockingQueue getChunkQueue() { + return this.chunkQueue; + } + /** * A chunk of memory out of which allocations are sliced. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java index 6285060..81b6046 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -123,6 +124,13 @@ public class MemStoreChunkPool { return; } chunks.drainTo(reclaimedChunks, maxNumToPutback); + // drop references to any chunks still left in the queue after the drainTo above + if (chunks.size() > 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue"); + } + chunks.clear(); + } } /** @@ -217,4 +225,13 @@ public class MemStoreChunkPool { } } + int getMaxCount() { + return this.maxCount; + } + + @VisibleForTesting + static void clearDisableFlag() { + chunkPoolDisabled = false; + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 170bdd4..34caf97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.*; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -37,6 +39,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; + import org.junit.experimental.categories.Category; @Category({RegionServerTests.class, SmallTests.class}) @@ -149,7 +152,78 @@ public class TestMemStoreLAB { } } - + + /** + * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure + * there's no memory leak (HBASE-16195) + * @throws Exception if any error occurred + */ + @Test + public void testLABChunkQueue() throws Exception { + HeapMemStoreLAB mslab = new HeapMemStoreLAB(); + // by default setting, there should be no chunk queue initialized + assertNull(mslab.getChunkQueue()); + // reset mslab with chunk pool + Configuration conf = HBaseConfiguration.create(); + conf.setDouble(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.1); + // set chunk size to default max alloc size, so we could easily trigger chunk retirement + conf.setLong(HeapMemStoreLAB.CHUNK_SIZE_KEY, HeapMemStoreLAB.MAX_ALLOC_DEFAULT); + // reconstruct mslab + MemStoreChunkPool.clearDisableFlag(); + mslab = new HeapMemStoreLAB(conf); + // launch multiple threads to trigger frequent chunk retirement + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < 10; i++) { + threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i)); + } + for (Thread thread : threads) { + thread.start(); + } + // let it run for some time + Thread.sleep(1000); + for (Thread thread : threads) { + thread.interrupt(); + } + boolean threadsRunning = true; + while (threadsRunning) { + threadsRunning = false; // reset each pass; set again below if any worker is still alive + for (Thread thread : threads) { + if (thread.isAlive()) { + threadsRunning = true; + break; + } + } + } + // close the 
mslab + mslab.close(); + // make sure all chunks reclaimed or removed from chunk queue + int queueLength = mslab.getChunkQueue().size(); + assertTrue("All chunks in chunk queue should be reclaimed or removed" + + " after mslab closed but actually: " + queueLength, queueLength == 0); + } + + private Thread getChunkQueueTestThread(final HeapMemStoreLAB mslab, String threadName) { + Thread thread = new Thread() { + volatile boolean stopped = false; // written via interrupt() by another thread, read in run() + + @Override + public void run() { + while (!stopped) { + // keep triggering chunk retirement + mslab.allocateBytes(HeapMemStoreLAB.MAX_ALLOC_DEFAULT - 1); + } + } + + @Override + public void interrupt() { + this.stopped = true; + } + }; + thread.setName(threadName); + thread.setDaemon(true); + return thread; + } + private static class AllocRecord implements Comparable{ private final ByteRange alloc; private final int size;