diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index d9f0679..f8052e3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -446,6 +446,19 @@ public final class CellUtil { }; } + @InterfaceAudience.Private + public static Cell cloneTo(Cell cell, byte[] buf, int offset) { + int len = KeyValueUtil.length(cell); + if (cell instanceof Streamable) { + ((Streamable) cell).write(buf, offset); + } else { + KeyValueUtil.appendToByteArray(cell, buf, offset); + } + KeyValue newKv = new KeyValue(buf, offset, len); + newKv.setSequenceId(cell.getSequenceId()); + return newKv; + } + /** * @param left * @param right diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index a30a24c..70854f0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -1236,9 +1236,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, /** * @return The byte array backing this KeyValue. - * @deprecated Since 0.98.0. Use Cell Interface instead. Do not presume single backing buffer. 
*/ - @Deprecated public byte [] getBuffer() { return this.bytes; } @@ -2777,4 +2775,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, return super.heapSize() + Bytes.SIZEOF_SHORT; } } + + @Override + public void write(byte[] buf, int offset) { + System.arraycopy(this.bytes, this.offset, buf, offset, this.length); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index c9da738..f2368c0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -142,8 +142,6 @@ public class KeyValueUtil { /**************** copy key and value *********************/ public static int appendToByteArray(final Cell cell, final byte[] output, final int offset) { - // TODO when cell instance of KV we can bypass all steps and just do backing single array - // copy(?) int pos = offset; pos = Bytes.putInt(output, pos, keyLength(cell)); pos = Bytes.putInt(output, pos, cell.getValueLength()); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java index d060b02..79c2dc0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java @@ -262,4 +262,9 @@ public class OffheapKeyValue extends ByteBufferedCell public String toString() { return CellUtil.toString(this, true); } + + @Override + public void write(byte[] buf, int offset) { + ByteBufferUtils.copyFromBufferToArray(buf, this.buf, this.offset, offset, this.length); + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java index be91a56..27cf0ee 100644 --- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/Streamable.java @@ -44,4 +44,11 @@ public interface Streamable { * @throws IOException */ int write(OutputStream out, boolean withTags) throws IOException; + + /** + * Serialize this cell's content to the given byte array at the specified offset + * @param buf array where to write + * @param offset Offset within buf where to write + */ + void write(byte[] buf, int offset); } \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index d873f7e..2c179ed 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -463,6 +463,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } return lenToWrite + Bytes.SIZEOF_INT; } + + @Override + public void write(byte[] buf, int offset) { + // We won't use these cells in the write path at all. + throw new UnsupportedOperationException(); + } } protected static class OffheapDecodedCell extends ByteBufferedCell implements HeapSize, @@ -686,6 +692,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } return lenToWrite + Bytes.SIZEOF_INT; } + + @Override + public void write(byte[] buf, int offset) { + // We won't use these cells in the write path at all. 
+ throw new UnsupportedOperationException(); + } } protected abstract static class diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java index f22a6e5..e472f93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java @@ -24,10 +24,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.util.ByteRange; -import org.apache.hadoop.hbase.util.SimpleMutableByteRange; import com.google.common.base.Preconditions; @@ -93,14 +94,9 @@ public class HeapMemStoreLAB implements MemStoreLAB { MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); } - /** - * Allocate a slice of the given length. - * - * If the size is larger than the maximum size specified for this - * allocator, returns null. 
- */ @Override - public ByteRange allocateBytes(int size) { + public Cell cloneInto(Cell cell) { + int size = KeyValueUtil.length(cell); Preconditions.checkArgument(size >= 0, "negative size"); // Callers should satisfy large allocations directly from JVM since they @@ -108,22 +104,22 @@ public class HeapMemStoreLAB implements MemStoreLAB { if (size > maxAlloc) { return null; } - + Chunk c = null; + int allocOffset = 0; while (true) { - Chunk c = getOrMakeChunk(); - + c = getOrMakeChunk(); // Try to allocate from this chunk - int allocOffset = c.alloc(size); + allocOffset = c.alloc(size); if (allocOffset != -1) { // We succeeded - this is the common case - small alloc // from a big buffer - return new SimpleMutableByteRange(c.data, allocOffset, size); + break; } - // not enough space! // try to retire this chunk tryRetireChunk(c); } + return CellUtil.cloneTo(cell, c.data, allocOffset); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java index 5c3c1e7..3156468 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java @@ -17,8 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteRange; /** * A memstore-local allocation buffer. @@ -39,12 +39,10 @@ import org.apache.hadoop.hbase.util.ByteRange; public interface MemStoreLAB { /** - * Allocate a slice of the given length. If the size is larger than the maximum size specified for - * this allocator, returns null. - * @param size - * @return {@link ByteRange} + * Clone the cell content to this MSLAB allocated memory area and create a new Cell around that. 
+ * When it is not able to get enough space to clone this cell to, it returns null */ - ByteRange allocateBytes(int size); + Cell cloneInto(Cell cell); /** * Close instance since it won't be used any more, try to put the chunks back to pool diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java index dcad5a0..09f6244 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -25,11 +25,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.ByteRange; /** * This is an abstraction of a segment maintained in a memstore, e.g., the active @@ -120,22 +117,11 @@ public abstract class Segment { * @return either the given cell or its clone */ public Cell maybeCloneWithAllocator(Cell cell) { - if (getMemStoreLAB() == null) { + if (this.memStoreLAB == null) { return cell; } - - int len = KeyValueUtil.length(cell); - ByteRange alloc = getMemStoreLAB().allocateBytes(len); - if (alloc == null) { - // The allocation was too large, allocator decided - // not to do anything with it. - return cell; - } - assert alloc.getBytes() != null; - KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset()); - KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); - newKv.setSequenceId(cell.getSequenceId()); - return newKv; + Cell cellFromMslab = this.memStoreLAB.cloneInto(cell); + return (cellFromMslab != null) ? 
cellFromMslab : cell; } public abstract boolean shouldSeek(Scan scan, long oldestUnexpiredTS); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index b5e9798..1612334 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Before; @@ -72,18 +72,22 @@ public class TestMemStoreChunkPool { MemStoreLAB mslab = new HeapMemStoreLAB(conf); int expectedOff = 0; byte[] lastBuffer = null; + final byte[] rk = Bytes.toBytes("r1"); + final byte[] cf = Bytes.toBytes("f"); + final byte[] q = Bytes.toBytes("q"); // Randomly allocate some bytes for (int i = 0; i < 100; i++) { - int size = rand.nextInt(1000); - ByteRange alloc = mslab.allocateBytes(size); - - if (alloc.getBytes() != lastBuffer) { + int valSize = rand.nextInt(1000); + KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); + int size = KeyValueUtil.length(kv); + KeyValue newKv = (KeyValue) mslab.cloneInto(kv); + if (newKv.getBuffer() != lastBuffer) { expectedOff = 0; - lastBuffer = alloc.getBytes(); + lastBuffer = newKv.getBuffer(); } - assertEquals(expectedOff, alloc.getOffset()); - assertTrue("Allocation overruns buffer", alloc.getOffset() - + size <= alloc.getBytes().length); + assertEquals(expectedOff, newKv.getOffset()); + assertTrue("Allocation overruns buffer", 
newKv.getOffset() + + size <= newKv.getBuffer().length); expectedOff += size; } // chunks will be put back to pool after close @@ -93,7 +97,8 @@ public class TestMemStoreChunkPool { // reconstruct mslab mslab = new HeapMemStoreLAB(conf); // chunk should be got from the pool, so we can reuse it. - mslab.allocateBytes(1000); + KeyValue kv = new KeyValue(rk, cf, q, new byte[10]); + mslab.cloneInto(kv); assertEquals(chunkCount - 1, chunkPool.getPoolSize()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index 170bdd4..51640d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -26,11 +26,14 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.ByteRange; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import com.google.common.collect.Iterables; @@ -42,6 +45,10 @@ import org.junit.experimental.categories.Category; @Category({RegionServerTests.class, SmallTests.class}) public class TestMemStoreLAB { + private static final byte[] rk = Bytes.toBytes("r1"); + private static final byte[] cf = Bytes.toBytes("f"); + private static final byte[] q = Bytes.toBytes("q"); + /** * Test a bunch of random allocations */ @@ -55,16 +62,17 @@ public class TestMemStoreLAB { // should be reasonable for unit test and also 
cover wraparound // behavior for (int i = 0; i < 100000; i++) { - int size = rand.nextInt(1000); - ByteRange alloc = mslab.allocateBytes(size); - - if (alloc.getBytes() != lastBuffer) { + int valSize = rand.nextInt(1000); + KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); + int size = KeyValueUtil.length(kv); + KeyValue newKv = (KeyValue) mslab.cloneInto(kv); + if (newKv.getBuffer() != lastBuffer) { expectedOff = 0; - lastBuffer = alloc.getBytes(); + lastBuffer = newKv.getBuffer(); } - assertEquals(expectedOff, alloc.getOffset()); + assertEquals(expectedOff, newKv.getOffset()); assertTrue("Allocation overruns buffer", - alloc.getOffset() + size <= alloc.getBytes().length); + newKv.getOffset() + size <= newKv.getBuffer().length); expectedOff += size; } } @@ -72,9 +80,9 @@ public class TestMemStoreLAB { @Test public void testLABLargeAllocation() { MemStoreLAB mslab = new HeapMemStoreLAB(); - ByteRange alloc = mslab.allocateBytes(2*1024*1024); - assertNull("2MB allocation shouldn't be satisfied by LAB.", - alloc); + KeyValue kv = new KeyValue(rk, cf, q, new byte[2*1024*1024]); + Cell newCell = mslab.cloneInto(kv); + assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell); } /** @@ -100,10 +108,12 @@ public class TestMemStoreLAB { private Random r = new Random(); @Override public void doAnAction() throws Exception { - int size = r.nextInt(1000); - ByteRange alloc = mslab.allocateBytes(size); + int valSize = r.nextInt(1000); + KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); + int size = KeyValueUtil.length(kv); + KeyValue newKv = (KeyValue) mslab.cloneInto(kv); totalAllocated.addAndGet(size); - allocsByThisThread.add(new AllocRecord(alloc, size)); + allocsByThisThread.add(new AllocRecord(newKv.getBuffer(), newKv.getOffset(), size)); } }; ctx.addThread(t); @@ -126,12 +136,12 @@ public class TestMemStoreLAB { if (rec.size == 0) continue; Map mapForThisByteArray = - mapsByChunk.get(rec.alloc.getBytes()); + mapsByChunk.get(rec.alloc); if 
(mapForThisByteArray == null) { mapForThisByteArray = Maps.newTreeMap(); - mapsByChunk.put(rec.alloc.getBytes(), mapForThisByteArray); + mapsByChunk.put(rec.alloc, mapForThisByteArray); } - AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec); + AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec); assertNull("Already had an entry " + oldVal + " for allocation " + rec, oldVal); } @@ -141,9 +151,9 @@ public class TestMemStoreLAB { for (Map allocsInChunk : mapsByChunk.values()) { int expectedOff = 0; for (AllocRecord alloc : allocsInChunk.values()) { - assertEquals(expectedOff, alloc.alloc.getOffset()); + assertEquals(expectedOff, alloc.offset); assertTrue("Allocation overruns buffer", - alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getBytes().length); + alloc.offset + alloc.size <= alloc.alloc.length); expectedOff += alloc.size; } } @@ -151,25 +161,27 @@ public class TestMemStoreLAB { } private static class AllocRecord implements Comparable{ - private final ByteRange alloc; + private final byte[] alloc; + private final int offset; private final int size; - public AllocRecord(ByteRange alloc, int size) { + public AllocRecord(byte[] alloc, int offset, int size) { super(); this.alloc = alloc; + this.offset = offset; this.size = size; } @Override public int compareTo(AllocRecord e) { - if (alloc.getBytes() != e.alloc.getBytes()) { + if (alloc != e.alloc) { throw new RuntimeException("Can only compare within a particular array"); } - return Ints.compare(alloc.getOffset(), e.alloc.getOffset()); + return Ints.compare(this.offset, e.offset); } @Override public String toString() { - return "AllocRecord(offset=" + alloc.getOffset() + ", size=" + size + ")"; + return "AllocRecord(offset=" + this.offset + ", size=" + size + ")"; } }