diff --git a/src/main/java/org/apache/hadoop/hbase/client/Result.java b/src/main/java/org/apache/hadoop/hbase/client/Result.java index 4f09cdd..b0b3545 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -24,6 +24,7 @@ import com.google.common.collect.Ordering; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.SplitKeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.WritableWithSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; @@ -65,7 +66,7 @@ import java.util.TreeMap; * through {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()}, * {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}. */ -public class Result implements Writable { +public class Result implements Writable, WritableWithSize { private static final byte RESULT_VERSION = (byte)1; private KeyValue [] kvs = null; @@ -523,6 +524,22 @@ public class Result implements Writable { this.kvs = kvs.toArray(new KeyValue[kvs.size()]); } + public int getWritableSize() { + if (isEmpty()) + return Bytes.SIZEOF_INT; // int size + + int size = kvs.length; + size *= Bytes.SIZEOF_INT; // 1 int for each kv. 
+ + size += Bytes.SIZEOF_INT; + + for (KeyValue kv : kvs) { + size += kv.getLength() + Bytes.SIZEOF_INT; + } + + return size; + } + public void write(final DataOutput out) throws IOException { if(isEmpty()) { @@ -540,6 +557,29 @@ public class Result implements Writable { } } + public static int getWriteArraySize(Result [] results) { + int size = Bytes.SIZEOF_BYTE; // RESULT_VERSION + if (results == null || results.length == 0) { + size += Bytes.SIZEOF_INT; + return size; + } + + size += Bytes.SIZEOF_INT; // results.length + size += Bytes.SIZEOF_INT; // bufLen + for (Result result : results) { + size += Bytes.SIZEOF_INT; // either 0 or result.size() + if (result == null || result.isEmpty()) + continue; + + for (KeyValue kv : result.raw()) { + size += Bytes.SIZEOF_INT; // kv.getLength(); + size += kv.getLength(); + } + } + + return size; + } + public static void writeArray(final DataOutput out, Result [] results) throws IOException { // Write version when writing array form. diff --git a/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index 30dd877..6d0ef6c 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -96,7 +96,7 @@ import org.apache.hadoop.io.WritableFactories; * name and reflection to instantiate class was costing in excess of the cell * handling). */ -public class HbaseObjectWritable implements Writable, Configurable { +public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable { protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class); // Here we maintain two static maps of classes to code and vice versa. 
@@ -260,6 +260,10 @@ public class HbaseObjectWritable implements Writable, Configurable { writeObject(out, instance, declaredClass, conf); } + public int getWritableSize() { + return getWritableSize(instance, declaredClass, conf); + } + private static class NullInstance extends Configured implements Writable { Class declaredClass; /** default constructor for writable */ @@ -314,6 +318,22 @@ public class HbaseObjectWritable implements Writable, Configurable { out.writeByte(code); } + + public static int getWritableSize(Object instance, Class declaredClass, + Configuration conf) { + int size = Bytes.SIZEOF_BYTE; // code + if (declaredClass.isArray()) { + if (declaredClass.equals(Result[].class)) { + + return size + Result.getWriteArraySize((Result[])instance); + } + } + if (declaredClass.equals(Result.class)) { + Result r = (Result) instance; + return r.getWritableSize() + size; + } + return 0; // no hint is the default. + } /** * Write a {@link Writable}, {@link String}, primitive type, or an array of * the preceding. diff --git a/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java b/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java new file mode 100644 index 0000000..68ea178 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java @@ -0,0 +1,36 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * An optional mix-in interface for writables that can predict their own
 * serialized size, letting callers pre-size output buffers instead of
 * growing them while writing.
 */
public interface WritableWithSize {
  /**
   * Provides a size hint to the caller: write() should ideally not produce
   * more bytes than this if at all possible.
   *
   * A return value of 0 means "no hint available".
   *
   * NOTE(review): the hint is an {@code int}; a payload over 2GB would
   * overflow it — confirm whether a {@code long} return is needed before
   * relying on this for very large writables.
   *
   * @return the expected serialized size in bytes, or 0 for no hint
   */
  int getWritableSize();
}
/**
 * An {@link OutputStream} backed by a growable {@link ByteBuffer}.
 *
 * Not thread safe!
 */
public class ByteBufferOutputStream extends OutputStream {

  protected ByteBuffer buf;

  /**
   * Creates a stream backed by a heap buffer of the given initial capacity.
   *
   * @param capacity initial capacity in bytes
   */
  public ByteBufferOutputStream(int capacity) {
    this(capacity, false);
  }

  /**
   * @param capacity initial capacity in bytes
   * @param useDirectByteBuffer true to allocate the initial buffer off-heap
   *        (note: a buffer re-allocated by growth is always heap-allocated)
   */
  public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
    buf = useDirectByteBuffer
        ? ByteBuffer.allocateDirect(capacity)
        : ByteBuffer.allocate(capacity);
  }

  /**
   * Flips the underlying buffer for reading, so call this only once all
   * writes are done — the stream must not be written to afterwards.
   *
   * @return the buffer, positioned at 0 with its limit at the bytes written
   */
  public ByteBuffer getByteBuffer() {
    buf.flip();
    return buf;
  }

  /**
   * Ensures at least {@code extra} more bytes fit, re-allocating if needed.
   *
   * Fixed: grows to max(2 * capacity, position + extra) instead of a single
   * doubling — doubling alone overflows the new buffer whenever one write
   * is larger than the current capacity (e.g. a 100-byte write into a
   * 4-byte buffer). Also uses a strict {@code >} check: filling the buffer
   * exactly to its limit is legal and needs no growth.
   */
  private void checkSizeAndGrow(int extra) {
    if ((buf.position() + extra) > buf.limit()) {
      int newCapacity = Math.max(buf.capacity() * 2, buf.position() + extra);
      ByteBuffer newBuf = ByteBuffer.allocate(newCapacity);
      buf.flip();
      newBuf.put(buf);
      buf = newBuf;
    }
  }

  // OutputStream

  @Override
  public void write(int b) throws IOException {
    checkSizeAndGrow(1); // one byte
    buf.put((byte) b);
  }

  @Override
  public void write(byte[] b) throws IOException {
    checkSizeAndGrow(b.length);
    buf.put(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    checkSizeAndGrow(len);
    buf.put(b, off, len);
  }

  @Override
  public void flush() throws IOException {
    // nothing buffered outside the ByteBuffer itself
  }

  @Override
  public void close() throws IOException {
    // nothing to release
  }
}
heh + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index e4c356d..a6584a4 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -55,6 +55,8 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -988,6 +990,8 @@ public abstract class HBaseServer { /** Handles queued calls . */ private class Handler extends Thread { private final BlockingQueue myCallQueue; + static final int BUFFER_INITIAL_SIZE = 1024; + public Handler(final BlockingQueue cq, int instanceNumber) { this.myCallQueue = cq; this.setDaemon(true); @@ -1004,8 +1008,6 @@ public abstract class HBaseServer { public void run() { LOG.info(getName() + ": starting"); SERVER.set(HBaseServer.this); - final int buffersize = 16 * 1024; - ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize); while (running) { try { Call call = myCallQueue.take(); // pop the queue; maybe blocked here @@ -1031,14 +1033,16 @@ public abstract class HBaseServer { UserGroupInformation.setCurrentUser(previous); CurCall.set(null); - if (buf.size() > buffersize) { - // Allocate a new BAOS as reset only moves size back to zero but - // keeps the buffer of whatever the largest write was -- see - // hbase-900. - buf = new ByteArrayOutputStream(buffersize); - } else { - buf.reset(); + int size = BUFFER_INITIAL_SIZE; + if (value instanceof WritableWithSize) { + // get the size hint. 
+ WritableWithSize ohint = (WritableWithSize)value; + int hint = ohint.getWritableSize(); + if (hint > 0) { + size = hint + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT; + } } + ByteBufferOutputStream buf = new ByteBufferOutputStream(size); DataOutputStream out = new DataOutputStream(buf); out.writeInt(call.id); // write call id out.writeBoolean(error != null); // write error flag @@ -1049,7 +1053,7 @@ public abstract class HBaseServer { WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, error); } - call.setResponse(ByteBuffer.wrap(buf.toByteArray())); + call.setResponse(buf.getByteBuffer()); responder.doRespond(call); } catch (InterruptedException e) { if (running) { // unexpected -- log it