diff --git a/pom.xml b/pom.xml
index 67729d6..a21dcf8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<packaging>jar</packaging>
- <version>0.90.0-ryanp3</version>
+ <version>0.90.0-ryanq2</version>
<name>HBase</name>
HBase is the &lt;a href="http://hadoop.apache.org"&gt;Hadoop&lt;/a&gt; database. Use it when you need
diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 4e0e21a..a80eed3 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.ipc.NanoProfiler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
@@ -1022,6 +1023,7 @@ public class HTable implements HTableInterface {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
+ NanoProfiler.split("callable_close");
getConnection().getRegionServerWithRetries(callable);
this.callable = null;
}
@@ -1055,6 +1057,7 @@ public class HTable implements HTableInterface {
Bytes.toStringBinary(localStartKey) + "'");
}
try {
+ NanoProfiler.split("newScanner_callable");
callable = getScannerCallable(localStartKey, nbRows);
// Open a scanner on the region server starting at the
// beginning of the region
@@ -1097,10 +1100,12 @@ public class HTable implements HTableInterface {
// Server returns null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
+ NanoProfiler.split("callable");
values = getConnection().getRegionServerWithRetries(callable);
if (skipFirst) {
skipFirst = false;
// Reget.
+ NanoProfiler.split("callable2");
values = getConnection().getRegionServerWithRetries(callable);
}
} catch (DoNotRetryIOException e) {
@@ -1136,6 +1141,7 @@ public class HTable implements HTableInterface {
continue;
}
lastNext = System.currentTimeMillis();
+ NanoProfiler.split("cache_add");
if (values != null && values.length > 0) {
for (Result rs : values) {
cache.add(rs);
@@ -1147,10 +1153,13 @@ public class HTable implements HTableInterface {
}
}
// Values == null means server-side filter has determined we must STOP
- } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
+ } while (remainingResultSize > 0 &&
+ countdown > 0 &&
+ nextScanner(countdown, values == null));
}
if (cache.size() > 0) {
+ NanoProfiler.split("cache.poll");
return cache.poll();
}
return null;
diff --git a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 5ea38b4..8a92a10 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ipc.NanoProfiler;
import org.apache.hadoop.ipc.RemoteException;
@@ -70,12 +71,15 @@ public class ScannerCallable extends ServerCallable<Result[]> {
*/
public Result [] call() throws IOException {
if (scannerId != -1L && closed) {
+ NanoProfiler.split("close");
close();
} else if (scannerId == -1L && !closed) {
+ NanoProfiler.split("openScanner");
this.scannerId = openScanner();
} else {
Result [] rrs = null;
try {
+ NanoProfiler.split("next"); // calling into the proxy.
rrs = server.next(scannerId, caching);
} catch (IOException e) {
IOException ioe = null;
@@ -103,6 +107,7 @@ public class ScannerCallable extends ServerCallable {
return;
}
try {
+ NanoProfiler.split("server.close");
this.server.close(this.scannerId);
} catch (IOException e) {
LOG.warn("Ignore, probably already closed", e);
@@ -111,6 +116,7 @@ public class ScannerCallable extends ServerCallable {
}
protected long openScanner() throws IOException {
+ NanoProfiler.split("server.openScanner");
return this.server.openScanner(this.location.getRegionInfo().getRegionName(),
this.scan);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferInputStream.java b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferInputStream.java
new file mode 100644
index 0000000..2d68744
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferInputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
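+/**
+ * An InputStream view over a ByteBuffer, so stream-based deserialization
+ * (e.g. wrapping it in a DataInputStream) can run over a buffered RPC
+ * response without another copy. Note the constructor rewinds the buffer,
+ * so reads start from position zero.
+ */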
+public class ByteBufferInputStream extends InputStream {
+
+ protected ByteBuffer buf;
+
+ public ByteBufferInputStream(ByteBuffer b) {
+ buf = b;
+ b.rewind();
+ }
+
+ private boolean isEof() {
+ return !buf.hasRemaining();
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return buf;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (isEof()) return -1;
+ return (int) (buf.get() & 0xFF);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (isEof()) return -1;
+ int amt = buf.remaining();
+ if (len > amt) {
+ buf.get(b, off, amt);
+ return amt;
+ }
+ buf.get(b, off, len);
+ return len;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return buf.remaining();
+ }
+}
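For context, a minimal sketch of how the new stream is used; the demo class name is made up, but the pattern mirrors what deserializeResult() in HBaseClient does with a buffered response below:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.ipc.ByteBufferInputStream;

    public class ByteBufferInputStreamDemo {
      public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(42).putInt(7); // fill the buffer; position is now 8
        // the ByteBufferInputStream constructor rewinds, so reads start at 0.
        DataInputStream in = new DataInputStream(new ByteBufferInputStream(buf));
        System.out.println(in.readInt()); // prints 42
        System.out.println(in.readInt()); // prints 7
      }
    }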
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
index 10d38de..d248f32 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
@@ -47,6 +47,8 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -139,8 +141,8 @@ public class HBaseClient {
private class Call {
final int id; // call id
final Writable param; // parameter
- Writable value; // value, null if error
- IOException error; // exception, null if value
+ ByteBuffer response = null; // the raw response buffer
+ IOException exception = null; // exception, null unless the call failed
boolean done; // true when call is done
protected Call(Writable param) {
@@ -157,25 +159,19 @@ public class HBaseClient {
notify(); // notify caller
}
- /** Set the exception when there is an error.
- * Notify the caller the call is done.
- *
- * @param error exception thrown by the call; either local or remote
+ /**
+ * Set the response buffer.
*/
- public synchronized void setException(IOException error) {
- this.error = error;
+ public synchronized void setResponse(ByteBuffer response) {
+ this.response = response;
callComplete();
}
- /** Set the return value when there is no error.
- * Notify the caller the call is done.
- *
- * @param value return value of the call.
- */
- public synchronized void setValue(Writable value) {
- this.value = value;
+ public synchronized void setException(IOException ex) {
+ this.exception = ex;
callComplete();
}
+
}
/** Thread that reads responses and notifies callers. Each connection owns a
@@ -307,6 +303,8 @@ public class HBaseClient {
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(tcpKeepAlive);
+ this.socket.setReceiveBufferSize(256*1024);
+ this.socket.setSendBufferSize(256*1024);
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
if (remoteId.rpcTimeout > 0) {
@@ -320,6 +318,7 @@ public class HBaseClient {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(NetUtils.getInputStream(socket))));
this.out = new DataOutputStream
@@ -481,8 +480,11 @@ public class HBaseClient {
DataOutputBuffer d=null;
try {
+ NanoProfiler.split(call.id, "sendParam_sync");
//noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
+ NanoProfiler.split(call.id, "serAndSendParam");
+
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
@@ -497,6 +499,9 @@ public class HBaseClient {
// fill in the placeholder
Bytes.putInt(data, 0, dataLength - 4);
out.write(data, 0, dataLength);
+
+ NanoProfiler.split(call.id, "flush");
+
out.flush();
}
} catch(IOException e) {
@@ -518,25 +523,29 @@ public class HBaseClient {
touch();
try {
+ int size = in.readInt(); // size includes id
int id = in.readInt(); // try to read an id
+ NanoProfiler.split(id, "receiveResponse");
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
Call call = calls.get(id);
- boolean isError = in.readBoolean(); // read if error
- if (isError) {
- //noinspection ThrowableInstanceNeverThrown
- call.setException(new RemoteException( WritableUtils.readString(in),
- WritableUtils.readString(in)));
- calls.remove(id);
- } else {
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
- value.readFields(in); // read value
- call.setValue(value);
- calls.remove(id);
- }
+ size -= 4; // subtract the 4-byte id we already read.
+
+ ByteBuffer buf = ByteBuffer.allocate(size);
+
+ IOUtils.readFully(in, buf.array(), buf.arrayOffset(), size);
+
+ // we read straight into the backing array, so fix up the buffer state.
+ buf.limit(size);
+ buf.rewind();
+ NanoProfiler.split(id, "setResponse", "Data size: " + size);
+ call.setResponse(buf);
+ calls.remove(id);
+
} catch (IOException e) {
markClosed(e);
}
@@ -623,21 +632,21 @@ public class HBaseClient {
/** Result collector for parallel calls. */
private static class ParallelResults {
- protected final Writable[] values;
+ protected final ByteBuffer[] responses;
protected int size;
protected int count;
public ParallelResults(int size) {
- this.values = new Writable[size];
+ this.responses = new ByteBuffer[size];
this.size = size;
}
/*
* Collect a result.
*/
- synchronized void callComplete(ParallelCall call) {
+ protected synchronized void callComplete(ParallelCall call) {
// FindBugs IS2_INCONSISTENT_SYNC
- values[call.index] = call.value; // store the value
+ responses[call.index] = call.response;
count++; // count it
if (count == size) // if all values are in
notify(); // then notify waiting caller
@@ -729,13 +738,17 @@ public class HBaseClient {
UserGroupInformation ticket, int rpcTimeout)
throws IOException {
Call call = new Call(param);
+ NanoProfiler.setCallId(call.id); // tie the threadId with the call id
Connection connection = getConnection(addr, ticket, rpcTimeout, call);
connection.sendParam(call); // send the parameter
boolean interrupted = false;
+
+ NanoProfiler.split("call_sync");
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (call) {
while (!call.done) {
try {
+ NanoProfiler.split("call_wait");
call.wait(); // wait for the result
} catch (InterruptedException ignored) {
// save the fact that we were interrupted
@@ -748,18 +761,34 @@ public class HBaseClient {
Thread.currentThread().interrupt();
}
- if (call.error != null) {
- if (call.error instanceof RemoteException) {
- call.error.fillInStackTrace();
- throw call.error;
- }
- // local exception
- throw wrapException(addr, call.error);
+ NanoProfiler.split("call_deser");
+ // now deserialize the response.
+ if (call.exception != null) {
+ // local exception.
+ throw wrapException(addr, call.exception);
}
- return call.value;
+
+ Object resultOrException = deserializeResult(call.response);
+ if (resultOrException instanceof IOException)
+ throw (IOException) resultOrException;
+ return (Writable) resultOrException;
}
}
+ protected Object deserializeResult(ByteBuffer res) throws IOException {
+ ByteBufferInputStream bbis = new ByteBufferInputStream(res);
+ DataInputStream in = new DataInputStream(bbis);
+ boolean isError = in.readBoolean();
+ if (isError) {
+ return new RemoteException(WritableUtils.readString(in),
+ WritableUtils.readString(in));
+ }
+ // no error; deserialize the value.
+ Writable value = ReflectionUtils.newInstance(valueClass, conf);
+ value.readFields(in);
+ return value;
+ }
+
/**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
@@ -806,7 +835,6 @@ public class HBaseClient {
if (addresses.length == 0) return new Writable[0];
ParallelResults results = new ParallelResults(params.length);
- // TODO this synchronization block doesnt make any sense, we should possibly fix it
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (results) {
for (int i = 0; i < params.length; i++) {
@@ -821,13 +849,36 @@ public class HBaseClient {
results.size--; // wait for one fewer result
}
}
+ boolean interrupted = false;
while (results.count != results.size) {
try {
results.wait(); // wait for all results
- } catch (InterruptedException ignored) {}
+ } catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+
+ int i = 0;
+ Writable[] res = new Writable[params.length];
+ for (ByteBuffer bb : results.responses) {
+ if (bb == null) {
+ // sendParam failed for this call; there is no response.
+ res[i++] = null;
+ continue;
+ }
+ Object aRes;
+ try {
+ aRes = deserializeResult(bb);
+ } catch (IOException ioe) {
+ res[i++] = null;
+ continue;
+ }
+ if (aRes instanceof Writable) {
+ res[i++] = (Writable)aRes;
+ } else {
+ res[i++] = null;
+ }
}
- return results.values;
+ return res;
}
}
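With these changes each RPC response travels as a length-prefixed frame: a 4-byte size covering everything after the size field itself (including the 4-byte call id), then the call id, then the payload that deserializeResult() parses as [error flag][two exception strings or a serialized Writable]. A hedged sketch of reading one such frame (ResponseFrames and readOneResponse are made-up names, not part of the patch):

    import java.io.DataInputStream;
    import java.io.IOException;

    class ResponseFrames {
      // Illustrative helper: reads one framed response as written by
      // HBaseServer after this patch.
      static byte[] readOneResponse(DataInputStream in) throws IOException {
        int size = in.readInt();             // bytes that follow, call id included
        int callId = in.readInt();           // id of the pending Call (unused here)
        byte[] payload = new byte[size - 4]; // hence the "size -= 4" above
        in.readFully(payload);               // [error flag][result or exception]
        return payload;
      }
    }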
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
index 867a059..afa7d6f 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
@@ -197,6 +197,7 @@ public abstract class HBaseServer {
public static void bind(ServerSocket socket, InetSocketAddress address,
int backlog) throws IOException {
try {
+ socket.setReceiveBufferSize(256*1024);
socket.bind(address, backlog);
} catch (BindException e) {
BindException bindException =
@@ -486,6 +487,7 @@ public abstract class HBaseServer {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(tcpKeepAlive);
+ channel.socket().setSendBufferSize(256*1024);
Reader reader = getReader();
try {
@@ -1045,7 +1047,8 @@ public abstract class HBaseServer {
if (value instanceof WritableWithSize) {
// get the size hint.
WritableWithSize ohint = (WritableWithSize)value;
- long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+ long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE +
+ (2 * Bytes.SIZEOF_INT);
if (hint > 0) {
if ((hint) > Integer.MAX_VALUE) {
// oops, new problem.
@@ -1060,6 +1063,7 @@ public abstract class HBaseServer {
}
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
+ out.writeInt(0xdeadbeef); // placeholder for the response size; overwritten below.
out.writeInt(call.id); // write call id
out.writeBoolean(error != null); // write error flag
@@ -1075,8 +1079,14 @@ public abstract class HBaseServer {
+ StringUtils.humanReadableInt(buf.size()));
}
+ ByteBuffer bb = buf.getByteBuffer();
+ // don't include the size field itself.
+ int dataSize = bb.limit() - Bytes.SIZEOF_INT;
+ bb.rewind();
+ bb.putInt(dataSize); // overwrite the 0xdeadbeef.
+ bb.rewind();
- call.setResponse(buf.getByteBuffer());
+ call.setResponse(bb);
responder.doRespond(call);
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java b/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
new file mode 100644
index 0000000..3aa55ad
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/NanoProfiler.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
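+/**
+ * A crude nanosecond-resolution profiler. A thread starts a timer under a
+ * generated id, then records named splits; each split logs the time since
+ * the previous split and since the start. A timer can also be keyed by RPC
+ * call id (setCallId/startCallIdTime) so splits recorded elsewhere on the
+ * same call are attributed to the originating timer. Output goes to the
+ * LOG until the writer thread has opened its prof.txt.* file, after which
+ * lines are appended there.
+ */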
+public class NanoProfiler {
+ private static final Log LOG = LogFactory.getLog(NanoProfiler.class);
+
+
+ static ThreadLocal<Integer> ids = new ThreadLocal<Integer>();
+
+ static Map<Integer, String> labels =
+ new ConcurrentHashMap<Integer, String>();
+ static Map<Integer, Integer> callIdToThreadId =
+ new ConcurrentHashMap<Integer, Integer>();
+
+ static Map<Integer, Long> starts =
+ new ConcurrentHashMap<Integer, Long>();
+ static Map<Integer, Long> splits =
+ new ConcurrentHashMap<Integer, Long>();
+
+ final static AtomicInteger cnt = new AtomicInteger();
+
+ private static void log(String s) {
+
+ if (!writing) {
+ LOG.info(s);
+ return;
+ }
+ writeQ.add(s);
+ }
+
+
+ static volatile boolean stop = false;
+ static volatile boolean writing = false;
+ static WriterThread writer;
+
+ static final LinkedBlockingDeque<String> writeQ
+ = new LinkedBlockingDeque<String>(); // unbounded.
+
+ static class WriterThread extends Thread {
+
+ public void run() {
+ FileWriter fr;
+
+ try {
+ fr = new FileWriter("prof.txt." + ManagementFactory.getRuntimeMXBean().getName());
+ } catch (IOException e) {
+ return;
+ }
+
+ writing = true;
+
+ while (!stop) {
+ String first;
+ try {
+ first = writeQ.take();
+ } catch (InterruptedException e) {
+ // we were probably asked to quit; clean up.
+ break;
+ }
+
+ List<String> moreToWrite = new ArrayList<String>();
+
+ writeQ.drainTo(moreToWrite);
+ try {
+ fr.append(first);
+ fr.append("\n");
+ for (String s : moreToWrite) {
+ fr.append(s);
+ fr.append("\n");
+ }
+ } catch (IOException e) {
+ // the write failed; stop the writer.
+ break;
+ }
+ }
+
+ try {
+ fr.close();
+ } catch (IOException ignored) {
+ }
+ writing = false;
+ }
+ }
+
+
+ static {
+ Runtime.getRuntime().addShutdownHook(
+ new Thread() {
+ public void run() {
+ stop = true;
+ writer.interrupt(); // wake the writer so it can exit.
+ }
+ }
+ );
+
+ // start writing thread
+ writer = new WriterThread();
+ writer.start();
+ }
+
+
+ public static int startCallIdTime(int callId, String op) {
+ int threadId = cnt.incrementAndGet();
+ labels.put(threadId, op);
+ long nt = System.nanoTime();
+ starts.put(threadId, nt);
+ splits.put(threadId, nt);
+ callIdToThreadId.put(callId, threadId);
+ return threadId;
+ }
+
+ public static int startThreadTimer(String op) {
+ int threadId = cnt.incrementAndGet();
+ ids.set(threadId);
+ labels.put(threadId, op);
+ long nt = System.nanoTime();
+ starts.put(threadId, nt);
+ splits.put(threadId, nt);
+ return threadId;
+ }
+
+ public static void setCallId(int callId) {
+ Integer threadId = ids.get();
+ if (threadId != null) {
+ callIdToThreadId.put(callId, threadId);
+ }
+ }
+
+ public static void split(int callId, String nextSplitName) {
+ split(callId, nextSplitName, "");
+ }
+
+ public static void split(int callId, String nextSplitName, String extra) {
+ Integer threadId = callIdToThreadId.get(callId);
+ if (threadId == null) {
+ // this call isn't being profiled.
+ return;
+ }
+ split(nextSplitName, threadId, extra);
+ }
+
+ public static void split(String nextSplitName) {
+ Integer threadId = ids.get();
+ if (threadId == null) {
+ return; // not profiling.
+ }
+ split(nextSplitName, threadId, "");
+ }
+
+ protected static void split(String nextSplitName, int threadId, String extra) {
+ long start = starts.get(threadId);
+ long split = splits.get(threadId);
+ long nt = System.nanoTime();
+ splits.put(threadId, nt); // the new split.
+
+ String op = labels.get(threadId);
+ labels.put(threadId, nextSplitName);
+
+ log(threadId + " (" + op + ") split: " +
+ (nt - split) + " overall: " + (nt - start) + " " + extra);
+ }
+
+
+ /** Can only be called from the originating thread. */
+ public static void stop() {
+ Integer threadId = ids.get();
+ if (threadId == null) {
+ return; // this thread never started a timer.
+ }
+ long start = starts.get(threadId);
+ long split = splits.get(threadId);
+ long nt = System.nanoTime();
+
+ String op = labels.get(threadId);
+ log(threadId + " (" + op + ") end split: " + (nt - split) +
+ " overall: " + (nt - start));
+
+ ids.remove(); // clear the thread-local.
+ starts.remove(threadId);
+ splits.remove(threadId);
+ labels.remove(threadId);
+
+ // TODO clean up callIdToThreadId
+ }
+}
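A minimal usage sketch of the API above (the demo class and op names are illustrative). Note that the split() calls sprinkled through HTable and ScannerCallable are no-ops unless the calling thread has first run startThreadTimer():

    import org.apache.hadoop.hbase.ipc.NanoProfiler;

    public class NanoProfilerDemo {
      public static void main(String[] args) throws InterruptedException {
        NanoProfiler.startThreadTimer("demo"); // begin timing this thread
        Thread.sleep(5);
        NanoProfiler.split("phase2"); // logs the "demo" split, starts "phase2"
        Thread.sleep(5);
        NanoProfiler.stop(); // logs the final split and clears this thread's state
      }
    }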