diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 734227c..ff2f7a3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.CellReadableByteArrayInputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -180,7 +181,7 @@ public class IPCUtil {
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte [] cellBlock)
throws IOException {
- return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
+ return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length, false);
}
/**
@@ -188,11 +189,14 @@ public class IPCUtil {
* @param cellBlock
* @param offset
* @param length
+ * @param directCellRead
+ * Whether to create Cells directly over the cellBlock bytes or to copy them out.
+ * Pass false when calling from the client side.
* @return CellScanner to work against the content of cellBlock
* @throws IOException
*/
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
- final byte [] cellBlock, final int offset, final int length)
+ final byte [] cellBlock, final int offset, final int length, boolean directCellRead)
throws IOException {
// If compressed, decompress it first before passing it on else we will leak compression
// resources if the stream is not closed properly after we let it out.
@@ -213,7 +217,8 @@ public class IPCUtil {
IOUtils.copy(cis, bbos);
bbos.close();
ByteBuffer bb = bbos.getByteBuffer();
- is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
+ is = directCellRead ? new CellReadableByteArrayInputStream(bb.array(), 0, bb.limit())
+ : new ByteArrayInputStream(bb.array(), 0, bb.limit());
} finally {
if (is != null) is.close();
if (bbos != null) bbos.close();
@@ -221,7 +226,8 @@ public class IPCUtil {
CodecPool.returnDecompressor(poolDecompressor);
}
} else {
- is = new ByteArrayInputStream(cellBlock, offset, length);
+ is = directCellRead ? new CellReadableByteArrayInputStream(cellBlock, offset, length)
+ : new ByteArrayInputStream(cellBlock, offset, length);
}
return codec.getDecoder(is);
}
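
To make the new flag concrete: a minimal sketch of the two decode paths, assuming cellBlock holds an uncompressed, KeyValueCodec-encoded block and the Configuration-taking IPCUtil constructor in this branch (variable names are illustrative, not part of the patch):

    Codec codec = new KeyValueCodec();
    IPCUtil util = new IPCUtil(HBaseConfiguration.create());
    // Client side: returned Cells must own their bytes, so copy (directCellRead = false).
    CellScanner copying = util.createCellScanner(codec, null, cellBlock, 0, cellBlock.length, false);
    // Server side with MSLAB enabled: Cells may wrap the cellblock array directly.
    CellScanner direct = util.createCellScanner(codec, null, cellBlock, 0, cellBlock.length, true);
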
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index 163be70..24c1fe6 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -79,7 +79,7 @@ public class TestIPCUtil {
CellScanner cellScanner = sized? getSizedCellScanner(cells):
CellUtil.createCellScanner(Arrays.asList(cells).iterator());
ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
- cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
+ cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit(), false);
int i = 0;
while (cellScanner.advance()) {
i++;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 1b38b56..b2fb938 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -22,7 +22,9 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -34,10 +36,12 @@ import java.util.NavigableMap;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.io.CellReadable;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IOUtils;
/**
* Utility methods helpful slinging {@link Cell} instances.
@@ -325,6 +329,40 @@ public final class CellUtil {
}
/**
+ * Create a Cell reading from the raw InputStream.
+ * @param in Stream to read Cell from
+ * @param withTags whether the Cell should include tags or not
+ * @return the Cell created from the stream bytes
+ * @throws EOFException if the stream is exhausted before the length int can be read
+ * @throws IOException
+ */
+ @InterfaceAudience.Private
+ public static Cell createCell(InputStream in, boolean withTags) throws IOException {
+ if (in instanceof CellReadable) {
+ return ((CellReadable) in).readCell(withTags);
+ }
+ byte[] intBytes = new byte[Bytes.SIZEOF_INT];
+ int bytesRead = 0;
+ while (bytesRead < intBytes.length) {
+ int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
+ if (n < 0) {
+ if (bytesRead == 0) {
+ throw new EOFException();
+ }
+ throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+ }
+ bytesRead += n;
+ }
+ byte[] bytes = new byte[Bytes.toInt(intBytes)];
+ IOUtils.readFully(in, bytes, 0, bytes.length);
+ if (withTags) {
+ return new KeyValue(bytes, 0, bytes.length);
+ } else {
+ return new NoTagsKeyValue(bytes, 0, bytes.length);
+ }
+ }
+
+ /**
* @param cellScannerables
* @return CellScanner interface over cellIterables
*/
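
A sketch of the two branches of createCell, assuming a block holding one length-prefixed KeyValue (a 4-byte length followed by the KeyValue bytes, matching what the loop above parses):

    KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(Bytes.toBytes(kv.getLength()));
    baos.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
    byte[] block = baos.toByteArray();
    // Plain stream: the cell bytes are copied into a fresh byte[].
    Cell copied = CellUtil.createCell(new ByteArrayInputStream(block), false);
    // CellReadable stream: the returned Cell wraps block's backing array, no copy.
    Cell direct = CellUtil.createCell(new CellReadableByteArrayInputStream(block), false);
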
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1b71cb4..4c3500f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1260,6 +1260,9 @@ public final class HConstants {
public static final String ZK_SERVER_KERBEROS_PRINCIPAL =
"hbase.zookeeper.server.kerberos.principal";
+ public static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
+ public static final boolean USEMSLAB_DEFAULT = true;
+
private HConstants() {
// Can't be instantiated with this ctor.
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
index 86f8678..5738789 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
@@ -29,6 +29,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.CellReadable;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* TODO javadoc
@@ -51,8 +53,48 @@ public abstract class BaseDecoder implements Codec.Decoder {
}
}
+ /**
+ * A PBIS over a {@link CellReadable} stream. The length int must be read through this
+ * wrapper so that any byte pushed back by advance() is consumed first; once the pushback
+ * buffer is drained, the cell body read is delegated to the underlying stream directly.
+ */
+ protected static class CellReadablePBIS extends PBIS implements CellReadable {
+
+ private byte[] intBuf = new byte[Bytes.SIZEOF_INT];
+
+ public CellReadablePBIS(InputStream in, int size) {
+ super(in, size);
+ assert in instanceof CellReadable;
+ }
+
+ @Override
+ public Cell readCell(boolean withTags) throws IOException {
+ int bytesRead = 0;
+ while (bytesRead < intBuf.length) {
+ int n = read(intBuf, bytesRead, intBuf.length - bytesRead);
+ if (n < 0) {
+ if (bytesRead == 0) {
+ throw new EOFException();
+ }
+ throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+ }
+ bytesRead += n;
+ }
+ return ((CellReadable) in).readCell(Bytes.toInt(intBuf), withTags);
+ }
+
+ @Override
+ public Cell readCell(int length, boolean withTags) throws IOException {
+ return ((CellReadable) in).readCell(length, withTags);
+ }
+ }
+
public BaseDecoder(final InputStream in) {
- this.in = new PBIS(in, 1);
+ if (in instanceof CellReadable) {
+ this.in = new CellReadablePBIS(in, 1);
+ } else {
+ this.in = new PBIS(in, 1);
+ }
}
@Override
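
For context on why readCell(boolean) must pull the length int through the wrapper rather than from the raw stream: the unchanged advance() in this class probes for end-of-stream with a one-byte read and pushes that byte back, roughly:

    int firstByte = in.read();      // EOF probe consumes the first byte of the next length int...
    if (firstByte == -1) return false;
    ((PBIS) in).unread(firstByte);  // ...and returns it to the pushback buffer before parseCell()

Reading the length through the PBIS drains that pushed-back byte first; only the cell body, which never touches the pushback buffer, is safe to read from the underlying CellReadable directly.
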
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
index f99bfcb..8acdd98 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -65,7 +66,7 @@ public class KeyValueCodec implements Codec {
protected Cell parseCell() throws IOException {
// No tags here
- return KeyValueUtil.iscreate(in, false);
+ return CellUtil.createCell(in, false);
}
}
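
With parseCell routed through CellUtil.createCell, a decoder transparently takes the direct path whenever its input implements CellReadable; nothing else in the codec changes. A sketch, reusing the block built in the CellUtil example above:

    Codec.Decoder decoder = new KeyValueCodec().getDecoder(
        new CellReadableByteArrayInputStream(block));
    while (decoder.advance()) {
      Cell c = decoder.current(); // a NoTagsKeyValue backed by block's own array
    }
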
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
index ad762b4..6f11142 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -71,7 +72,7 @@ public class KeyValueCodecWithTags implements Codec {
protected Cell parseCell() throws IOException {
// create KeyValue with tags
- return KeyValueUtil.iscreate(in, true);
+ return CellUtil.createCell(in, true);
}
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadable.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadable.java
new file mode 100644
index 0000000..205c303
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This marks an InputStream as supporting reading a {@link Cell} directly out of it.
+ */
+@InterfaceAudience.Private
+public interface CellReadable {
+
+ /**
+ * Reads the 4-byte length prefix from the stream and then a Cell of that serialized length.
+ */
+ Cell readCell(boolean withTags) throws IOException;
+
+ /**
+ * Reads a Cell of the given serialized length; the length prefix has already been consumed.
+ */
+ Cell readCell(int length, boolean withTags) throws IOException;
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadableByteArrayInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadableByteArrayInputStream.java
new file mode 100644
index 0000000..e9ca29e
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/CellReadableByteArrayInputStream.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NoTagsKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A ByteArrayInputStream that can hand out Cells wrapping its backing array without copying.
+ * Intentionally not thread safe; it is only ever used from a single thread.
+ */
+@InterfaceAudience.Private
+public class CellReadableByteArrayInputStream extends ByteArrayInputStream implements CellReadable {
+
+ // Buffer used to read an int from the stream
+ private byte[] intBuf = null;
+
+ public CellReadableByteArrayInputStream(byte[] buf) {
+ super(buf);
+ }
+
+ public CellReadableByteArrayInputStream(byte buf[], int offset, int length) {
+ super(buf, offset, length);
+ }
+
+ @Override
+ public Cell readCell(boolean withTags) throws IOException {
+ if (intBuf == null) {
+ // Lazy init. In the normal flow, only the readCell(int, boolean) API is used.
+ intBuf = new byte[Bytes.SIZEOF_INT];
+ }
+ int bytesRead = 0;
+ while (bytesRead < intBuf.length) {
+ int n = read(intBuf, bytesRead, intBuf.length - bytesRead);
+ if (n < 0) {
+ if (bytesRead == 0) {
+ throw new EOFException();
+ }
+ throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+ }
+ bytesRead += n;
+ }
+ return readCell(Bytes.toInt(intBuf), withTags);
+ }
+
+ @Override
+ public Cell readCell(int length, boolean withTags) throws IOException {
+ if (withTags) {
+ return new KeyValue(buf, pos, length);
+ }
+ return new NoTagsKeyValue(buf, pos, length);
+ }
+}
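
A quick illustration of the contract the class keeps, assuming a hypothetical twoCellBlock holding two length-prefixed KeyValues back to back: each readCell must leave pos at the next length prefix, which is why readCell(int, boolean) advances pos after wrapping the bytes:

    CellReadableByteArrayInputStream in = new CellReadableByteArrayInputStream(twoCellBlock);
    Cell first = in.readCell(false);   // wraps the first cell's bytes in place
    Cell second = in.readCell(false);  // pos has moved on, so this parses the second cell
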
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 1155751..afb6c65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -57,7 +57,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -279,6 +278,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private volatile boolean allowFallbackToSimpleAuth;
+ private final boolean mslabEnabled;
+
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
@@ -1846,7 +1847,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
if (header.hasCellBlockMeta()) {
cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
- buf, offset, buf.length);
+ buf, offset, buf.length, mslabEnabled);
}
} catch (Throwable t) {
InetSocketAddress address = getListenerAddress();
@@ -2035,6 +2036,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.scheduler = scheduler;
this.scheduler.init(new RpcSchedulerContext(this));
+ this.mslabEnabled = conf.getBoolean(HConstants.USEMSLAB_KEY, HConstants.USEMSLAB_DEFAULT);
}
@Override
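
The gate on the MSLAB setting is what makes the direct read safe here: a directly-read Cell references the RPC request buffer, and with MSLAB enabled the memstore copies every incoming Cell into its own chunks before that buffer is recycled. Turning MSLAB off therefore falls back to the copying decoders automatically:

    Configuration conf = HBaseConfiguration.create();
    // MSLAB off => RpcServer passes directCellRead = false and decoders copy again.
    conf.setBoolean(HConstants.USEMSLAB_KEY, false);
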
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 89ae0d1..b618204 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -68,8 +69,6 @@ import org.apache.htrace.Trace;
@InterfaceAudience.Private
public class DefaultMemStore implements MemStore {
private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
- static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
- private static final boolean USEMSLAB_DEFAULT = true;
static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
private Configuration conf;
@@ -122,7 +121,7 @@ public class DefaultMemStore implements MemStore {
snapshotTimeRangeTracker = new TimeRangeTracker();
this.size = new AtomicLong(DEEP_OVERHEAD);
this.snapshotSize = 0;
- if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
+ if (conf.getBoolean(HConstants.USEMSLAB_KEY, HConstants.USEMSLAB_DEFAULT)) {
String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class }, new Object[] { conf });
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index ec70740..ad97f42 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -795,7 +795,7 @@ public class TestDefaultMemStore extends TestCase {
*/
public void testUpsertMSLAB() throws Exception {
Configuration conf = HBaseConfiguration.create();
- conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true);
+ conf.setBoolean(HConstants.USEMSLAB_KEY, true);
memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR);
int ROW_SIZE = 2048;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
index 80333e8..94b1c3f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -47,7 +48,7 @@ public class TestMemStoreChunkPool {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true);
+ conf.setBoolean(HConstants.USEMSLAB_KEY, true);
conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled;
MemStoreChunkPool.chunkPoolDisabled = false;