Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(revision 10029)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(working copy)
@@ -23,6 +23,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.HBaseByteArrayOutputStream;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -91,6 +92,23 @@
   protected static final ThreadLocal<HBaseServer> SERVER =
     new ThreadLocal<HBaseServer>();
 
+  /** Parameter name for max response buffer size to reuse (keep around). */
+  private static final String MAX_REUSE_BUFFER_SIZE =
+    "hbase.ipc.max.reuse.buffer.size";
+
+  /** Default value for above param */
+  private static final int DEFAULT_MAX_REUSE_BUFFER_SIZE = 1 * 1024 * 1024;
+
+  /** Parameter name for response size above which to log a warning */
+  private static final String WARN_RESPONSE_SIZE =
+    "hbase.ipc.warn.response.size";
+
+  /** Default value for above param */
+  private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+
+  private final int maxReuseBufferSize;
+  private final int warnResponseSize;
+
   /** Returns the server instance called under or null.  May be called under
    * {@link #call(Writable, long)} implementations, and under {@link Writable}
    * methods of paramters and return values.  Permits applications to access
@@ -899,7 +917,7 @@
       LOG.info(getName() + ": starting");
       SERVER.set(HBaseServer.this);
       final int buffersize = 16 * 1024;
-      ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
+      ByteArrayOutputStream buf = new HBaseByteArrayOutputStream(buffersize);
       while (running) {
         try {
           Call call = callQueue.take(); // pop the queue; maybe blocked here
@@ -925,14 +943,6 @@
             UserGroupInformation.setCurrentUser(previous);
             CurCall.set(null);
 
-          if (buf.size() > buffersize) {
-            // Allocate a new BAOS as reset only moves size back to zero but
-            // keeps the buffer of whatever the largest write was -- see
-            // hbase-900.
-            buf = new ByteArrayOutputStream(buffersize);
-          } else {
-            buf.reset();
-          }
           DataOutputStream out = new DataOutputStream(buf);
           out.writeInt(call.id);                // write call id
           out.writeBoolean(error != null);      // write error flag
@@ -943,8 +953,24 @@
             WritableUtils.writeString(out, errorClass);
             WritableUtils.writeString(out, error);
           }
+
           call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
           responder.doRespond(call);
+
+          // issue a warning for large responses...
+          if (buf.size() > warnResponseSize) {
+            LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
+              + StringUtils.humanReadableInt(buf.size()));
+          }
+
+          if (buf.size() > maxReuseBufferSize) {
+            // Allocate a new BAOS as reset only moves size back to zero but
+            // keeps the buffer of whatever the largest write was -- see
+            // hbase-900.
+            buf = new HBaseByteArrayOutputStream(buffersize);
+          } else {
+            buf.reset();
+          }
         } catch (InterruptedException e) {
           if (running) {                        // unexpected -- log it
             LOG.info(getName() + " caught: " +
@@ -997,6 +1023,10 @@
     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+    this.maxReuseBufferSize = conf.getInt(MAX_REUSE_BUFFER_SIZE,
+      DEFAULT_MAX_REUSE_BUFFER_SIZE);
+    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
+      DEFAULT_WARN_RESPONSE_SIZE);
 
     // Start the listener here and let it bind to the port
     listener = new Listener();
Index: src/main/java/org/apache/hadoop/hbase/util/HBaseByteArrayOutputStream.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/HBaseByteArrayOutputStream.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/util/HBaseByteArrayOutputStream.java	(revision 0)
@@ -0,0 +1,91 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+
+/**
+ * An extension of ByteArrayOutputStream to better handle buffers
+ * in the 1-2G range.
+ * The issue with ByteArrayOutputStream is that the logic to
+ * grow the buffer by doubling each time was doing << 1 without
+ * checking for int overflow.  So, if the buffer was say at 1.1g,
+ * the next shift would cause it to become negative.  At this point
+ * the implementation would fall into a pathological case of growing
+ * just the needed amount on each write, resulting in lots
+ * and lots of resizing/copies.
+ */
+public class HBaseByteArrayOutputStream extends ByteArrayOutputStream {
+
+  public HBaseByteArrayOutputStream(int size) {
+    super(size);
+  }
+
+  /**
+   * Writes the specified byte to this byte array output stream.
+   *
+   * @param b the byte to be written.
+   */
+  public synchronized void write(int b) {
+    // are we already at the limit?
+    if (count == Integer.MAX_VALUE) {
+      throw new IndexOutOfBoundsException();
+    }
+    int newcount = count + 1;
+    if (newcount > buf.length) {
+      int newSize = (int)Math.min((((long)buf.length) << 1),
+        (long)(Integer.MAX_VALUE));
+      buf = Arrays.copyOf(buf, Math.max(newSize, newcount));
+    }
+    buf[count] = (byte)b;
+    count = newcount;
+  }
+
+  /**
+   * Writes len bytes from the specified byte array
+   * starting at offset off to this byte array output stream.
+   *
+   * @param b the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   */
+  public synchronized void write(byte b[], int off, int len) {
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    long newcount = (long)count + (long)len;
+    if (newcount > Integer.MAX_VALUE) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (newcount > buf.length) {
+      int newSize = (int)Math.min((((long)buf.length) << 1),
+        (long)(Integer.MAX_VALUE));
+      buf = Arrays.copyOf(buf, Math.max(newSize, (int)newcount));
+    }
+    System.arraycopy(b, off, buf, count, len);
+    count = (int)newcount;
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/client/Result.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/Result.java	(revision 10029)
+++ src/main/java/org/apache/hadoop/hbase/client/Result.java	(working copy)
@@ -20,11 +20,14 @@
 
 package org.apache.hadoop.hbase.client;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -64,7 +67,7 @@
  */
 public class Result implements Writable {
   private static final byte RESULT_VERSION = (byte)1;
-
+  private static final Log LOG = LogFactory.getLog(Result.class);
   private KeyValue [] kvs = null;
   private NavigableMap<byte [],
      NavigableMap<byte [], NavigableMap<Long, byte []>>> familyMap = null;
@@ -432,11 +435,14 @@
     if(isEmpty()) {
       out.writeInt(0);
     } else {
-      int totalLen = 0;
+      long totalLen = 0;
       for(KeyValue kv : kvs) {
         totalLen += kv.getLength() + Bytes.SIZEOF_INT;
+        if (totalLen > Integer.MAX_VALUE) {
+          resultTooLarge(totalLen, kv);
+        }
       }
-      out.writeInt(totalLen);
+      out.writeInt((int)totalLen);
       for(KeyValue kv : kvs) {
         out.writeInt(kv.getLength());
         out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
@@ -444,6 +450,18 @@
     }
   }
 
+  // Complain that the response is too large, and raise an error.
+  // The passed in kv is one of the kvs in the result and is intended
+  // to aid debugging.
+  private static void resultTooLarge(long responseSize, KeyValue kv)
+  throws IOException {
+    String error = ("resultTooLarge: " +
+      StringUtils.humanReadableInt(responseSize) +
+      "; one KV in result: " + kv);
+    LOG.error(error);
+    throw new IOException(error);
+  }
+
   public static void writeArray(final DataOutput out, Result [] results)
   throws IOException {
     // Write version when writing array form.
@@ -456,7 +474,7 @@
       return;
     }
     out.writeInt(results.length);
-    int bufLen = 0;
+    long bufLen = 0;
     for(Result result : results) {
       bufLen += Bytes.SIZEOF_INT;
       if(result == null || result.isEmpty()) {
@@ -464,9 +482,13 @@
       }
       for(KeyValue key : result.raw()) {
         bufLen += key.getLength() + Bytes.SIZEOF_INT;
+        if (bufLen > Integer.MAX_VALUE) {
+          resultTooLarge(bufLen, key);
+        }
      }
     }
-    out.writeInt(bufLen);
+
+    out.writeInt((int)bufLen);
     for(Result result : results) {
       if(result == null || result.isEmpty()) {
         out.writeInt(0);
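
Reviewer note (not part of the patch): the growth clamp in
HBaseByteArrayOutputStream can be sanity-checked in isolation. Below is a
minimal standalone sketch of the failure mode and the fix, assuming the stock
JDK growth expression Math.max(buf.length << 1, newcount); the GrowthDemo
class and its method names are hypothetical, for illustration only.

public class GrowthDemo {

  // Stock doubling: buf.length << 1 wraps negative once length exceeds 1G,
  // so Math.max falls back to the bare minimum capacity and every subsequent
  // write triggers another resize/copy.
  static int stockGrow(int capacity, int minCapacity) {
    return Math.max(capacity << 1, minCapacity);
  }

  // Patched growth: shift in long arithmetic, clamp at Integer.MAX_VALUE,
  // as in HBaseByteArrayOutputStream above.
  static int clampedGrow(int capacity, int minCapacity) {
    int newSize = (int)Math.min((((long)capacity) << 1),
      (long)(Integer.MAX_VALUE));
    return Math.max(newSize, minCapacity);
  }

  public static void main(String[] args) {
    int capacity = 1100000000;   // ~1.1G, the case named in the class javadoc
    int needed = capacity + 1;
    // 1100000001: the shift went negative, so the buffer grows by one byte
    System.out.println(stockGrow(capacity, needed));
    // 2147483647: doubling is clamped to the int limit instead
    System.out.println(clampedGrow(capacity, needed));
  }
}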
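
Reviewer note (not part of the patch): the two new knobs are read with plain
Configuration.getInt in the HBaseServer constructor, so they can be tuned like
any other setting. A sketch of overriding them in code; the class name and the
chosen values are illustrative only, not recommendations:

import org.apache.hadoop.conf.Configuration;

public class IpcBufferConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keep response buffers up to 4M around for reuse (patch default: 1M).
    conf.setInt("hbase.ipc.max.reuse.buffer.size", 4 * 1024 * 1024);
    // Warn once a single response exceeds 256M (patch default: 100M).
    conf.setInt("hbase.ipc.warn.response.size", 256 * 1024 * 1024);
    System.out.println(conf.getInt("hbase.ipc.warn.response.size", -1));
  }
}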