diff --git a/pom.xml b/pom.xml
index 67729d6..a21dcf8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase</artifactId>
   <packaging>jar</packaging>
-  <version>0.90.0-ryanp3</version>
+  <version>0.90.0-ryanq2</version>
   <name>HBase</name>
   <description>
     HBase is the &lt;a href="http://hadoop.apache.org"&gt;Hadoop&lt;/a&gt; database. Use it when you need
diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 4e0e21a..a80eed3 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.ipc.NanoProfiler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
@@ -1022,6 +1023,7 @@ public class HTable implements HTableInterface {
       // Close the previous scanner if it's open
       if (this.callable != null) {
         this.callable.setClose();
+        NanoProfiler.split("callable_close");
         getConnection().getRegionServerWithRetries(callable);
         this.callable = null;
       }
@@ -1055,6 +1057,7 @@ public class HTable implements HTableInterface {
           Bytes.toStringBinary(localStartKey) + "'");
       }
       try {
+        NanoProfiler.split("newScanner_callable");
         callable = getScannerCallable(localStartKey, nbRows);
         // Open a scanner on the region server starting at the
         // beginning of the region
@@ -1097,10 +1100,12 @@ public class HTable implements HTableInterface {
           // Server returns a null values if scanning is to stop.  Else,
           // returns an empty array if scanning is to go on and we've just
           // exhausted current region.
+          NanoProfiler.split("callable");
           values = getConnection().getRegionServerWithRetries(callable);
           if (skipFirst) {
             skipFirst = false;
             // Reget.
+ NanoProfiler.split("callable2"); values = getConnection().getRegionServerWithRetries(callable); } } catch (DoNotRetryIOException e) { @@ -1136,6 +1141,7 @@ public class HTable implements HTableInterface { continue; } lastNext = System.currentTimeMillis(); + NanoProfiler.split("cache_add"); if (values != null && values.length > 0) { for (Result rs : values) { cache.add(rs); @@ -1147,10 +1153,13 @@ public class HTable implements HTableInterface { } } // Values == null means server-side filter has determined we must STOP - } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null)); + } while (remainingResultSize > 0 && + countdown > 0 && + nextScanner(countdown, values == null)); } if (cache.size() > 0) { + NanoProfiler.split("cache.poll"); return cache.poll(); } return null; diff --git a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 5ea38b4..8a92a10 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.ipc.NanoProfiler; import org.apache.hadoop.ipc.RemoteException; @@ -70,12 +71,15 @@ public class ScannerCallable extends ServerCallable { */ public Result [] call() throws IOException { if (scannerId != -1L && closed) { + NanoProfiler.split("close"); close(); } else if (scannerId == -1L && !closed) { + NanoProfiler.split("openScanner"); this.scannerId = openScanner(); } else { Result [] rrs = null; try { + NanoProfiler.split("next"); // calling into the proxy. rrs = server.next(scannerId, caching); } catch (IOException e) { IOException ioe = null; @@ -103,6 +107,7 @@ public class ScannerCallable extends ServerCallable { return; } try { + NanoProfiler.split("server.close"); this.server.close(this.scannerId); } catch (IOException e) { LOG.warn("Ignore, probably already closed", e); @@ -111,6 +116,7 @@ public class ScannerCallable extends ServerCallable { } protected long openScanner() throws IOException { + NanoProfiler.split("server.openScanner"); return this.server.openScanner(this.location.getRegionInfo().getRegionName(), this.scan); } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferInputStream.java b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferInputStream.java new file mode 100644 index 0000000..2d68744 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferInputStream.java @@ -0,0 +1,66 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An InputStream over a ByteBuffer; each read advances the buffer's position.
+ */
+public class ByteBufferInputStream extends InputStream {
+
+  protected ByteBuffer buf;
+
+  public ByteBufferInputStream(ByteBuffer b) {
+    buf = b;
+    b.rewind();
+  }
+
+  private boolean isEof() {
+    return !buf.hasRemaining();
+  }
+
+  public ByteBuffer getByteBuffer() {
+    return buf;
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (isEof()) return -1;
+    return (int) (buf.get() & 0xFF);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (isEof()) return -1;
+    // never read more than the buffer has remaining.
+    int amt = buf.remaining();
+    if (len > amt) {
+      buf.get(b, off, amt);
+      return amt;
+    }
+    buf.get(b, off, len);
+    return len;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return buf.remaining();
+  }
+}
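The stream above exists so the client can hand a raw response ByteBuffer to the existing Writable deserialization code, which expects an InputStream (see HBaseClient.deserializeResult below). A minimal standalone sketch of that usage; the demo class and the flag/payload values are invented for illustration:

    import java.io.DataInputStream;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.ipc.ByteBufferInputStream;

    public class ByteBufferInputStreamDemo {
      public static void main(String[] args) throws Exception {
        // Build a buffer shaped like a response body: error flag, then payload.
        ByteBuffer buf = ByteBuffer.allocate(9);
        buf.put((byte) 0);   // "no error" flag
        buf.putLong(42L);    // stand-in payload
        buf.flip();

        // Wrap it; the constructor rewinds, and each read advances the position.
        DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf));
        boolean isError = in.readBoolean();
        long payload = in.readLong();
        System.out.println("isError=" + isError + " payload=" + payload);
      }
    }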
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
index 10d38de..d248f32 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
@@ -47,6 +47,8 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map.Entry;
@@ -139,8 +141,8 @@
   private class Call {
     final int id;                           // call id
     final Writable param;                   // parameter
-    Writable value;                         // value, null if error
-    IOException error;                      // exception, null if value
+    ByteBuffer response = null;             // the raw response buffer
+    IOException exception = null;           // exception, null if none
     boolean done;                           // true when call is done

     protected Call(Writable param) {
@@ -157,25 +159,19 @@
       notify();                             // notify caller
     }

-    /** Set the exception when there is an error.
-     * Notify the caller the call is done.
-     *
-     * @param error exception thrown by the call; either local or remote
+    /**
+     * Set the response buffer and notify the caller that the call is done.
      */
-    public synchronized void setException(IOException error) {
-      this.error = error;
+    public synchronized void setResponse(ByteBuffer response) {
+      this.response = response;
       callComplete();
     }

-    /** Set the return value when there is no error.
-     * Notify the caller the call is done.
-     *
-     * @param value return value of the call.
-     */
-    public synchronized void setValue(Writable value) {
-      this.value = value;
+    public synchronized void setException(IOException ex) {
+      this.exception = ex;
       callComplete();
     }
+
   }

   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -307,6 +303,8 @@ public class HBaseClient {
       this.socket = socketFactory.createSocket();
       this.socket.setTcpNoDelay(tcpNoDelay);
       this.socket.setKeepAlive(tcpKeepAlive);
+      this.socket.setReceiveBufferSize(256*1024);
+      this.socket.setSendBufferSize(256*1024);
       // connection time out is 20s
       NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
       if (remoteId.rpcTimeout > 0) {
@@ -320,6 +318,7 @@
         handleConnectionFailure(ioFailures++, maxRetries, ie);
       }
     }
+    SocketChannel channel = this.socket.getChannel();
     this.in = new DataInputStream(new BufferedInputStream
         (new PingInputStream(NetUtils.getInputStream(socket))));
     this.out = new DataOutputStream
@@ -481,8 +480,11 @@
       DataOutputBuffer d=null;
       try {
+        NanoProfiler.split(call.id, "sendParam_sync");
         //noinspection SynchronizeOnNonFinalField
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
+          NanoProfiler.split(call.id, "serAndSendParam");
+
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
@@ -497,6 +499,9 @@
           // fill in the placeholder
           Bytes.putInt(data, 0, dataLength - 4);
           out.write(data, 0, dataLength);
+
+          NanoProfiler.split(call.id, "flush");
+
           out.flush();
         }
       } catch(IOException e) {
@@ -518,25 +523,29 @@
       touch();

       try {
+        int size = in.readInt();            // size includes id
         int id = in.readInt();              // try to read an id
+        NanoProfiler.split(id, "receiveResponse");

         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + id);

         Call call = calls.get(id);

-        boolean isError = in.readBoolean(); // read if error
-        if (isError) {
-          //noinspection ThrowableInstanceNeverThrown
-          call.setException(new RemoteException( WritableUtils.readString(in),
-              WritableUtils.readString(in)));
-          calls.remove(id);
-        } else {
-          Writable value = ReflectionUtils.newInstance(valueClass, conf);
-          value.readFields(in);             // read value
-          call.setValue(value);
-          calls.remove(id);
-        }
+        size -= 4;                          // 4 bytes off for the id, since we already read it.
+
+        ByteBuffer buf = ByteBuffer.allocate(size);
+        IOUtils.readFully(in, buf.array(), buf.arrayOffset(), size);
+        // TODO: read via socket.getChannel() instead of copying through the stream.
+
+        // since we read straight into the backing array, fix up the buffer state.
+        buf.limit(size);
+        buf.rewind();
+        NanoProfiler.split(id, "setResponse", "Data size: " + size);
+        call.setResponse(buf);
+        calls.remove(id);
+
       } catch (IOException e) {
         markClosed(e);
       }
@@ -623,21 +632,21 @@

   /** Result collector for parallel calls. */
   private static class ParallelResults {
-    protected final Writable[] values;
+    protected final ByteBuffer[] responses;
     protected int size;
     protected int count;

     public ParallelResults(int size) {
-      this.values = new Writable[size];
+      this.responses = new ByteBuffer[size];
       this.size = size;
     }

     /*
      * Collect a result.
      */
-    synchronized void callComplete(ParallelCall call) {
+    protected synchronized void callComplete(ParallelCall call) { // FindBugs IS2_INCONSISTENT_SYNC
-      values[call.index] = call.value;      // store the value
+      responses[call.index] = call.response;
       count++;                              // count it
       if (count == size)                    // if all values are in
         notify();                           // then notify waiting caller
@@ -729,13 +738,17 @@
                        UserGroupInformation ticket, int rpcTimeout)
       throws IOException {
     Call call = new Call(param);
+    NanoProfiler.setCallId(call.id);  // tie the threadId to the call id
     Connection connection = getConnection(addr, ticket, rpcTimeout, call);
     connection.sendParam(call);              // send the parameter
     boolean interrupted = false;
+
+    NanoProfiler.split("call_sync");
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
     synchronized (call) {
       while (!call.done) {
         try {
+          NanoProfiler.split("call_wait");
           call.wait();                       // wait for the result
         } catch (InterruptedException ignored) {
           // save the fact that we were interrupted
@@ -748,18 +761,34 @@
         Thread.currentThread().interrupt();
       }

-      if (call.error != null) {
-        if (call.error instanceof RemoteException) {
-          call.error.fillInStackTrace();
-          throw call.error;
-        }
-        // local exception
-        throw wrapException(addr, call.error);
+      NanoProfiler.split("call_deser");
+      // ok, we need to deserialize the response now.
+      if (call.exception != null) {
+        // local exception.
+        throw wrapException(addr, call.exception);
       }
-      return call.value;
+
+      Object resultOrException = deserializeResult(call.response);
+      if (resultOrException instanceof IOException)
+        throw (IOException) resultOrException;
+      return (Writable) resultOrException;
     }
   }

+  protected Object deserializeResult(ByteBuffer res) throws IOException {
+    ByteBufferInputStream bbis = new ByteBufferInputStream(res);
+    DataInputStream in = new DataInputStream(bbis);
+    boolean isError = in.readBoolean();
+    if (isError) {
+      return new RemoteException(WritableUtils.readString(in),
+          WritableUtils.readString(in));
+    }
+    // not an error; deserialize the value.
+    Writable value = ReflectionUtils.newInstance(valueClass, conf);
+    value.readFields(in);
+    return value;
+  }
+
   /**
    * Take an IOException and the address we were trying to connect to
    * and return an IOException with the input exception as the cause.
@@ -806,7 +835,6 @@ public class HBaseClient {
     if (addresses.length == 0) return new Writable[0];

     ParallelResults results = new ParallelResults(params.length);
-    // TODO this synchronization block doesnt make any sense, we should possibly fix it
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
     synchronized (results) {
       for (int i = 0; i < params.length; i++) {
@@ -821,13 +849,36 @@
           results.size--;                    // wait for one fewer result
         }
       }
+      boolean interrupted = false;
       while (results.count != results.size) {
         try {
           results.wait();                    // wait for all results
-        } catch (InterruptedException ignored) {}
+        } catch (InterruptedException ignored) {
+          interrupted = true;
+        }
       }
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+
+      int i = 0;
+      Writable[] res = new Writable[params.length];
+      for (ByteBuffer bb : results.responses) {
+        Object aRes;
+        try {
+          aRes = deserializeResult(bb);
+        } catch (IOException ioe) {
+          res[i++] = null;
+          continue;
+        }
+        if (aRes instanceof Writable) {
+          res[i++] = (Writable) aRes;
+        } else {
+          // a RemoteException; there is no way to rethrow it per-call here, so drop it.
+          res[i++] = null;
+        }
       }
-      return results.values;
+      return res;
     }
   }
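With the client changes above, the receive path reduces to: read a 4-byte frame size (which, per the comments in receiveResponse, counts the call id), read the 4-byte call id, then buffer the remaining size - 4 bytes and deserialize them later on the calling thread via deserializeResult. A self-contained round-trip sketch of that framing; the class name and the sample id/payload are invented:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class FramingDemo {
      public static void main(String[] args) throws IOException {
        // Server side: frame a response as [size][id][error flag][payload].
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        byte[] payload = "hello".getBytes("UTF-8");
        out.writeInt(4 + 1 + payload.length);  // size counted from the id onward
        out.writeInt(7);                       // call id
        out.writeBoolean(false);               // error flag
        out.write(payload);
        out.flush();

        // Client side: read size and id eagerly, buffer the rest for the caller.
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray()));
        int size = in.readInt();               // includes the 4-byte id
        int id = in.readInt();
        size -= 4;                             // the id has already been consumed
        ByteBuffer buf = ByteBuffer.allocate(size);
        in.readFully(buf.array(), buf.arrayOffset(), size);
        System.out.println("call #" + id + ": buffered " + size + " bytes");
      }
    }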
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
index 867a059..afa7d6f 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
@@ -197,6 +197,7 @@
   public static void bind(ServerSocket socket, InetSocketAddress address,
                           int backlog) throws IOException {
     try {
+      socket.setReceiveBufferSize(256*1024);
      socket.bind(address, backlog);
     } catch (BindException e) {
       BindException bindException =
@@ -486,6 +487,7 @@
       channel.configureBlocking(false);
       channel.socket().setTcpNoDelay(tcpNoDelay);
       channel.socket().setKeepAlive(tcpKeepAlive);
+      channel.socket().setSendBufferSize(256*1024);

       Reader reader = getReader();
       try {
@@ -1045,7 +1047,8 @@
       if (value instanceof WritableWithSize) {
         // get the size hint.
         WritableWithSize ohint = (WritableWithSize) value;
-        long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+        long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE +
+            (2 * Bytes.SIZEOF_INT);
         if (hint > 0) {
           if ((hint) > Integer.MAX_VALUE) {
             // oops, new problem.
@@ -1060,6 +1063,7 @@
       }
       ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
       DataOutputStream out = new DataOutputStream(buf);
+      out.writeInt(0xdeadbeef);               // placeholder for the size of the response.
       out.writeInt(call.id);                  // write call id
       out.writeBoolean(error != null);        // write error flag

@@ -1075,8 +1079,14 @@
             + StringUtils.humanReadableInt(buf.size()));
       }

+      ByteBuffer bb = buf.getByteBuffer();
+      // don't include the size field itself in the size.
+      int dataSize = bb.limit() - Bytes.SIZEOF_INT;
+      bb.rewind();
+      bb.putInt(dataSize);                    // overwrite the 0xdeadbeef placeholder.
+      bb.rewind();
-      call.setResponse(buf.getByteBuffer());
+      call.setResponse(bb);
       responder.doRespond(call);
     } catch (InterruptedException e) {
       if (running) {                          // unexpected -- log it
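The server half writes the same frame, but the total size is unknown until the response Writable has been serialized, hence the 0xdeadbeef placeholder that is patched once the buffer is complete. The same trick in isolation, as a sketch; it uses an absolute putInt(0, ...) where the patch above rewinds and does a relative put:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class SizeBackfillDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(0xdeadbeef);        // size placeholder, patched below
        out.writeInt(7);                 // call id
        out.writeBoolean(false);         // error flag
        out.writeUTF("response body");   // stand-in for the serialized Writable
        out.flush();

        ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
        int dataSize = bb.limit() - 4;   // don't count the size field itself
        bb.putInt(0, dataSize);          // overwrite the placeholder
        System.out.println("frame carries " + dataSize + " bytes after the size field");
      }
    }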
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java b/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
new file mode 100644
index 0000000..3aa55ad
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NanoProfiler {
+  private static final Log LOG = LogFactory.getLog(NanoProfiler.class);
+
+  static ThreadLocal<Integer> ids = new ThreadLocal<Integer>();
+
+  static Map<Integer, String> labels =
+      new ConcurrentHashMap<Integer, String>();
+  static Map<Integer, Integer> callIdToThreadId =
+      new ConcurrentHashMap<Integer, Integer>();
+
+  static Map<Integer, Long> starts =
+      new ConcurrentHashMap<Integer, Long>();
+  static Map<Integer, Long> splits =
+      new ConcurrentHashMap<Integer, Long>();
+
+  final static AtomicInteger cnt = new AtomicInteger();
+
+  private static void log(String s) {
+    if (!writing) {
+      LOG.info(s);
+      return;
+    }
+    writeQ.add(s);
+  }
+
+  static volatile boolean stop = false;
+  static volatile boolean writing = false;
+  static WriterThread writer;
+
+  static final LinkedBlockingDeque<String> writeQ
+      = new LinkedBlockingDeque<String>();    // unbounded.
+
+  static class WriterThread extends Thread {
+
+    public void run() {
+      FileWriter fr;
+
+      try {
+        fr = new FileWriter("prof.txt." + ManagementFactory.getRuntimeMXBean().getName());
+      } catch (IOException e) {
+        return;
+      }
+
+      writing = true;
+
+      while (!stop) {
+        String first;
+        try {
+          first = writeQ.take();
+        } catch (InterruptedException e) {
+          // we were probably asked to quit, so clean up.
+          break;
+        }
+
+        List<String> moreToWrite = new ArrayList<String>();
+        writeQ.drainTo(moreToWrite);
+        try {
+          fr.append(first);
+          fr.append("\n");
+          for (String s : moreToWrite) {
+            fr.append(s);
+            fr.append("\n");
+          }
+        } catch (IOException e) {
+          // got a write error; give up.
+          break;
+        }
+      }
+
+      try {
+        fr.close();
+      } catch (IOException ignored) {
+      }
+      writing = false;
+    }
+  }
+
+  static {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread() {
+          public void run() {
+            stop = true;
+            writer.interrupt(); // bam
+          }
+        }
+    );
+
+    // start the writer thread.
+    writer = new WriterThread();
+    writer.start();
+  }
+
+  public static int startCallIdTime(int callId, String op) {
+    int threadId = cnt.incrementAndGet();
+    labels.put(threadId, op);
+    long nt = System.nanoTime();
+    starts.put(threadId, nt);
+    splits.put(threadId, nt);
+    callIdToThreadId.put(callId, threadId);
+    return threadId;
+  }
+
+  public static int startThreadTimer(String op) {
+    int threadId = cnt.incrementAndGet();
+    ids.set(threadId);
+    labels.put(threadId, op);
+    long nt = System.nanoTime();
+    starts.put(threadId, nt);
+    splits.put(threadId, nt);
+    return threadId;
+  }
+
+  public static void setCallId(int callId) {
+    Integer threadId = ids.get();
+    if (threadId != null) {
+      callIdToThreadId.put(callId, threadId);
+    }
+  }
+
+  public static void split(int callId, String nextSplitName) {
+    split(callId, nextSplitName, "");
+  }
+
+  public static void split(int callId, String nextSplitName, String extra) {
+    Integer threadId = callIdToThreadId.get(callId);
+    if (threadId == null) {
+      // we aren't logging this call.
+      return;
+    }
+    split(nextSplitName, threadId, extra);
+  }
+
+  public static void split(String nextSplitName) {
+    Integer threadId = ids.get();
+    if (threadId == null) {
+      return; // not profiling.
+    }
+    split(nextSplitName, threadId, "");
+  }
+
+  protected static void split(String nextSplitName, int threadId, String extra) {
+    long start = starts.get(threadId);
+    long split = splits.get(threadId);
+    long nt = System.nanoTime();
+    splits.put(threadId, nt);                 // the new split.
+
+    String op = labels.get(threadId);
+    labels.put(threadId, nextSplitName);
+
+    log(threadId + " (" + op + ") split: " +
+        (nt - split) + " overall: " + (nt - start) + " " + extra);
+  }
+
+  /** Can only be called from the originating thread. */
+  public static void stop() {
+    int threadId = ids.get();
+    long start = starts.get(threadId);
+    long split = splits.get(threadId);
+    long nt = System.nanoTime();
+
+    String op = labels.get(threadId);
+    log(threadId + " (" + op + ") end split: " + (nt - split) +
+        " overall: " + (nt - start));
+
+    ids.remove();                             // nuke
+    starts.remove(threadId);
+    splits.remove(threadId);
+    labels.remove(threadId);
+
+    // TODO clean up callIdToThreadId
+  }
+}
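Finally, a minimal sketch of how the profiler above is meant to be driven: startThreadTimer labels the first phase, each split logs the time spent under the previous label and starts timing the next, and stop logs the final split and clears the thread's state. Lines go to prof.txt.<pid> once the writer thread is up, and to the commons-logging LOG before that. The phase labels here are invented, though they mirror the ones added in ScannerCallable:

    import org.apache.hadoop.hbase.ipc.NanoProfiler;

    public class NanoProfilerDemo {
      public static void main(String[] args) throws InterruptedException {
        NanoProfiler.startThreadTimer("scan");  // label the first phase
        Thread.sleep(5);
        NanoProfiler.split("openScanner");      // logs the time spent in "scan"
        Thread.sleep(10);
        NanoProfiler.split("next");             // logs the time spent in "openScanner"
        NanoProfiler.stop();                    // logs the "next" split and cleans up
      }
    }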