commit 17e81077ec9e944633fc8e780638ad201e374d8b
Author: Todd Lipcon
Date:   Tue Jan 25 00:55:21 2011 -0800

    HBASE-3455. Allocator for bytes per memstore

diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 53ec17c..6f8bfd6 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -34,10 +34,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -55,6 +58,13 @@ import org.apache.hadoop.hbase.util.ClassSize;
 public class MemStore implements HeapSize {
   private static final Log LOG = LogFactory.getLog(MemStore.class);
 
+  private static final String USEMSLAB_KEY =
+    "hbase.hregion.memstore.mslab.enabled";
+  private static final boolean USEMSLAB_DEFAULT = false;
+
+  private Configuration conf;
+
   // MemStore. Use a KeyValueSkipListSet rather than SkipListSet because of the
   // better semantics. The Map will overwrite if passed a key it already had
   // whereas the Set will not add new KV if key is same though value might be
@@ -80,19 +90,23 @@ public class MemStore implements HeapSize {
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;
 
+  MemStoreLAB allocator;
+
   /**
    * Default constructor. Used for tests.
    */
   public MemStore() {
-    this(KeyValue.COMPARATOR);
+    this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
   }
 
   /**
    * Constructor.
    * @param c Comparator
    */
-  public MemStore(final KeyValue.KVComparator c) {
+  public MemStore(final Configuration conf,
+      final KeyValue.KVComparator c) {
+    this.conf = conf;
     this.comparator = c;
     this.comparatorIgnoreTimestamp =
       this.comparator.getComparatorIgnoringTimestamps();
@@ -102,6 +116,11 @@ public class MemStore implements HeapSize {
     timeRangeTracker = new TimeRangeTracker();
     snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
+    if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
+      this.allocator = new MemStoreLAB(conf);
+    } else {
+      this.allocator = null;
+    }
   }
 
   void dump() {
@@ -134,6 +153,10 @@
         this.timeRangeTracker = new TimeRangeTracker();
         // Reset heap to not include any keys
         this.size.set(DEEP_OVERHEAD);
+        // Reset allocator so we get a fresh buffer for the new memstore
+        if (allocator != null) {
+          this.allocator = new MemStoreLAB(conf);
+        }
       }
     }
   } finally {
@@ -187,8 +210,9 @@
     long s = -1;
     this.lock.readLock().lock();
     try {
-      s = heapSizeChange(kv, this.kvset.add(kv));
-      timeRangeTracker.includeTimestamp(kv);
+      KeyValue toAdd = maybeCloneWithAllocator(kv);
+      s = heapSizeChange(kv, this.kvset.add(toAdd));
+      timeRangeTracker.includeTimestamp(toAdd);
       this.size.addAndGet(s);
     } finally {
       this.lock.readLock().unlock();
@@ -196,6 +220,20 @@
     return s;
   }
 
+  private KeyValue maybeCloneWithAllocator(KeyValue kv) {
+    if (allocator == null) {
+      return kv;
+    }
+
+    int len = kv.getLength();
+    Allocation alloc = allocator.allocateBytes(len);
+    assert alloc != null && alloc.getData() != null;
+    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
+    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
+    newKv.setMemstoreTS(kv.getMemstoreTS());
+    return newKv;
+  }
+
   /**
    * Write a delete
    * @param delete
@@ -205,8 +243,9 @@
     long s = 0;
     this.lock.readLock().lock();
    try {
-      s += heapSizeChange(delete, this.kvset.add(delete));
-      timeRangeTracker.includeTimestamp(delete);
+      KeyValue toAdd = maybeCloneWithAllocator(delete);
+      s += heapSizeChange(toAdd, this.kvset.add(toAdd));
+      timeRangeTracker.includeTimestamp(toAdd);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -732,7 +771,7 @@
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (9 * ClassSize.REFERENCE));
+      ClassSize.OBJECT + (10 * ClassSize.REFERENCE));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
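
An aside for readers of the patch: the clone-on-insert path is easier to follow in isolation. The sketch below models maybeCloneWithAllocator() using hypothetical stand-ins (Slice in place of MemStoreLAB.Allocation, and a deliberately trivial allocator that hands out a fresh array per request), so only the copy step mirrors the real code.

    // CloneSketch.java -- illustrative only, not HBase API
    import java.util.Arrays;

    public class CloneSketch {
      /** Hypothetical stand-in for MemStoreLAB.Allocation: a byte[] plus an offset. */
      static final class Slice {
        final byte[] data;
        final int offset;
        Slice(byte[] data, int offset) { this.data = data; this.offset = offset; }
      }

      /** Trivial "allocator": hands out a fresh array each time (no slab reuse). */
      static Slice allocate(int len) {
        return new Slice(new byte[len], 0);
      }

      /** Copy incoming bytes into allocator-owned memory, as the patch does per KeyValue. */
      static Slice cloneIntoAllocator(byte[] src, int off, int len) {
        Slice s = allocate(len);
        System.arraycopy(src, off, s.data, s.offset, len);
        return s;
      }

      public static void main(String[] args) {
        byte[] incoming = "row1/fam:qual/value".getBytes();
        Slice copy = cloneIntoAllocator(incoming, 0, incoming.length);
        // The copy is byte-identical but lives in memory the allocator owns.
        System.out.println(Arrays.equals(incoming,
            Arrays.copyOfRange(copy.data, copy.offset, copy.offset + incoming.length)));
      }
    }
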
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
new file mode 100644
index 0000000..6fae341
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -0,0 +1,259 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A memstore-local allocation buffer.
+ * <p>
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
+ * big (2MB) byte[] chunks and then doles out slices of those arrays to
+ * threads that request them.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * regionserver. By ensuring that all KeyValues in a given memstore refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the memstore is flushed.
+ * <p>
+ * Without the MSLAB, the byte arrays allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ * TODO: we should probably benchmark whether word-aligning the allocations
+ * would provide a performance improvement - probably would speed up the
+ * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
+ * anyway.
+ */
+public class MemStoreLAB {
+  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+
+  final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
+  final static int CHUNK_SIZE_DEFAULT = 2048 * 1024;
+  final int chunkSize;
+
+  final static String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
+  final static int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through allocator
+  final int maxAlloc;
+
+  public MemStoreLAB() {
+    this(new Configuration());
+  }
+
+  public MemStoreLAB(Configuration conf) {
+    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
+    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
+
+    // if we don't exclude allocations >CHUNK_SIZE, we'd infinite-loop on one!
+    Preconditions.checkArgument(
+      maxAlloc <= chunkSize,
+      MAX_ALLOC_KEY + " must be less than or equal to " + CHUNK_SIZE_KEY);
+  }
+
+  /**
+   * Allocate a slice of the given length.
+   */
+  public Allocation allocateBytes(int size) {
+    Preconditions.checkArgument(size >= 0, "negative size");
+
+    // Satisfy large allocations directly from JVM since they don't
+    // cause fragmentation as badly.
+    if (size > maxAlloc) {
+      return new Allocation(new byte[size], 0);
+    }
+
+    while (true) {
+      Chunk c = getOrMakeChunk();
+
+      // Try to allocate from this chunk
+      int allocOffset = c.alloc(size);
+      if (allocOffset != -1) {
+        // We succeeded - this is the common case - small alloc
+        // from a big buffer
+        return new Allocation(c.data, allocOffset);
+      }
+
+      // not enough space!
+      // try to retire this chunk
+      tryRetireChunk(c);
+    }
+  }
+
+  /**
+   * Try to retire the current chunk if it is still
+   * <code>c</code>. Postcondition is that curChunk.get()
+   * != c
+   */
+  private void tryRetireChunk(Chunk c) {
+    @SuppressWarnings("unused")
+    boolean weRetiredIt = curChunk.compareAndSet(c, null);
+    // If the CAS succeeds, that means that we won the race
+    // to retire the chunk. We could use this opportunity to
+    // update metrics on external fragmentation.
+    //
+    // If the CAS fails, that means that someone else already
+    // retired the chunk for us.
+  }
+
+  /**
+   * Get the current chunk, or, if there is no current chunk,
+   * allocate a new one from the JVM.
+   */
+  private Chunk getOrMakeChunk() {
+    while (true) {
+      // Try to get the chunk
+      Chunk c = curChunk.get();
+      if (c != null) {
+        return c;
+      }
+
+      // No current chunk, so we want to allocate one. We race
+      // against other allocators to CAS in an uninitialized chunk
+      // (which is cheap to allocate)
+      c = new Chunk(chunkSize);
+      if (curChunk.compareAndSet(null, c)) {
+        // we won race - now we need to actually do the expensive
+        // allocation step
+        c.init();
+        return c;
+      }
+      // someone else won race - that's fine, we'll try to grab theirs
+      // in the next iteration of the loop.
+    }
+  }
+
+  /**
+   * A chunk of memory out of which allocations are sliced.
+   */
+  private static class Chunk {
+    /** Actual underlying data */
+    private byte[] data;
+
+    private static final int UNINITIALIZED = -1;
+
+    /**
+     * Offset for the next allocation, or the sentinel value -1
+     * which implies that the chunk is still uninitialized.
+     */
+    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+    /** Total number of allocations satisfied from this buffer */
+    private AtomicInteger allocCount = new AtomicInteger();
+
+    /** Size of chunk in bytes */
+    private final int size;
+
+    /**
+     * Create an uninitialized chunk. Note that memory is not allocated yet, so
+     * this is cheap.
+     * @param size in bytes
+     */
+    private Chunk(int size) {
+      this.size = size;
+    }
+
+    /**
+     * Actually claim the memory for this chunk. This should only be called from
+     * the thread that constructed the chunk. It is thread-safe against other
+     * threads calling alloc(), who will block until the allocation is complete.
+     */
+    public void init() {
+      assert nextFreeOffset.get() == UNINITIALIZED;
+      data = new byte[size];
+      // Mark that it's ready for use
+      boolean initted = nextFreeOffset.compareAndSet(
+        UNINITIALIZED, 0);
+      // We should always succeed the above CAS since only one thread
+      // calls init()!
+      Preconditions.checkState(initted,
+        "Multiple threads tried to init same chunk");
+    }
+
+    /**
+     * Try to allocate <code>size</code> bytes from the chunk.
+     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
+     */
+    public int alloc(int size) {
+      while (true) {
+        int oldOffset = nextFreeOffset.get();
+        if (oldOffset == UNINITIALIZED) {
+          // The chunk doesn't have its data allocated yet.
+          // Since we found this in curChunk, we know that whoever
+          // CAS-ed it there is allocating it right now. So spin-loop
+          // shouldn't spin long!
+          Thread.yield();
+          continue;
+        }
+
+        if (oldOffset + size > data.length) {
+          return -1; // alloc doesn't fit
+        }
+
+        // Try to atomically claim this slice of the chunk
+        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
+          // we got the alloc
+          allocCount.incrementAndGet();
+          return oldOffset;
+        }
+        // we raced and lost alloc, try again
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Chunk@" + System.identityHashCode(this) +
+        " allocs=" + allocCount.get() + " waste=" +
+        (data.length - nextFreeOffset.get());
+    }
+  }
+
+  /**
+   * The result of a single allocation. Contains the chunk byte[] that the
+   * allocation points into, and the offset within that array where the
+   * slice begins.
+   */
+  public static class Allocation {
+    private final byte[] data;
+    private final int offset;
+
+    private Allocation(byte[] data, int off) {
+      this.data = data;
+      this.offset = off;
+    }
+
+    @Override
+    public String toString() {
+      return "Allocation(data=" + data +
+        " with capacity=" + data.length +
+        ", off=" + offset + ")";
+    }
+
+    byte[] getData() {
+      return data;
+    }
+
+    int getOffset() {
+      return offset;
+    }
+  }
+}
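
The essence of the class above is the lock-free bump-the-pointer loop in Chunk.alloc(): read the current offset, check that the request fits, and CAS the offset forward, retrying on contention. A minimal standalone rendering of that loop (class and field names here are illustrative, not HBase API):

    // BumpPointerSketch.java -- illustrative only
    import java.util.concurrent.atomic.AtomicInteger;

    public class BumpPointerSketch {
      private final byte[] data = new byte[1024];   // one small "chunk"
      private final AtomicInteger nextFree = new AtomicInteger(0);

      /** @return offset of the allocated slice, or -1 if the chunk is full. */
      public int alloc(int size) {
        while (true) {
          int old = nextFree.get();
          if (old + size > data.length) {
            return -1;   // doesn't fit; caller would retire this chunk
          }
          if (nextFree.compareAndSet(old, old + size)) {
            return old;  // we now own [old, old + size)
          }
          // lost the CAS to a concurrent allocator; re-read and retry
        }
      }

      public static void main(String[] args) {
        BumpPointerSketch c = new BumpPointerSketch();
        System.out.println(c.alloc(100));  // 0
        System.out.println(c.alloc(100));  // 100
        System.out.println(c.alloc(900));  // -1: only 824 bytes remain
      }
    }

Because the loop never blocks, many writer threads can carve slices out of the same 2MB chunk concurrently; essentially the only coordination cost is an occasional retried CAS.
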
+ * */ + private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED); + + /** Total number of allocations satisfied from this buffer */ + private AtomicInteger allocCount = new AtomicInteger(); + + /** Size of chunk in bytes */ + private final int size; + + /** + * Create an uninitialized chunk. Note that memory is not allocated yet, so + * this is cheap. + * @param size in bytes + */ + private Chunk(int size) { + this.size = size; + } + + /** + * Actually claim the memory for this chunk. This should only be called from + * the thread that constructed the chunk. It is thread-safe against other + * threads calling alloc(), who will block until the allocation is complete. + */ + public void init() { + assert nextFreeOffset.get() == UNINITIALIZED; + data = new byte[size]; + // Mark that it's ready for use + boolean initted = nextFreeOffset.compareAndSet( + UNINITIALIZED, 0); + // We should always succeed the above CAS since only one thread + // calls init()! + Preconditions.checkState(initted, + "Multiple threads tried to init same chunk"); + } + + /** + * Try to allocate size bytes from the chunk. + * @return the offset of the successful allocation, or -1 to indicate not-enough-space + */ + public int alloc(int size) { + while (true) { + int oldOffset = nextFreeOffset.get(); + if (oldOffset == UNINITIALIZED) { + // The chunk doesn't have its data allocated yet. + // Since we found this in curChunk, we know that whoever + // CAS-ed it there is allocating it right now. So spin-loop + // shouldn't spin long! + Thread.yield(); + continue; + } + + if (oldOffset + size > data.length) { + return -1; // alloc doesn't fit + } + + // Try to atomically claim this chunk + if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) { + // we got the alloc + allocCount.incrementAndGet(); + return oldOffset; + } + // we raced and lost alloc, try again + } + } + + @Override + public String toString() { + return "Chunk@" + System.identityHashCode(this) + + " allocs=" + allocCount.get() + "waste=" + + (data.length - nextFreeOffset.get()); + } + } + + /** + * The result of a single allocation. Contains the chunk that the + * allocation points into, and the offset in this array where the + * slice begins. 
+ */ + public static class Allocation { + private final byte[] data; + private final int offset; + + private Allocation(byte[] data, int off) { + this.data = data; + this.offset = off; + } + + @Override + public String toString() { + return "Allocation(data=" + data + + " with capacity=" + data.length + + ", off=" + offset + ")"; + } + + byte[] getData() { + return data; + } + + int getOffset() { + return offset; + } + } +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index ba9733d..0121d6d 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -178,7 +178,7 @@ public class Store implements HeapSize { // second -> ms adjust for user data this.ttl *= 1000; } - this.memstore = new MemStore(this.comparator); + this.memstore = new MemStore(conf, this.comparator); this.storeNameStr = Bytes.toString(this.family.getName()); // By default, compact if storefile.count >= minFilesToCompact diff --git src/main/resources/hbase-default.xml src/main/resources/hbase-default.xml index c52045a..8010a0d 100644 --- src/main/resources/hbase-default.xml +++ src/main/resources/hbase-default.xml @@ -360,6 +360,32 @@ + hbase.hregion.memstore.mslab.enabled + false + + Experimental: enables the MemStore-Local Allocation Buffer, + a feature which works to prevent heap fragmentation under + heavy write loads. This can reduce the frequency of stop-the-world + GC pauses on large heaps. + + + + hbase.hregion.memstore.mslab.chunksize + 2097152 + + Expert: The size of chunks allocated from the JVM for the MSLAB, + in bytes. + + + + hbase.hregion.memstore.mslab.chunksize + 2097152 + + Expert: The size of chunks allocated from the JVM for the MSLAB, + in bytes. + + + hbase.hregion.max.filesize 268435456 diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java new file mode 100644 index 0000000..8cff122 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -0,0 +1,169 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.MultithreadedTestUtil; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; +import org.junit.Test; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.MapMaker; +import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; + +public class TestMemStoreLAB { + + /** + * Test a bunch of random allocations + */ + @Test + public void testLABRandomAllocation() { + Random rand = new Random(); + MemStoreLAB mslab = new MemStoreLAB(); + int expectedOff = 0; + byte[] lastBuffer = null; + // 100K iterations by 0-1K alloc -> 50MB expected + // should be reasonable for unit test and also cover wraparound + // behavior + for (int i = 0; i < 100000; i++) { + int size = rand.nextInt(1000); + Allocation alloc = mslab.allocateBytes(size); + + if (alloc.getData() != lastBuffer) { + expectedOff = 0; + lastBuffer = alloc.getData(); + } + assertEquals(expectedOff, alloc.getOffset()); + assertTrue("Allocation " + alloc + " overruns buffer", + alloc.getOffset() + size <= alloc.getData().length); + expectedOff += size; + } + } + + /** + * Test allocation from lots of threads, making sure the results don't + * overlap in any way + */ + @Test + public void testLABThreading() throws Exception { + Configuration conf = new Configuration(); + MultithreadedTestUtil.TestContext ctx = + new MultithreadedTestUtil.TestContext(conf); + + final AtomicInteger totalAllocated = new AtomicInteger(); + + final MemStoreLAB mslab = new MemStoreLAB(); + List> allocations = Lists.newArrayList(); + + for (int i = 0; i < 10; i++) { + final List allocsByThisThread = Lists.newLinkedList(); + allocations.add(allocsByThisThread); + + TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { + private Random r = new Random(); + @Override + public void doAnAction() throws Exception { + int size = r.nextInt(1000); + Allocation alloc = mslab.allocateBytes(size); + totalAllocated.addAndGet(size); + allocsByThisThread.add(new AllocRecord(alloc, size)); + } + }; + ctx.addThread(t); + } + + ctx.startThreads(); + while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) { + Thread.sleep(10); + } + ctx.stop(); + + // Partition the allocations by the actual byte[] they point into, + // make sure offsets are unique for each chunk + Map> mapsByChunk = + Maps.newHashMap(); + + int sizeCounted = 0; + for (AllocRecord rec : Iterables.concat(allocations)) { + sizeCounted += rec.size; + if (rec.size == 0) continue; + + Map mapForThisByteArray = + mapsByChunk.get(rec.alloc.getData()); + if (mapForThisByteArray == null) { + mapForThisByteArray = Maps.newTreeMap(); + mapsByChunk.put(rec.alloc.getData(), mapForThisByteArray); + } + AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec); + assertNull("Already had an entry " + oldVal + " for allocation " + rec, + oldVal); + } + assertEquals("Sanity check test", sizeCounted, totalAllocated.get()); + + // Now check each byte array to make sure allocations don't overlap + for (Map allocsInChunk : 
mapsByChunk.values()) { + int expectedOff = 0; + for (AllocRecord alloc : allocsInChunk.values()) { + assertEquals(expectedOff, alloc.alloc.getOffset()); + assertTrue("Allocation " + alloc + " overruns buffer", + alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getData().length); + expectedOff += alloc.size; + } + } + + } + + private static class AllocRecord implements Comparable{ + private final Allocation alloc; + private final int size; + public AllocRecord(Allocation alloc, int size) { + super(); + this.alloc = alloc; + this.size = size; + } + + @Override + public int compareTo(AllocRecord e) { + if (alloc.getData() != e.alloc.getData()) { + throw new RuntimeException("Can only compare within a particular array"); + } + return Ints.compare(alloc.getOffset(), e.alloc.getOffset()); + } + + @Override + public String toString() { + return "AllocRecord(alloc=" + alloc + ", size=" + size + ")"; + } + + } +}