commit ccefe6d40bd80ccf5f50fc92db92c9fd1ff116cc Author: Ryan Rawson Date: Thu Jan 20 11:45:35 2011 -0800 nio client diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index d248f32..3451c4b 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.ipc; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; @@ -31,6 +32,8 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.SocketInputStream; +import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; @@ -47,6 +50,7 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; import java.net.UnknownHostException; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Hashtable; @@ -180,8 +184,11 @@ public class HBaseClient { private class Connection extends Thread { private ConnectionId remoteId; private Socket socket = null; // connected socket - private DataInputStream in; - private DataOutputStream out; + private SocketChannel channel; // read/write against socket... + SocketInputStream in; + SocketOutputStream out; + //private DataInputStream in; + //private DataOutputStream out; // currently active calls private final Hashtable calls = new Hashtable(); @@ -226,60 +233,12 @@ public class HBaseClient { return true; } - /** This class sends a ping to the remote side when timeout on - * reading. If no failure is detected, it retries until at least - * a byte is read. - */ - private class PingInputStream extends FilterInputStream { - /* constructor */ - protected PingInputStream(InputStream in) { - super(in); - } - - /* Process timeout exception - * if the connection is not going to be closed, send a ping. - * otherwise, throw the timeout exception. - */ - private void handleTimeout(SocketTimeoutException e) throws IOException { - if (shouldCloseConnection.get() || !running.get() || - remoteId.rpcTimeout > 0) { - throw e; - } - sendPing(); - } - - /** Read a byte from the stream. - * Send a ping if timeout on read. Retries if no failure is detected - * until a byte is read. - * @throws IOException for any IO problem other than socket timeout - */ - @Override - public int read() throws IOException { - do { - try { - return super.read(); - } catch (SocketTimeoutException e) { - handleTimeout(e); - } - } while (true); - } - - /** Read bytes into a buffer starting from offset off - * Send a ping if timeout on read. Retries if no failure is detected - * until a byte is read. - * - * @return the total number of bytes read; -1 if the connection is closed. - */ - @Override - public int read(byte[] buf, int off, int len) throws IOException { - do { - try { - return super.read(buf, off, len); - } catch (SocketTimeoutException e) { - handleTimeout(e); - } - } while (true); + private void handleTimeout(SocketTimeoutException e) throws IOException { + if (shouldCloseConnection.get() || !running.get() || + remoteId.rpcTimeout > 0) { + throw e; } + sendPing(); } /** Connect to the server and set up the I/O streams. It then sends @@ -300,7 +259,8 @@ public class HBaseClient { } while (true) { try { - this.socket = socketFactory.createSocket(); + this.channel = SocketChannel.open(); + this.socket = this.channel.socket(); this.socket.setTcpNoDelay(tcpNoDelay); this.socket.setKeepAlive(tcpKeepAlive); this.socket.setReceiveBufferSize(256*1024); @@ -310,7 +270,7 @@ public class HBaseClient { if (remoteId.rpcTimeout > 0) { pingInterval = remoteId.rpcTimeout; // overwrite pingInterval } - this.socket.setSoTimeout(pingInterval); + this.socket.setSoTimeout(pingInterval); // no work for channels break; } catch (SocketTimeoutException toe) { handleConnectionFailure(timeoutFailures++, maxRetries, toe); @@ -318,11 +278,8 @@ public class HBaseClient { handleConnectionFailure(ioFailures++, maxRetries, ie); } } - SocketChannel channel = this.socket.getChannel(); - this.in = new DataInputStream(new BufferedInputStream - (new PingInputStream(NetUtils.getInputStream(socket)))); - this.out = new DataOutputStream - (new BufferedOutputStream(NetUtils.getOutputStream(socket))); + this.in = new SocketInputStream(this.channel, remoteId.rpcTimeout); + this.out = new SocketOutputStream(this.channel, remoteId.rpcTimeout); writeHeader(); // update last activity time @@ -385,15 +342,25 @@ public class HBaseClient { * Out is not synchronized because only the first thread does this. */ private void writeHeader() throws IOException { - out.write(HBaseServer.HEADER.array()); - out.write(HBaseServer.CURRENT_VERSION); + // i hope 1024 bytes are enough. + // TODO be smarter about what size to allocate here. + ByteBuffer out = ByteBuffer.allocate(1024); + out.put(HBaseServer.HEADER.array()); + out.put(HBaseServer.CURRENT_VERSION); + //When there are more fields we can have ConnectionHeader Writable. DataOutputBuffer buf = new DataOutputBuffer(); ObjectWritable.writeObject(buf, remoteId.getTicket(), UserGroupInformation.class, conf); int bufLen = buf.getLength(); - out.writeInt(bufLen); - out.write(buf.getData(), 0, bufLen); + out.putInt(bufLen); + out.put(buf.getData(), 0, bufLen); + + out.flip(); + + while (out.hasRemaining()) { + this.out.write(out); + } } /* wait till someone signals us to start reading RPC response or @@ -439,10 +406,13 @@ public class HBaseClient { long curTime = System.currentTimeMillis(); if ( curTime - lastActivity.get() >= pingInterval) { lastActivity.set(curTime); - //noinspection SynchronizeOnNonFinalField - synchronized (this.out) { - out.writeInt(PING_CALL_ID); - out.flush(); + + ByteBuffer buf = ByteBuffer.allocate(Bytes.SIZEOF_INT); + buf.putInt(PING_CALL_ID); + + buf.flip(); + while (buf.hasRemaining()) { + this.out.write(buf); } } } @@ -478,38 +448,39 @@ public class HBaseClient { return; } - DataOutputBuffer d=null; try { NanoProfiler.split(call.id, "sendParam_sync"); //noinspection SynchronizeOnNonFinalField synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC NanoProfiler.split(call.id, "serAndSendParam"); + NanoProfiler.split(call.id, "serAndSendParam"); + if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); - //for serializing the - //data to be written - d = new DataOutputBuffer(); + //for serializing the data to be written + ByteBufferOutputStream buf = new ByteBufferOutputStream(1024); + DataOutputStream d = new DataOutputStream(buf); d.writeInt(0xdeadbeef); // placeholder for data length d.writeInt(call.id); call.param.write(d); - byte[] data = d.getData(); - int dataLength = d.getLength(); - // fill in the placeholder - Bytes.putInt(data, 0, dataLength - 4); - out.write(data, 0, dataLength); - NanoProfiler.split(call.id, "flush"); + ByteBuffer bb = buf.getByteBuffer(); // does the flip. + int len = bb.limit(); + len -= Bytes.SIZEOF_INT; // remove lenght, include call.id + bb.rewind(); + bb.putInt(len); + bb.rewind(); + + NanoProfiler.split(call.id, "write"); - out.flush(); + while (bb.hasRemaining()) { + this.out.write(bb); + } } } catch(IOException e) { markClosed(e); - } finally { - //the buffer is just an in-memory buffer, but it is still polite to - // close early - IOUtils.closeStream(d); } } @@ -523,8 +494,30 @@ public class HBaseClient { touch(); try { - int size = in.readInt(); // size includes id - int id = in.readInt(); // try to read an id + ByteBuffer headbuf = ByteBuffer.allocate(Bytes.SIZEOF_INT * 2); + + while (true) { + + // handle the timeout at this phase only, if we time out reading + // then send a ping to remind the remote side we are alive, then + // get back to sleeping again. + try { + this.in.read(headbuf); + + // if we didnt get everything, then there is a serious problem. + if (headbuf.hasRemaining()) { + throw new IOException("Short read in receiveResponse"); + } + break; // got it all! + } catch (SocketTimeoutException timedOut) { + LOG.debug("Got socket timeout exception, delegating to handleTimeout"); + handleTimeout(timedOut); + } + } + headbuf.flip(); + + int size = headbuf.getInt(); // size includes id + int id = headbuf.getInt(); NanoProfiler.split(id, "receiveResponse"); if (LOG.isDebugEnabled()) @@ -535,13 +528,15 @@ public class HBaseClient { size -= 4; // 4 byte off for id because we already read it. ByteBuffer buf = ByteBuffer.allocate(size); - - IOUtils.readFully(in, buf.array(), buf.arrayOffset(), size); - //socket.getChannel().read() + // dont handle the timeout, if we timeout reading a response + // then something is seriously wrong, and fail. + while (buf.hasRemaining()) { + // we wont short read like this, but we might time out waiting. + this.in.read(buf); + } + buf.flip(); // since we went straight to the array like losers, lets do this. - buf.limit(size); - buf.rewind(); NanoProfiler.split(id, "setResponse", "Data size: " + size); call.setResponse(buf); calls.remove(id); @@ -573,9 +568,10 @@ public class HBaseClient { } } - // close the streams and therefore the socket - IOUtils.closeStream(out); - IOUtils.closeStream(in); + try { + channel.close(); + } catch (IOException ignored) { + } // clean up all calls if (closeException == null) { diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java index 4f4828b..fed0739 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java @@ -203,6 +203,7 @@ public class HBaseRPC { * @return an IPC client */ protected synchronized HBaseClient getClient(Configuration conf) { + // default socket factory? return getClient(conf, SocketFactory.getDefault()); } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 9c8d022..1eefa3e 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -930,6 +930,7 @@ public abstract class HBaseServer { if (dataLength == HBaseClient.PING_CALL_ID) { dataLengthBuffer.clear(); + LOG.debug("Got ping message from " + hostAddress + ":" + remotePort); return 0; //ping message } data = ByteBuffer.allocate(dataLength); diff --git a/src/main/ruby/hbase/table.rb b/src/main/ruby/hbase/table.rb index c8e0076..cd90132 100644 --- a/src/main/ruby/hbase/table.rb +++ b/src/main/ruby/hbase/table.rb @@ -138,19 +138,17 @@ module Hbase get.addFamily(family) end end - - # Additional params - get.setMaxVersions(args[VERSIONS] || 1) - get.setTimeStamp(args[TIMESTAMP]) if args[TIMESTAMP] else # May have passed TIMESTAMP and row only; wants all columns from ts. - unless ts = args[TIMESTAMP] - raise ArgumentError, "Failed parse of #{args.inspect}, #{args.class}" + if ts = args[TIMESTAMP] + # Set the timestamp + get.setTimeStamp(ts.to_i) end - - # Set the timestamp - get.setTimeStamp(ts.to_i) end + + # Additional params + get.setMaxVersions(args[VERSIONS] || 1) + get.setTimeStamp(args[TIMESTAMP]) if args[TIMESTAMP] end # Call hbase for the results