Index: src/main/java/org/apache/hadoop/hbase/security/User.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/security/User.java	(revision 1295717)
+++ src/main/java/org/apache/hadoop/hbase/security/User.java	(working copy)
@@ -24,6 +24,8 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.ConnectionHeaderProto;
+import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.UserInformationProto;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -164,6 +166,39 @@
     return new HadoopUser(ugi);
   }
 
+  public static User create(ConnectionHeaderProto head) {
+    UserGroupInformation ugi = null;
+
+    if (!head.hasUserInfo()) {
+      return create(ugi);
+    }
+    UserInformationProto userInfoProto = head.getUserInfo();
+    String effectiveUser = null;
+    if (userInfoProto.hasEffectiveUser()) {
+      effectiveUser = userInfoProto.getEffectiveUser();
+    }
+    String realUser = null;
+    if (userInfoProto.hasRealUser()) {
+      realUser = userInfoProto.getRealUser();
+    }
+    if (effectiveUser != null) {
+      if (realUser != null) {
+        org.apache.hadoop.security.UserGroupInformation realUserUgi =
+            org.apache.hadoop.security.UserGroupInformation
+                .createRemoteUser(realUser);
+        ugi = org.apache.hadoop.security.UserGroupInformation.
+            createProxyUser(effectiveUser, realUserUgi);
+      } else {
+        ugi = org.apache.hadoop.security.UserGroupInformation.
+            createRemoteUser(effectiveUser);
+      }
+    } else {
+      ugi = null;
+    }
+
+    return create(ugi);
+  }
+
   /**
    * Generates a new {@code User} instance specifically for use in test code.
    * @param name the full username
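For illustration, a minimal sketch of how this factory is meant to be driven (not part of the patch; the principal names are placeholders, and it assumes the protoc-generated RPCMessageProtos classes introduced in the .proto file further below):

    // Hypothetical caller; "alice" and "hbase" are made-up principals.
    UserInformationProto userInfo = UserInformationProto.newBuilder()
        .setEffectiveUser("alice")   // who the call should run as
        .setRealUser("hbase")        // optional; presence selects createProxyUser
        .build();
    ConnectionHeaderProto header = ConnectionHeaderProto.newBuilder()
        .setUserInfo(userInfo)
        .build();
    User user = User.create(header); // proxy user "alice" running via "hbase"

When realUser is absent, the method falls back to createRemoteUser(effectiveUser); when userInfo is absent entirely, it returns the null user.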
Index: src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java	(revision 1295717)
+++ src/main/java/org/apache/hadoop/hbase/io/DataOutputOutputStream.java	(working copy)
@@ -27,7 +27,7 @@
  * OutputStream implementation that wraps a DataOutput.
  */
 @InterfaceAudience.Private
-class DataOutputOutputStream extends OutputStream {
+public class DataOutputOutputStream extends OutputStream {
 
   private final DataOutput out;
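The visibility change exists so that both IPC endpoints can adapt a DataOutput into the OutputStream that protobuf messages write to. A sketch of the intended use, assuming the class's existing static factory (which the server and client code below call) and an illustrative protocol name:

    DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
    // Adapt the DataOutput to an OutputStream so a protobuf Message can write to it.
    OutputStream adapted = DataOutputOutputStream.constructOutputStream(out);
    ConnectionHeaderProto.newBuilder().setProtocol("x.y.SomeProtocol").build()
        .writeDelimitedTo(adapted);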
Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(revision 1295717)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java	(working copy)
@@ -59,21 +59,30 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.DataOutputOutputStream;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.ConnectionHeaderProto;
+import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcRequestWithHeaderProto;
+import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcResponseWithHeaderProto;
+import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcExceptionProto;
+import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcResponseProto;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -91,7 +100,7 @@
    * The first four bytes of Hadoop RPC connections
    */
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
-  public static final byte CURRENT_VERSION = 3;
+  public static final byte CURRENT_VERSION = 5;
 
   /**
    * How many calls/handler are allowed in the queue.
@@ -317,43 +326,47 @@
       ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
       DataOutputStream out = new DataOutputStream(buf);
       try {
+        RpcResponseWithHeaderProto.Builder builder =
+          RpcResponseWithHeaderProto.newBuilder();
         // Call id.
-        out.writeInt(this.id);
-        // Write flag.
-        byte flag = (error != null)?
-          ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly();
-        out.writeByte(flag);
-        // Place holder for length set later below after we
-        // fill the buffer with data.
-        out.writeInt(0xdeadbeef);
-        out.writeInt(status.state);
-      } catch (IOException e) {
-        errorClass = e.getClass().getName();
-        error = StringUtils.stringifyException(e);
-      }
-
-      try {
-        if (error == null) {
-          result.write(out);
+        builder.setCallId(this.id);
+        builder.setError(error != null);
+        if (error != null) {
+          RpcExceptionProto.Builder b =
+            RpcExceptionProto.newBuilder();
+          b.setExceptionName(errorClass);
+          b.setStackTrace(error);
+          builder.setException(b.build());
         } else {
-          WritableUtils.writeString(out, errorClass);
-          WritableUtils.writeString(out, error);
+          builder.setResponse(maybeTranslate(value, result, size));
         }
+        builder.build().writeDelimitedTo(
+          DataOutputOutputStream.constructOutputStream(out));
       } catch (IOException e) {
-        LOG.warn("Error sending response to call: ", e);
+        LOG.warn("Exception while creating response", e);
       }
-
-      // Set the length into the ByteBuffer after call id and after
-      // byte flag.
       ByteBuffer bb = buf.getByteBuffer();
-      int bufSiz = bb.remaining();
-      // Move to the size location in our ByteBuffer past call.id
-      // and past the byte flag.
-      bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
-      bb.putInt(bufSiz);
       bb.position(0);
       this.response = bb;
     }
+
+    private RpcResponseProto maybeTranslate(Object value, Writable result,
+        int size) throws IOException {
+      if (value instanceof Message) {
+        // The app passed us a pure protobuf object. Note this won't really
+        // happen until the rpc engine supports protobuf (like Hadoop's
+        // ProtobufRpcEngine).
+        return (RpcResponseProto)value;
+      }
+      RpcResponseProto.Builder response =
+        RpcResponseProto.newBuilder();
+      DataOutputBuffer d = new DataOutputBuffer(size);
+      result.write(d);
+      // Makes a copy of the valid bytes only; this code path should not live
+      // long (only until we move all the protocols to protobuf).
+      response.setResponse(ByteString.copyFrom(d.getData(), 0, d.getLength()));
+      return response.build();
+    }
 
     @Override
     public synchronized void endDelay(Object result) throws IOException {
@@ -1045,9 +1058,9 @@
     // disconnected, we can say where it used to connect to.
     protected String hostAddress;
     protected int remotePort;
-    ConnectionHeader header = new ConnectionHeader();
+    ConnectionHeaderProto header;
     Class<? extends VersionedProtocol> protocol;
-    protected User ticket = null;
+    protected User user = null;
 
     public Connection(SocketChannel channel, long lastContact) {
       this.channel = channel;
@@ -1211,32 +1224,36 @@
     /// Reads the connection header following version
     private void processHeader() throws IOException {
-      DataInputStream in =
-        new DataInputStream(new ByteArrayInputStream(data.array()));
-      header.readFields(in);
+      header =
+        ConnectionHeaderProto.parseFrom(
+          new ByteArrayInputStream(data.array()));
 
       try {
         String protocolClassName = header.getProtocol();
-        if (protocolClassName == null) {
-          protocolClassName = "org.apache.hadoop.hbase.ipc.HRegionInterface";
-        }
         protocol = getProtocolClass(protocolClassName, conf);
       } catch (ClassNotFoundException cnfe) {
         throw new IOException("Unknown protocol: " + header.getProtocol());
       }
 
-      ticket = header.getUser();
+      user = User.create(header);
     }
 
     protected void processData(byte[] buf)
        throws IOException, InterruptedException {
-      DataInputStream dis =
-        new DataInputStream(new ByteArrayInputStream(buf));
-      int id = dis.readInt();                    // try to read an id
+      RpcRequestWithHeaderProto request =
+        RpcRequestWithHeaderProto.parseFrom(buf);
+      int id = request.getCallId();
+      RpcRequestProto clientRequest = request.getRequest();
 
       if (LOG.isDebugEnabled())
         LOG.debug(" got call #" + id + ", " + buf.length + " bytes");
 
       Writable param;
       try {
+        // This is short term; eventually we will not support anything other
+        // than protobuf messages on the wire, and can then treat requests as
+        // pure protobuf objects.
+        DataInputStream dis =
+          new DataInputStream(
+            clientRequest.getRequest().newInput());
         param = ReflectionUtils.newInstance(paramClass, conf);//read param
         param.readFields(dis);
       } catch (Throwable t) {
@@ -1335,12 +1352,12 @@
         throw new ServerNotRunningYetException("Server is not running yet");
 
       if (LOG.isDebugEnabled()) {
-        User remoteUser = call.connection.ticket;
+        User remoteUser = call.connection.user;
         LOG.debug(getName() + ": call #" + call.id + " executing as " +
           (remoteUser == null ? "NULL principal" : remoteUser.getName()));
       }
 
-      RequestContext.set(call.connection.ticket, getRemoteIp(),
+      RequestContext.set(call.connection.user, getRemoteIp(),
         call.connection.protocol);
       // make the call
       value = call(call.connection.protocol, call.param, call.timestamp,
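To make the new response framing concrete: setResponse() above serializes one delimited RpcResponseWithHeaderProto per call, which the client-side changes below read back with parseDelimitedFrom. A sketch of that round trip, with illustrative values and assuming the generated RPCMessageProtos classes plus java.io streams are on the classpath:

    RpcResponseWithHeaderProto resp = RpcResponseWithHeaderProto.newBuilder()
        .setCallId(42)
        .setError(false)
        .setResponse(RpcResponseProto.newBuilder()
            .setResponse(ByteString.copyFromUtf8("writable-bytes")) // stand-in payload
            .build())
        .build();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    resp.writeDelimitedTo(bos);   // varint length, then the message bytes
    RpcResponseWithHeaderProto echo = RpcResponseWithHeaderProto
        .parseDelimitedFrom(new ByteArrayInputStream(bos.toByteArray()));
    assert echo.getCallId() == 42;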
"NULL principal" : remoteUser.getName())); } - RequestContext.set(call.connection.ticket, getRemoteIp(), + RequestContext.set(call.connection.user, getRemoteIp(), call.connection.protocol); // make the call value = call(call.connection.protocol, call.param, call.timestamp, Index: src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 1295717) +++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy) @@ -46,18 +46,26 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos; +import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.ConnectionHeaderProto; +import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcRequestWithHeaderProto; +import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcResponseWithHeaderProto; +import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.RPCMessageProtos.RpcResponseProto; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.hbase.io.DataOutputOutputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.ReflectionUtils; +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; + /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on * a port and is defined by a parameter class and a value class. @@ -212,7 +220,7 @@ * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ protected class Connection extends Thread { - private ConnectionHeader header; // connection header + private ConnectionHeaderProto header; // connection header protected ConnectionId remoteId; protected Socket socket = null; // connected socket protected DataInputStream in; @@ -233,8 +241,9 @@ User ticket = remoteId.getTicket(); Class protocol = remoteId.getProtocol(); - header = new ConnectionHeader( - protocol == null ? null : protocol.getName(), ticket); + ConnectionHeaderProto.Builder builder = ConnectionHeaderProto.newBuilder(); + builder.setProtocol(protocol == null ? "" : protocol.getName()); + this.header = builder.build(); this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + remoteId.getAddress().toString() + @@ -436,13 +445,8 @@ private void writeHeader() throws IOException { out.write(HBaseServer.HEADER.array()); out.write(HBaseServer.CURRENT_VERSION); - //When there are more fields we can have ConnectionHeader Writable. 
@@ -526,34 +530,47 @@
       if (shouldCloseConnection.get()) {
         return;
       }
-
-      // For serializing the data to be written.
-      final DataOutputBuffer d = new DataOutputBuffer();
       try {
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " sending #" + call.id);
-
-        d.writeInt(0xdeadbeef); // placeholder for data length
-        d.writeInt(call.id);
-        call.param.write(d);
-        byte[] data = d.getData();
-        int dataLength = d.getLength();
-        // fill in the placeholder
-        Bytes.putInt(data, 0, dataLength - 4);
+        RPCMessageProtos.RpcRequestWithHeaderProto.Builder builder =
+          RPCMessageProtos.RpcRequestWithHeaderProto.newBuilder();
+        builder.setCallId(call.id);
+        builder.setRequest(maybeTranslate(call.param));
         //noinspection SynchronizeOnNonFinalField
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          out.write(data, 0, dataLength);
-          out.flush();
+          RpcRequestWithHeaderProto obj = builder.build();
+          this.out.writeInt(obj.getSerializedSize());
+          obj.writeTo(DataOutputOutputStream.constructOutputStream(this.out));
+          this.out.flush();
         }
       } catch(IOException e) {
         markClosed(e);
-      } finally {
-        //the buffer is just an in-memory buffer, but it is still polite to
-        // close early
-        IOUtils.closeStream(d);
       }
     }
+
+    private RpcRequestProto maybeTranslate(Object param)
+        throws IOException {
+      if (param instanceof Message) {
+        // The app passed us a pure protobuf object. Note this probably won't
+        // really happen until the rpc engine supports protobuf (like Hadoop's
+        // ProtobufRpcEngine).
+        return (RpcRequestProto)param;
+      }
+      if (param instanceof Invocation) {
+        RpcRequestProto.Builder builder =
+          RpcRequestProto.newBuilder();
+        Invocation invocation = (Invocation)param;
+        DataOutputBuffer d = new DataOutputBuffer();
+        invocation.write(d);
+        // Makes a copy of the valid bytes only; this code path should not
+        // live long (only until we move all the protocols to protobuf).
+        builder.setRequest(ByteString.copyFrom(d.getData(), 0, d.getLength()));
+        return builder.build();
+      }
+      throw new RuntimeException("Unknown RPC request class: " +
        param.getClass());
+    }
 
     /* Receive a response.
      * Because only one receiver, so no synchronization on in.
@@ -566,33 +583,33 @@
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
-        // It writes the call.id (int), a flag byte, then optionally the length
-        // of the response (int) followed by data.
+        // It writes the call.id (int), a boolean signifying any error (and, if
+        // so, the exception name/trace), and the response bytes.
 
         // Read the call id.
-        int id = in.readInt();
+        RpcResponseWithHeaderProto response =
+          RpcResponseWithHeaderProto.parseDelimitedFrom(in);
+        int id = response.getCallId();
 
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + id);
         Call call = calls.remove(id);
 
-        // Read the flag byte
-        byte flag = in.readByte();
-        boolean isError = ResponseFlag.isError(flag);
-        if (ResponseFlag.isLength(flag)) {
-          // Currently length if present is unused.
-          in.readInt();
-        }
-        int state = in.readInt(); // Read the state.  Currently unused.
+        boolean isError = response.getError();
         if (isError) {
           if (call != null) {
             //noinspection ThrowableInstanceNeverThrown
-            call.setException(new RemoteException(WritableUtils.readString(in),
-                WritableUtils.readString(in)));
+            call.setException(new RemoteException(
+                response.getException().getExceptionName(),
+                response.getException().getStackTrace()));
           }
         } else {
+          RpcResponseProto responseObj = response.getResponse();
+          DataInputStream dis =
+            new DataInputStream(
+              responseObj.getResponse().newInput());
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
-          value.readFields(in);                 // read value
+          value.readFields(dis);                 // read value
           // it's possible that this call may have been cleaned up due to a RPC
           // timeout, so check if it still exists before setting the value.
           if (call != null) {
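Note the asymmetry in framing: requests are written with an explicit 4-byte length (writeInt) followed by the raw message, because the server's read loop hands processData() a complete frame, while responses are varint-delimited via writeDelimitedTo/parseDelimitedFrom. A sketch of what sendParam() now puts on the wire, with an illustrative call id and a stand-in payload:

    RpcRequestWithHeaderProto req = RpcRequestWithHeaderProto.newBuilder()
        .setCallId(7)
        .setRequest(RpcRequestProto.newBuilder()
            .setRequest(ByteString.copyFromUtf8("serialized-invocation")) // stand-in
            .build())
        .build();
    DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
    out.writeInt(req.getSerializedSize());                  // 4-byte frame length
    req.writeTo(DataOutputOutputStream.constructOutputStream(out));
    out.flush();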
Index: src/main/proto/RPCMessageProto.proto
===================================================================
--- src/main/proto/RPCMessageProto.proto	(revision 0)
+++ src/main/proto/RPCMessageProto.proto	(revision 0)
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Specification of the (insecure) HBase RPC wire format:
+ * - As part of setting up a connection to a server, the client sends
+ *   the ConnectionHeaderProto header. At the data level, this looks like
+ *   <"hrpc"-bytearray> <version byte (5)> <4-byte length> <ConnectionHeaderProto>
+ *   The first two parts are there to maintain "failure-compatibility" with
+ *   old clients (pre-0.96) so that they can fail gracefully. We should remove
+ *   these two parts in 0.98.
+ * - For every RPC the client makes, it sends an RpcRequestWithHeaderProto.
+ *   At the data level this looks like
+ *   <4-byte length> <RpcRequestWithHeaderProto>
+ * - The server sends back an RpcResponseWithHeaderProto object as the
+ *   response. At the data level this looks like
+ *   <varint length> <RpcResponseWithHeaderProto>
+ * - There is one special message that's sent from client to server -
+ *   the Ping message. At the data level, this is just the bytes corresponding
+ *   to the integer -1.
+ */
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf";
+option java_outer_classname = "RPCMessageProtos";
+option java_generate_equals_and_hash = true;
+
+message UserInformationProto {
+  optional string effectiveUser = 1;
+  optional string realUser = 2;
+}
+
+message ConnectionHeaderProto {
+  /** User info beyond what is established at connection establishment
+   * (applies to a secure HBase setup).
+   */
+  optional UserInformationProto userInfo = 1;
+  /** Protocol name for the next rpc layer;
+   * the client created a proxy with this protocol name.
+   */
+  optional string protocol = 2 [default = "org.apache.hadoop.hbase.ipc.HRegionInterface"];
+}
+
+/**
+ * Message used to marshal the client request
+ * from the RPC client to the RPC server.
+ */
+message RpcRequestProto {
+  /** Bytes corresponding to the client request. The bytes could be a
+   * serialized Writable (and that's what is currently always the case).
+   */
+  optional bytes request = 1;
+}
+
+/**
+ * The complete RPC request message with headers.
+ */
+message RpcRequestWithHeaderProto {
+  optional RpcRequestProto request = 1;
+  /** Monotonically increasing callId, mostly to keep track of RPCs */
+  optional int32 callId = 2;
+}
+
+/**
+ * At the RPC layer, this message is used to indicate
+ * a server-side exception to the RPC client.
+ *
+ * The HBase RPC client throws an exception indicated
+ * by exceptionName, with the stackTrace.
+ */
+message RpcExceptionProto {
+  /** Class name of the exception thrown from the server */
+  optional string exceptionName = 1;
+
+  /** Exception stack trace from the server side */
+  optional string stackTrace = 2;
+}
+
+/**
+ * This message is used to marshal the response from
+ * the RPC server to the client.
+ */
+message RpcResponseProto {
+  /** Protobuf response payload from the server. The bytes could be a
+   * serialized Writable (and that's what is currently always the case).
+   */
+  optional bytes response = 1;
+}
+
+/**
+ * The complete RPC response message with headers.
+ */
+message RpcResponseWithHeaderProto {
+  optional RpcResponseProto response = 1;
+  /** Echo back the callId the client sent */
+  optional int32 callId = 2;
+  /** Did the RPC execution encounter an error at the server? */
+  optional bool error = 3;
+  /** Optional exception, set when error is true */
+  optional RpcExceptionProto exception = 4;
+}
\ No newline at end of file
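One detail worth noticing: the [default = "org.apache.hadoop.hbase.ipc.HRegionInterface"] on the protocol field is what lets processHeader() in HBaseServer drop its old explicit null check, since an unset optional field reads back as its declared default. A minimal illustration, assuming the generated classes:

    ConnectionHeaderProto empty = ConnectionHeaderProto.newBuilder().build();
    // getProtocol() yields the declared default for an unset optional field.
    assert "org.apache.hadoop.hbase.ipc.HRegionInterface".equals(empty.getProtocol());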
Index: pom.xml
===================================================================
--- pom.xml	(revision 1295717)
+++ pom.xml	(working copy)
@@ -324,6 +324,47 @@
            can be overwritten here.
       -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>compile-proto</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <tasks>
+                <echo file="target/compile-proto.sh">
+                    PROTO_DIR=src/main/proto
+                    JAVA_DIR=target/generated-sources/java
+                    which cygpath 2> /dev/null
+                    if [ $? = 1 ]; then
+                      IS_WIN=false
+                    else
+                      IS_WIN=true
+                      WIN_PROTO_DIR=`cygpath --windows $PROTO_DIR`
+                      WIN_JAVA_DIR=`cygpath --windows $JAVA_DIR`
+                    fi
+                    mkdir -p $JAVA_DIR 2> /dev/null
+                    for PROTO_FILE in `ls $PROTO_DIR/*.proto 2> /dev/null`
+                    do
+                        if [ "$IS_WIN" = "true" ]; then
+                          protoc -I$WIN_PROTO_DIR --java_out=$WIN_JAVA_DIR $PROTO_FILE
+                        else
+                          protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE
+                        fi
+                    done
+                </echo>
+                <exec executable="sh" dir="${basedir}" failonerror="true">
+                  <arg line="target/compile-proto.sh"/>
+                </exec>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
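With this antrun execution wired in, the generated RPCMessageProtos sources land under target/generated-sources/java during the generate-sources phase (run directly as "mvn generate-sources", or implicitly by any later phase such as compile). The script assumes a protoc binary is available on the PATH, and the cygpath branch handles Windows/Cygwin path translation.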