commit 28270c818617505b65eb92bdadd8d531daef4a76 Author: Ryan Rawson Date: Fri Oct 1 17:20:56 2010 -0700 attempt to directly use BB to make serialization faster diff --git a/src/main/java/org/apache/hadoop/hbase/client/Result.java b/src/main/java/org/apache/hadoop/hbase/client/Result.java index 9326e01..0a6a5ba 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -24,6 +24,7 @@ import com.google.common.collect.Ordering; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.SplitKeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.WritableToByteBuffer; import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -31,6 +32,7 @@ import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -66,7 +68,7 @@ import java.util.TreeMap; * through {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()}, * {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}. */ -public class Result implements Writable, WritableWithSize { +public class Result implements Writable, WritableToByteBuffer { private static final byte RESULT_VERSION = (byte)1; private KeyValue [] kvs = null; @@ -537,13 +539,11 @@ public class Result implements Writable, WritableWithSize { if (isEmpty()) return Bytes.SIZEOF_INT; // int size - int size = kvs.length; - size *= Bytes.SIZEOF_INT; // 1 int for each kv. - - size += Bytes.SIZEOF_INT; + int size = Bytes.SIZEOF_INT; // totalLen for (KeyValue kv : kvs) { - size += kv.getLength() + Bytes.SIZEOF_INT; + size += Bytes.SIZEOF_INT; // kv.getLength + size += kv.getLength(); } return size; @@ -589,6 +589,58 @@ public class Result implements Writable, WritableWithSize { return size; } + + public void writeWithByteBuffer(ByteBuffer buf, + DataOutput out) throws IOException { + if (isEmpty()) { + buf.putInt(0); + return; + } + int totalLen = 0; + for (KeyValue kv : kvs) { + totalLen += Bytes.SIZEOF_INT + kv.getLength(); + } + buf.putInt(totalLen); + for (KeyValue kv : kvs) { + buf.putInt(kv.getLength()); + buf.put(kv.getBuffer(), kv.getOffset(), kv.getLength()); + } + } + + public static void writeArray(ByteBuffer buf, Result[] results) { + + buf.put(RESULT_VERSION); + if (results == null || results.length == 0) { + buf.putInt(0); + return; + } + + buf.putInt(results.length); + int bufLen = 0; + for (Result result : results) { + bufLen += Bytes.SIZEOF_INT; + if (result == null || result.isEmpty()) { + continue; + } + for (KeyValue key : result.raw()) { + bufLen += key.getLength() + Bytes.SIZEOF_INT; + } + } + buf.putInt(bufLen); + for (Result result : results) { + if (result == null || result.isEmpty()) { + buf.putInt(0); + continue; + } + buf.putInt(result.size()); + for (KeyValue kv : result.raw()) { + buf.putInt(kv.getLength()); + buf.put(kv.getBuffer(), kv.getOffset(), kv.getLength()); + } + } + } + + public static void writeArray(final DataOutput out, Result [] results) throws IOException { // Write version when writing array form. diff --git a/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index b9d880b..6d2dc3a 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.io; import java.io.DataInput; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.lang.reflect.Array; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -70,6 +72,7 @@ import org.apache.hadoop.hbase.filter.SkipFilter; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; +import org.apache.hadoop.hbase.ipc.ByteBufferOutputStream; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; @@ -95,7 +98,7 @@ import org.apache.hadoop.io.WritableFactories; * name and reflection to instantiate class was costing in excess of the cell * handling). */ -public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable { +public class HbaseObjectWritable implements Writable, WritableToByteBuffer, Configurable { protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class); // Here we maintain two static maps of classes to code and vice versa. @@ -248,6 +251,10 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur return "OW[class=" + declaredClass + ",value=" + instance + "]"; } + public void writeWithByteBuffer(ByteBuffer buf, + DataOutput out) throws IOException { + writeObject(buf, out, instance, declaredClass, conf); + } public void readFields(DataInput in) throws IOException { readObject(in, this, this.conf); @@ -327,10 +334,18 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur } if (declaredClass.equals(Result.class)) { Result r = (Result) instance; - return r.getWritableSize() + size; + return (2*size) + r.getWritableSize(); } return 0; // no hint is the default. } + + + public static void writeObject(DataOutput out, Object instance, + Class declaredClass, + Configuration conf) throws IOException { + writeObject(null, out, instance, declaredClass, conf); + } + /** * Write a {@link Writable}, {@link String}, primitive type, or an array of * the preceding. @@ -341,7 +356,8 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur * @throws IOException */ @SuppressWarnings("unchecked") - public static void writeObject(DataOutput out, Object instance, + public static void writeObject(ByteBuffer buf, + DataOutput out, Object instance, Class declaredClass, Configuration conf) throws IOException { @@ -360,12 +376,16 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur if (declClass.equals(byte [].class)) { Bytes.writeByteArray(out, (byte [])instanceObj); } else if(declClass.equals(Result [].class)) { - Result.writeArray(out, (Result [])instanceObj); + if (buf != null) { + Result.writeArray(buf, (Result [])instanceObj); + } else { + Result.writeArray(out, (Result [])instanceObj); + } } else { int length = Array.getLength(instanceObj); out.writeInt(length); for (int i = 0; i < length; i++) { - writeObject(out, Array.get(instanceObj, i), + writeObject(buf, out, Array.get(instanceObj, i), declClass.getComponentType(), conf); } } @@ -374,7 +394,7 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur int length = list.size(); out.writeInt(length); for (int i = 0; i < length; i++) { - writeObject(out, list.get(i), + writeObject(buf, out, list.get(i), list.get(i).getClass(), conf); } } else if (declClass == String.class) { // String @@ -411,7 +431,11 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur } else { writeClassCode(out, c); } - ((Writable)instanceObj).write(out); + if (buf != null && instanceObj instanceof WritableToByteBuffer) { + ((WritableToByteBuffer)instanceObj).writeWithByteBuffer(buf, out); + } else { + ((Writable)instanceObj).write(out); + } } else { throw new IOException("Can't write: "+instanceObj+" as "+declClass); } diff --git a/src/main/java/org/apache/hadoop/hbase/io/WritableToByteBuffer.java b/src/main/java/org/apache/hadoop/hbase/io/WritableToByteBuffer.java new file mode 100644 index 0000000..9595f09 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/io/WritableToByteBuffer.java @@ -0,0 +1,44 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A Writable which supports _writing_ to a byte buffer. Extends WritableWithSize because + * the caller code needs a hint of how big to size your ByteBuffer. + */ +public interface WritableToByteBuffer extends WritableWithSize { + + /** + * Write with either a byte buffer, or a output stream depending on which you'd rather prefer. + * The ByteBuffer is more performant. + * + * @param buf + * @param out + * @throws IOException + */ + public void writeWithByteBuffer(ByteBuffer buf, + DataOutput out) throws IOException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java index d7e4b9c..786e4c0 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.ipc; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; @@ -30,14 +32,11 @@ import java.nio.ByteBuffer; * Not thread safe! */ public class ByteBufferOutputStream extends OutputStream { + private static final Log LOG = LogFactory.getLog(ByteBufferOutputStream.class); protected ByteBuffer buf; - ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) { - if (useDirectByteBuffer) { - buf = ByteBuffer.allocateDirect(capacity); - } else { - buf = ByteBuffer.allocate(capacity); - } + public ByteBufferOutputStream(ByteBuffer buf) { + this.buf = buf; } /** @@ -50,7 +49,9 @@ public class ByteBufferOutputStream extends OutputStream { } private void checkSizeAndGrow(int extra) { - if ( (buf.position() + extra) >= buf.limit()) { + if ( (buf.position() + extra) > buf.limit()) { + //LOG.info("XXX Expanding buffer size: " + buf.capacity() + " to 2x that="+buf.capacity()*2 + // + " due to extra size: " + extra); ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() * 2); buf.flip(); newBuf.put(buf); diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java index e23a629..b4942f7 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.io.WritableToByteBuffer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.VersionedProtocol; @@ -545,7 +546,7 @@ public class HBaseRPC { } @Override - public Writable call(Writable param, long receivedTime) throws IOException { + public WritableToByteBuffer call(Writable param, long receivedTime) throws IOException { try { Invocation call = (Invocation)param; if(call.getMethodName() == null) { diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 9cd1361..7a4c0d0 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -55,6 +55,7 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.WritableToByteBuffer; import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ObjectWritable; @@ -211,6 +212,7 @@ public abstract class HBaseServer { protected int id; // the client's call id protected Writable param; // the parameter passed protected Connection connection; // connection to client + protected long nanoTS = 0; protected long timestamp; // the time received when response is null // the time served when response is not null protected ByteBuffer response; // the response for this call @@ -704,6 +706,8 @@ public abstract class HBaseServer { // Extract the first call // call = responseQueue.removeFirst(); + if (call.nanoTS == 0) call.nanoTS = System.nanoTime(); + SocketChannel channel = call.connection.channel; if (LOG.isDebugEnabled()) { LOG.debug(getName() + ": responding to #" + call.id + " from " + @@ -728,6 +732,8 @@ public abstract class HBaseServer { LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection + " Wrote " + numBytes + " bytes."); } + long diff = System.nanoTime() - call.nanoTS; + LOG.warn("Finished response in nanotime: " + diff); } else { // // If we were unable to write the entire response out, then @@ -738,6 +744,7 @@ public abstract class HBaseServer { if (inHandler) { // set the serve time when the response has to be sent later call.timestamp = System.currentTimeMillis(); + call.nanoTS = System.nanoTime(); incPending(); try { @@ -1018,7 +1025,7 @@ public abstract class HBaseServer { String errorClass = null; String error = null; - Writable value = null; + WritableToByteBuffer value = null; CurCall.set(call); UserGroupInformation previous = UserGroupInformation.getCurrentUGI(); @@ -1034,29 +1041,33 @@ public abstract class HBaseServer { CurCall.set(null); int size = BUFFER_INITIAL_SIZE; - boolean useDirectBB = false; if (value instanceof WritableWithSize) { - // get the size hint. WritableWithSize ohint = (WritableWithSize)value; int hint = ohint.getWritableSize(); if (hint > 0) { size = hint + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; - useDirectBB = true; // exact match use directBB + //LOG.info("XXX Using hinted buffer size: " + size); } } - // TODO switch false => useDirectBB - ByteBufferOutputStream buf = new ByteBufferOutputStream(size, false); - DataOutputStream out = new DataOutputStream(buf); - out.writeInt(call.id); // write call id - out.writeBoolean(error != null); // write error flag + long start = System.nanoTime(); + ByteBuffer buf = ByteBuffer.allocate(size); + buf.putInt(call.id); + buf.put( (byte) ((error != null) ? 1 : 0) ) ; + + ByteBufferOutputStream b = new ByteBufferOutputStream(buf); + DataOutputStream out = new DataOutputStream(b); if (error == null) { - value.write(out); + + value.writeWithByteBuffer(buf, out); } else { WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, error); } - call.setResponse(buf.getByteBuffer()); + buf.flip(); + long len = System.nanoTime() - start; + LOG.info("Time to serialize result: " + len); + call.setResponse(buf); responder.doRespond(call); } catch (InterruptedException e) { if (running) { // unexpected -- log it @@ -1236,8 +1247,8 @@ public abstract class HBaseServer { * @return Writable * @throws IOException e */ - public abstract Writable call(Writable param, long receiveTime) - throws IOException; + public abstract WritableToByteBuffer call(Writable param, long receiveTime) + throws IOException; /** * The number of open RPC conections diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java index 32ccbea..8fd7e56 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java @@ -195,7 +195,7 @@ public abstract class HBaseTestCase extends TestCase { HTableDescriptor htd = new HTableDescriptor(name); htd.addFamily(new HColumnDescriptor(fam1, versions, HColumnDescriptor.DEFAULT_COMPRESSION, false, false, - Integer.MAX_VALUE, HConstants.FOREVER, + Integer.MAX_VALUE, HConstants.FOREVER, HColumnDescriptor.DEFAULT_BLOOMFILTER, HConstants.REPLICATION_SCOPE_LOCAL)); htd.addFamily(new HColumnDescriptor(fam2, versions,