diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 734227c..ff2f7a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; +import org.apache.hadoop.hbase.io.CellReadableByteArrayInputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -180,7 +181,7 @@ public class IPCUtil { public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, final byte [] cellBlock) throws IOException { - return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length); + return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length, false); } /** @@ -188,11 +189,14 @@ public class IPCUtil { * @param cellBlock * @param offset * @param length + * @param directCellRead + * Whether to make Cells directly from the cellBlock bytes or need to copy. Pass false + * while using from client side. * @return CellScanner to work against the content of cellBlock * @throws IOException */ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte [] cellBlock, final int offset, final int length) + final byte [] cellBlock, final int offset, final int length, boolean directCellRead) throws IOException { // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. @@ -213,7 +217,8 @@ public class IPCUtil { IOUtils.copy(cis, bbos); bbos.close(); ByteBuffer bb = bbos.getByteBuffer(); - is = new ByteArrayInputStream(bb.array(), 0, bb.limit()); + is = directCellRead ? new CellReadableByteArrayInputStream(bb.array(), 0, bb.limit()) + : new ByteArrayInputStream(bb.array(), 0, bb.limit()); } finally { if (is != null) is.close(); if (bbos != null) bbos.close(); @@ -221,7 +226,8 @@ public class IPCUtil { CodecPool.returnDecompressor(poolDecompressor); } } else { - is = new ByteArrayInputStream(cellBlock, offset, length); + is = directCellRead ? new CellReadableByteArrayInputStream(cellBlock, offset, length) + : new ByteArrayInputStream(cellBlock, offset, length); } return codec.getDecoder(is); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index 163be70..24c1fe6 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -79,7 +79,7 @@ public class TestIPCUtil { CellScanner cellScanner = sized? getSizedCellScanner(cells): CellUtil.createCellScanner(Arrays.asList(cells).iterator()); ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); - cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit()); + cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit(), false); int i = 0; while (cellScanner.advance()) { i++; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 7db1c76..4a41ef6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -22,7 +22,9 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -34,11 +36,13 @@ import java.util.NavigableMap; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.io.CellReadable; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.IOUtils; /** * Utility methods helpful slinging {@link Cell} instances. @@ -326,6 +330,40 @@ public final class CellUtil { } /** + * Create a Cell reading from the raw InputStream. + * @param in Stream to read Cell from + * @param withTags whether the Cell should include tags are not + * @return Created Cell OR if we find a length of zero, we will return + * null which can be useful marking a stream as done. + * @throws IOException + */ + @InterfaceAudience.Private + public static Cell createCell(InputStream in, boolean withTags) throws IOException { + if (in instanceof CellReadable) { + return ((CellReadable) in).readCell(withTags); + } + byte[] intBytes = new byte[Bytes.SIZEOF_INT]; + int bytesRead = 0; + while (bytesRead < intBytes.length) { + int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead); + if (n < 0) { + if (bytesRead == 0) { + throw new EOFException(); + } + throw new IOException("Failed read of int, read " + bytesRead + " bytes"); + } + bytesRead += n; + } + byte[] bytes = new byte[Bytes.toInt(intBytes)]; + IOUtils.readFully(in, bytes, 0, bytes.length); + if (withTags) { + return new KeyValue(bytes, 0, bytes.length); + } else { + return new NoTagsKeyValue(bytes, 0, bytes.length); + } + } + + /** * @param cellScannerables * @return CellScanner interface over cellIterables */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 1b71cb4..4c3500f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1260,6 +1260,9 @@ public final class HConstants { public static final String ZK_SERVER_KERBEROS_PRINCIPAL = "hbase.zookeeper.server.kerberos.principal"; + public static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; + public static final boolean USEMSLAB_DEFAULT = true; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java index 86f8678..5738789 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java @@ -29,6 +29,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.CellReadable; +import org.apache.hadoop.hbase.util.Bytes; /** * TODO javadoc @@ -51,8 +53,43 @@ public abstract class BaseDecoder implements Codec.Decoder { } } + protected static class CellReadablePBIS extends PBIS implements CellReadable { + + private byte[] intBuf = new byte[Bytes.SIZEOF_INT]; + + public CellReadablePBIS(InputStream in, int size) { + super(in, size); + assert in instanceof CellReadable; + } + + @Override + public Cell readCell(boolean withTags) throws IOException { + int bytesRead = 0; + while (bytesRead < intBuf.length) { + int n = read(intBuf, bytesRead, intBuf.length - bytesRead); + if (n < 0) { + if (bytesRead == 0) { + throw new EOFException(); + } + throw new IOException("Failed read of int, read " + bytesRead + " bytes"); + } + bytesRead += n; + } + return ((CellReadable) in).readCell(Bytes.toInt(intBuf), withTags); + } + + @Override + public Cell readCell(int length, boolean withTags) throws IOException { + return ((CellReadable) in).readCell(length, withTags); + } + } + public BaseDecoder(final InputStream in) { - this.in = new PBIS(in, 1); + if (in instanceof CellReadable) { + this.in = new CellReadablePBIS(in, 1); + } else { + this.in = new PBIS(in, 1); + } } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java index f99bfcb..8acdd98 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -65,7 +66,7 @@ public class KeyValueCodec implements Codec { protected Cell parseCell() throws IOException { // No tags here - return KeyValueUtil.iscreate(in, false); + return CellUtil.createCell(in, false); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java index ad762b4..6f11142 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -71,7 +72,7 @@ public class KeyValueCodecWithTags implements Codec { protected Cell parseCell() throws IOException { // create KeyValue with tags - return KeyValueUtil.iscreate(in, true); + return CellUtil.createCell(in, true); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadable.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadable.java new file mode 100644 index 0000000..205c303 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadable.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This marks an InpuStream for support reading out a {@link Cell} directly from it + */ +@InterfaceAudience.Private +public interface CellReadable { + + Cell readCell(boolean withTags) throws IOException; + + Cell readCell(int length, boolean withTags) throws IOException; +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadableByteArrayInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadableByteArrayInputStream.java new file mode 100644 index 0000000..a9dacd3 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadableByteArrayInputStream.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NoTagsKeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class is intentionally made not thread safe. We will be using it single threaded. + */ +@InterfaceAudience.Private +public class CellReadableByteArrayInputStream extends ByteArrayInputStream implements CellReadable { + + public CellReadableByteArrayInputStream(byte[] buf) { + super(buf); + } + + public CellReadableByteArrayInputStream(byte buf[], int offset, int length) { + super(buf, offset, length); + } + + @Override + public Cell readCell(boolean withTags) throws IOException { + int length = Bytes.toInt(buf, pos); + pos += Bytes.SIZEOF_INT; + return readCell(length, withTags); + } + + @Override + public Cell readCell(int length, boolean withTags) throws IOException { + Cell cell; + if (withTags) { + cell = new KeyValue(buf, pos, length); + } else { + cell = new NoTagsKeyValue(buf, pos, length); + } + pos += length; + return cell; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 922174d..d2b2d33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -57,7 +57,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -280,6 +279,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private volatile boolean allowFallbackToSimpleAuth; + private final boolean mslabEnabled; + /** * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. @@ -1840,7 +1841,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } if (header.hasCellBlockMeta()) { cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, - buf, offset, buf.length); + buf, offset, buf.length, mslabEnabled); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); @@ -2040,6 +2041,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.scheduler = scheduler; this.scheduler.init(new RpcSchedulerContext(this)); + this.mslabEnabled = conf.getBoolean(HConstants.USEMSLAB_KEY, HConstants.USEMSLAB_DEFAULT); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 89ae0d1..b618204 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -68,8 +69,6 @@ import org.apache.htrace.Trace; @InterfaceAudience.Private public class DefaultMemStore implements MemStore { private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); - static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; - private static final boolean USEMSLAB_DEFAULT = true; static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; private Configuration conf; @@ -122,7 +121,7 @@ public class DefaultMemStore implements MemStore { snapshotTimeRangeTracker = new TimeRangeTracker(); this.size = new AtomicLong(DEEP_OVERHEAD); this.snapshotSize = 0; - if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { + if (conf.getBoolean(HConstants.USEMSLAB_KEY, HConstants.USEMSLAB_DEFAULT)) { String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class }, new Object[] { conf }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index ec70740..ad97f42 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -795,7 +795,7 @@ public class TestDefaultMemStore extends TestCase { */ public void testUpsertMSLAB() throws Exception { Configuration conf = HBaseConfiguration.create(); - conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true); + conf.setBoolean(HConstants.USEMSLAB_KEY, true); memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR); int ROW_SIZE = 2048; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 80333e8..94b1c3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -47,7 +48,7 @@ public class TestMemStoreChunkPool { @BeforeClass public static void setUpBeforeClass() throws Exception { - conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true); + conf.setBoolean(HConstants.USEMSLAB_KEY, true); conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; MemStoreChunkPool.chunkPoolDisabled = false;