diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 549f15e..fbf0f5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -95,7 +95,6 @@ public class MemStore implements HeapSize {
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;
 
-  MemStoreChunkPool chunkPool;
   volatile MemStoreLAB allocator;
   volatile MemStoreLAB snapshotAllocator;
 
@@ -121,11 +120,9 @@ public class MemStore implements HeapSize {
     this.size = new AtomicLong(DEEP_OVERHEAD);
     this.snapshotSize = 0;
     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
-      this.chunkPool = MemStoreChunkPool.getPool(conf);
-      this.allocator = new MemStoreLAB(conf, chunkPool);
+      this.allocator = new MemStoreLAB(conf);
     } else {
       this.allocator = null;
-      this.chunkPool = null;
     }
   }
 
@@ -161,7 +158,7 @@ public class MemStore implements HeapSize {
         this.snapshotAllocator = this.allocator;
         // Reset allocator so we get a fresh buffer for the new memstore
         if (allocator != null) {
-          this.allocator = new MemStoreLAB(conf, chunkPool);
+          this.allocator = new MemStoreLAB(conf);
         }
         timeOfOldestEdit = Long.MAX_VALUE;
       }
@@ -569,9 +566,9 @@ public class MemStore implements HeapSize {
     // Get the KeyValues for the row/family/qualifier regardless of timestamp.
     // For this case we want to clean up any other puts
     KeyValue firstKv = KeyValue.createFirstOnRow(
-        kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
-        kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
-        kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
+        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
+        kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
     Iterator<KeyValue> it = ss.iterator();
     // versions visible to oldest scanner
@@ -697,10 +694,6 @@ public class MemStore implements HeapSize {
     // the pre-calculated KeyValue to be returned by peek() or next()
     private KeyValue theNext;
 
-    // The allocator and snapshot allocator at the time of creating this scanner
-    volatile MemStoreLAB allocatorAtCreation;
-    volatile MemStoreLAB snapshotAllocatorAtCreation;
-
     // A flag represents whether could stop skipping KeyValues for MVCC
     // if have encountered the next row. Only used for reversed scan
     private boolean stopSkippingKVsIfNextRow = false;
@@ -734,14 +727,6 @@ public class MemStore implements HeapSize {
       this.readPoint = readPoint;
       kvsetAtCreation = kvset;
       snapshotAtCreation = snapshot;
-      if (allocator != null) {
-        this.allocatorAtCreation = allocator;
-        this.allocatorAtCreation.incScannerCount();
-      }
-      if (snapshotAllocator != null) {
-        this.snapshotAllocatorAtCreation = snapshotAllocator;
-        this.snapshotAllocatorAtCreation.incScannerCount();
-      }
       if (Trace.isTracing() && Trace.currentSpan() != null) {
         Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
       }
@@ -911,15 +896,6 @@ public class MemStore implements HeapSize {
       this.kvsetIt = null;
       this.snapshotIt = null;
 
-      if (allocatorAtCreation != null) {
-        this.allocatorAtCreation.decScannerCount();
-        this.allocatorAtCreation = null;
-      }
-      if (snapshotAllocatorAtCreation != null) {
-        this.snapshotAllocatorAtCreation.decScannerCount();
-        this.snapshotAllocatorAtCreation = null;
-      }
-
       this.kvsetItRow = null;
       this.snapshotItRow = null;
     }
@@ -1013,7 +989,7 @@ public class MemStore implements HeapSize {
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
+      ClassSize.OBJECT + (9 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
deleted file mode 100644
index be03488..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * A pool of {@link MemStoreLAB$Chunk} instances.
- *
- * MemStoreChunkPool caches a number of retired chunks for reusing, it could
- * decrease allocating bytes when writing, thereby optimizing the garbage
- * collection on JVM.
- *
- * The pool instance is globally unique and could be obtained through
- * {@link MemStoreChunkPool#getPool(Configuration)}
- *
- * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating
- * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
- * when MemStore clearing snapshot for flush
- */
-@InterfaceAudience.Private
-public class MemStoreChunkPool {
-  private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
-  final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
-  final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
-  final static float POOL_MAX_SIZE_DEFAULT = 0.0f;
-  final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
-
-  // Static reference to the MemStoreChunkPool
-  private static MemStoreChunkPool globalInstance;
-  /** Boolean whether we have disabled the memstore chunk pool entirely. */
-  static boolean chunkPoolDisabled = false;
-
-  private final int maxCount;
-
-  // A queue of reclaimed chunks
-  private final BlockingQueue<Chunk> reclaimedChunks;
-  private final int chunkSize;
-
-  /** Statistics thread schedule pool */
-  private final ScheduledExecutorService scheduleThreadPool;
-  /** Statistics thread */
-  private static final int statThreadPeriod = 60 * 5;
-  private AtomicLong createdChunkCount = new AtomicLong();
-  private AtomicLong reusedChunkCount = new AtomicLong();
-
-  MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
-      int initialCount) {
-    this.maxCount = maxCount;
-    this.chunkSize = chunkSize;
-    this.reclaimedChunks = new LinkedBlockingQueue<Chunk>();
-    for (int i = 0; i < initialCount; i++) {
-      Chunk chunk = new Chunk(chunkSize);
-      chunk.init();
-      reclaimedChunks.add(chunk);
-    }
-    final String n = Thread.currentThread().getName();
-    scheduleThreadPool = Executors.newScheduledThreadPool(1,
-        new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics")
-            .setDaemon(true).build());
-    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
-        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
-  }
-
-  /**
-   * Poll a chunk from the pool, reset it if not null, else create a new chunk
-   * to return
-   * @return a chunk
-   */
-  Chunk getChunk() {
-    Chunk chunk = reclaimedChunks.poll();
-    if (chunk == null) {
-      chunk = new Chunk(chunkSize);
-      createdChunkCount.incrementAndGet();
-    } else {
-      chunk.reset();
-      reusedChunkCount.incrementAndGet();
-    }
-    return chunk;
-  }
-
-  /**
-   * Add the chunks to the pool, when the pool achieves the max size, it will
-   * skip the remaining chunks
-   * @param chunks
-   */
-  void putbackChunks(BlockingQueue<Chunk> chunks) {
-    int maxNumToPutback = this.maxCount - reclaimedChunks.size();
-    if (maxNumToPutback <= 0) {
-      return;
-    }
-    chunks.drainTo(reclaimedChunks, maxNumToPutback);
-  }
-
-  /**
-   * Add the chunk to the pool, if the pool has achieved the max size, it will
-   * skip it
-   * @param chunk
-   */
-  void putbackChunk(Chunk chunk) {
-    if (reclaimedChunks.size() >= this.maxCount) {
-      return;
-    }
-    reclaimedChunks.add(chunk);
-  }
-
-  int getPoolSize() {
-    return this.reclaimedChunks.size();
-  }
-
-  /*
-   * Only used in testing
-   */
-  void clearChunks() {
-    this.reclaimedChunks.clear();
-  }
-
-  private static class StatisticsThread extends Thread {
-    MemStoreChunkPool mcp;
-
-    public StatisticsThread(MemStoreChunkPool mcp) {
-      super("MemStoreChunkPool.StatisticsThread");
-      setDaemon(true);
-      this.mcp = mcp;
-    }
-
-    @Override
-    public void run() {
-      mcp.logStats();
-    }
-  }
-
-  private void logStats() {
-    if (!LOG.isDebugEnabled()) return;
-    long created = createdChunkCount.get();
-    long reused = reusedChunkCount.get();
-    long total = created + reused;
-    LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
-        + ",created chunk count=" + created
-        + ",reused chunk count=" + reused
-        + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
-            (float) reused / (float) total, 2)));
-  }
-
-  /**
-   * @param conf
-   * @return the global MemStoreChunkPool instance
-   */
-  static synchronized MemStoreChunkPool getPool(Configuration conf) {
-    if (globalInstance != null) return globalInstance;
-    if (chunkPoolDisabled) return null;
-
-
-    float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY,
-        POOL_MAX_SIZE_DEFAULT);
-    if (poolSizePercentage <= 0) {
-      chunkPoolDisabled = true;
-      return null;
-    }
-    if (poolSizePercentage > 1.0) {
-      throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY
-          + " must be between 0.0 and 1.0");
-    }
-    long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
-        .getMax();
-    long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax,
-        MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf);
-    int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY,
-        MemStoreLAB.CHUNK_SIZE_DEFAULT);
-    int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
-
-    float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
-        POOL_INITIAL_SIZE_DEFAULT);
-    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
-      throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
-          + " must be between 0.0 and 1.0");
-    }
-
-    int initialCount = (int) (initialCountPercentage * maxCount);
-    LOG.info("Allocating MemStoreChunkPool with chunk size "
-        + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount
-        + ", initial count " + initialCount);
-    globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount,
-        initialCount);
-    return globalInstance;
-  }
-
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index d4e96e8..0aa379f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -64,14 +63,6 @@ public class MemStoreLAB {
   final static int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through allocator
 
   final int maxAlloc;
-  private final MemStoreChunkPool chunkPool;
-
-  // This flag is for closing this instance, its set when clearing snapshot of
-  // memstore
-  private volatile boolean closed = false;
-  // This flag is for reclaiming chunks. Its set when putting chunks back to
-  // pool
-  private AtomicBoolean reclaimed = new AtomicBoolean(false);
 
   // Current count of open scanners which reading data from this MemStoreLAB
   private final AtomicInteger openScannerCount = new AtomicInteger();
@@ -80,14 +71,9 @@ public class MemStoreLAB {
     this(new Configuration());
   }
 
-  private MemStoreLAB(Configuration conf) {
-    this(conf, MemStoreChunkPool.getPool(conf));
-  }
-
-  public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) {
+  public MemStoreLAB(Configuration conf) {
     chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
     maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
-    this.chunkPool = pool;
 
     // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
     Preconditions.checkArgument(
@@ -132,13 +118,6 @@ public class MemStoreLAB {
    * back to pool
    */
  void close() {
-    this.closed = true;
-    // We could put back the chunks to pool for reusing only when there is no
-    // opening scanner which will read their data
-    if (chunkPool != null && openScannerCount.get() == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.chunkQueue);
-    }
  }
 
  /**
@@ -149,17 +128,6 @@ public class MemStoreLAB {
   }
 
   /**
-   * Called when closing a scanner on the data of this MemStoreLAB
-   */
-  void decScannerCount() {
-    int count = this.openScannerCount.decrementAndGet();
-    if (chunkPool != null && count == 0 && this.closed
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.chunkQueue);
-    }
-  }
-
-  /**
    * Try to retire the current chunk if it is still
    * <code>c</code>. Postcondition is that curChunk.get()
    * != c
@@ -189,15 +157,13 @@ public class MemStoreLAB {
       // No current chunk, so we want to allocate one. We race
       // against other allocators to CAS in an uninitialized chunk
       // (which is cheap to allocate)
-      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
+      c = new Chunk(chunkSize);
       if (curChunk.compareAndSet(null, c)) {
         // we won race - now we need to actually do the expensive
         // allocation step
         c.init();
         this.chunkQueue.add(c);
         return c;
-      } else if (chunkPool != null) {
-        chunkPool.putbackChunk(c);
       }
       // someone else won race - that's fine, we'll try to grab theirs
       // in the next iteration of the loop.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
deleted file mode 100644
index 1cd6ab8..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.rmi.UnexpectedException;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Test the {@link MemStoreChunkPool} class
- */
-@Category(SmallTests.class)
-public class TestMemStoreChunkPool {
-  private final static Configuration conf = new Configuration();
-  private static MemStoreChunkPool chunkPool;
-  private static boolean chunkPoolDisabledBeforeTest;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    conf.setBoolean(MemStore.USEMSLAB_KEY, true);
-    conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
-    chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled;
-    MemStoreChunkPool.chunkPoolDisabled = false;
-    chunkPool = MemStoreChunkPool.getPool(conf);
-    assertTrue(chunkPool != null);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
-  }
-
-  @Before
-  public void tearDown() throws Exception {
-    chunkPool.clearChunks();
-  }
-
-  @Test
-  public void testReusingChunks() {
-    Random rand = new Random();
-    MemStoreLAB mslab = new MemStoreLAB(conf, chunkPool);
-    int expectedOff = 0;
-    byte[] lastBuffer = null;
-    // Randomly allocate some bytes
-    for (int i = 0; i < 100; i++) {
-      int size = rand.nextInt(1000);
-      Allocation alloc = mslab.allocateBytes(size);
-
-      if (alloc.getData() != lastBuffer) {
-        expectedOff = 0;
-        lastBuffer = alloc.getData();
-      }
-      assertEquals(expectedOff, alloc.getOffset());
-      assertTrue("Allocation " + alloc + " overruns buffer", alloc.getOffset()
-          + size <= alloc.getData().length);
-      expectedOff += size;
-    }
-    // chunks will be put back to pool after close
-    mslab.close();
-    int chunkCount = chunkPool.getPoolSize();
-    assertTrue(chunkCount > 0);
-    // reconstruct mslab
-    mslab = new MemStoreLAB(conf, chunkPool);
-    // chunk should be got from the pool, so we can reuse it.
-    mslab.allocateBytes(1000);
-    assertEquals(chunkCount - 1, chunkPool.getPoolSize());
-  }
-
-  @Test
-  public void testPuttingBackChunksAfterFlushing() throws UnexpectedException {
-    byte[] row = Bytes.toBytes("testrow");
-    byte[] fam = Bytes.toBytes("testfamily");
-    byte[] qf1 = Bytes.toBytes("testqualifier1");
-    byte[] qf2 = Bytes.toBytes("testqualifier2");
-    byte[] qf3 = Bytes.toBytes("testqualifier3");
-    byte[] qf4 = Bytes.toBytes("testqualifier4");
-    byte[] qf5 = Bytes.toBytes("testqualifier5");
-    byte[] val = Bytes.toBytes("testval");
-
-    MemStore memstore = new MemStore();
-
-    // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
-
-    // Creating a snapshot
-    memstore.snapshot();
-    KeyValueSkipListSet snapshot = memstore.getSnapshot();
-    assertEquals(3, memstore.snapshot.size());
-
-    // Adding value to "new" memstore
-    assertEquals(0, memstore.kvset.size());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
-    assertEquals(2, memstore.kvset.size());
-    memstore.clearSnapshot(snapshot);
-
-    int chunkCount = chunkPool.getPoolSize();
-    assertTrue(chunkCount > 0);
-
-  }
-
-  @Test
-  public void testPuttingBackChunksWithOpeningScanner()
-      throws UnexpectedException {
-    byte[] row = Bytes.toBytes("testrow");
-    byte[] fam = Bytes.toBytes("testfamily");
-    byte[] qf1 = Bytes.toBytes("testqualifier1");
-    byte[] qf2 = Bytes.toBytes("testqualifier2");
-    byte[] qf3 = Bytes.toBytes("testqualifier3");
-    byte[] qf4 = Bytes.toBytes("testqualifier4");
-    byte[] qf5 = Bytes.toBytes("testqualifier5");
-    byte[] qf6 = Bytes.toBytes("testqualifier6");
-    byte[] qf7 = Bytes.toBytes("testqualifier7");
-    byte[] val = Bytes.toBytes("testval");
-
-    MemStore memstore = new MemStore();
-
-    // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
-
-    // Creating a snapshot
-    memstore.snapshot();
-    KeyValueSkipListSet snapshot = memstore.getSnapshot();
-    assertEquals(3, memstore.snapshot.size());
-
-    // Adding value to "new" memstore
-    assertEquals(0, memstore.kvset.size());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
-    assertEquals(2, memstore.kvset.size());
-
-    // opening scanner before clear the snapshot
-    List<KeyValueScanner> scanners = memstore.getScanners(0);
-    // Shouldn't putting back the chunks to pool,since some scanners are opening
-    // based on their data
-    memstore.clearSnapshot(snapshot);
-
-    assertTrue(chunkPool.getPoolSize() == 0);
-
-    // Chunks will be put back to pool after close scanners;
-    for (KeyValueScanner scanner : scanners) {
-      scanner.close();
-    }
-    assertTrue(chunkPool.getPoolSize() > 0);
-
-    // clear chunks
-    chunkPool.clearChunks();
-
-    // Creating another snapshot
-    memstore.snapshot();
-    snapshot = memstore.getSnapshot();
-    // Adding more value
-    memstore.add(new KeyValue(row, fam, qf6, val));
-    memstore.add(new KeyValue(row, fam, qf7, val));
-    // opening scanners
-    scanners = memstore.getScanners(0);
-    // close scanners before clear the snapshot
-    for (KeyValueScanner scanner : scanners) {
-      scanner.close();
-    }
-    // Since no opening scanner, the chunks of snapshot should be put back to
-    // pool
-    memstore.clearSnapshot(snapshot);
-    assertTrue(chunkPool.getPoolSize() > 0);
-  }
-
-}
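
Usage after this patch: MemStoreLAB is built straight from a Configuration, every chunk is a fresh heap allocation, and close() leaves retired chunks to the garbage collector rather than handing them back to a pool. A minimal sketch of the resulting API, assuming only the signatures visible in this diff and in the removed test (MemStoreLAB(Configuration), allocateBytes(int), Allocation getData()/getOffset()); the class name and the source byte[] are illustrative, and the sketch sits in the regionserver package in case some of these members are package-private:

    package org.apache.hadoop.hbase.regionserver;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;

    public class MemStoreLABUsageSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // No MemStoreChunkPool argument any more: the LAB allocates fresh chunks on demand.
        MemStoreLAB mslab = new MemStoreLAB(conf);

        byte[] cellBytes = new byte[128]; // stand-in for serialized KeyValue bytes
        Allocation alloc = mslab.allocateBytes(cellBytes.length);
        // Copy the cell into the slab-owned chunk at the offset reserved for it.
        System.arraycopy(cellBytes, 0, alloc.getData(), alloc.getOffset(), cellBytes.length);

        // close() is now effectively a no-op: chunks are simply dropped and reclaimed by the GC.
        mslab.close();
      }
    }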