Index: hbase-server/src/main/protobuf/RPC.proto =================================================================== --- hbase-server/src/main/protobuf/RPC.proto (revision 1354768) +++ hbase-server/src/main/protobuf/RPC.proto (working copy) @@ -67,9 +67,15 @@ */ message RpcRequest { /** Monotonically increasing callId, mostly to keep track of RPCs */ - required int32 callId = 1; - /** The request bytes */ - optional bytes request = 2; + required uint32 callId = 1; + /** Name of the RPC method */ + required string methodName = 2; + + /** Bytes corresponding to the client protobuf request */ + optional bytes request = 3; + + /** protocol version of class declaring the called method [for ProtobufRpcEngine] */ + optional uint64 clientProtocolVersion = 4; } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (revision 1354768) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (working copy) @@ -1102,13 +1102,21 @@ public interface RpcRequestOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required int32 callId = 1; + // required uint32 callId = 1; boolean hasCallId(); int getCallId(); - // optional bytes request = 2; + // required string methodName = 2; + boolean hasMethodName(); + String getMethodName(); + + // optional bytes request = 3; boolean hasRequest(); com.google.protobuf.ByteString getRequest(); + + // optional uint64 clientProtocolVersion = 4; + boolean hasClientProtocolVersion(); + long getClientProtocolVersion(); } public static final class RpcRequest extends com.google.protobuf.GeneratedMessage @@ -1139,7 +1147,7 @@ } private int bitField0_; - // required int32 callId = 1; + // required uint32 callId = 1; public static final int CALLID_FIELD_NUMBER = 1; private int callId_; public boolean hasCallId() { @@ -1149,19 +1157,63 @@ return callId_; } - // optional bytes request = 2; - public static final int REQUEST_FIELD_NUMBER = 2; + // required string methodName = 2; + public static final int METHODNAME_FIELD_NUMBER = 2; + private java.lang.Object methodName_; + public boolean hasMethodName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public String getMethodName() { + java.lang.Object ref = methodName_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + methodName_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getMethodNameBytes() { + java.lang.Object ref = methodName_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + methodName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional bytes request = 3; + public static final int REQUEST_FIELD_NUMBER = 3; private com.google.protobuf.ByteString request_; public boolean hasRequest() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000004) == 0x00000004); } public com.google.protobuf.ByteString getRequest() { return request_; } + // optional uint64 clientProtocolVersion = 4; + public static final int CLIENTPROTOCOLVERSION_FIELD_NUMBER = 4; + private long clientProtocolVersion_; + public boolean hasClientProtocolVersion() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public long getClientProtocolVersion() { + return clientProtocolVersion_; + } + private void initFields() { callId_ = 0; + methodName_ = ""; request_ = com.google.protobuf.ByteString.EMPTY; + clientProtocolVersion_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1172,6 +1224,10 @@ memoizedIsInitialized = 0; return false; } + if (!hasMethodName()) { + memoizedIsInitialized = 0; + return false; + } memoizedIsInitialized = 1; return true; } @@ -1180,11 +1236,17 @@ throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeInt32(1, callId_); + output.writeUInt32(1, callId_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, request_); + output.writeBytes(2, getMethodNameBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(3, request_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeUInt64(4, clientProtocolVersion_); + } getUnknownFields().writeTo(output); } @@ -1196,12 +1258,20 @@ size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeInt32Size(1, callId_); + .computeUInt32Size(1, callId_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, request_); + .computeBytesSize(2, getMethodNameBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(3, request_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(4, clientProtocolVersion_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1230,11 +1300,21 @@ result = result && (getCallId() == other.getCallId()); } + result = result && (hasMethodName() == other.hasMethodName()); + if (hasMethodName()) { + result = result && getMethodName() + .equals(other.getMethodName()); + } result = result && (hasRequest() == other.hasRequest()); if (hasRequest()) { result = result && getRequest() .equals(other.getRequest()); } + result = result && (hasClientProtocolVersion() == other.hasClientProtocolVersion()); + if (hasClientProtocolVersion()) { + result = result && (getClientProtocolVersion() + == other.getClientProtocolVersion()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1248,10 +1328,18 @@ hash = (37 * hash) + CALLID_FIELD_NUMBER; hash = (53 * hash) + getCallId(); } + if (hasMethodName()) { + hash = (37 * hash) + METHODNAME_FIELD_NUMBER; + hash = (53 * hash) + getMethodName().hashCode(); + } if (hasRequest()) { hash = (37 * hash) + REQUEST_FIELD_NUMBER; hash = (53 * hash) + getRequest().hashCode(); } + if (hasClientProtocolVersion()) { + hash = (37 * hash) + CLIENTPROTOCOLVERSION_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getClientProtocolVersion()); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -1370,8 +1458,12 @@ super.clear(); callId_ = 0; bitField0_ = (bitField0_ & ~0x00000001); + methodName_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); request_ = com.google.protobuf.ByteString.EMPTY; - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000004); + clientProtocolVersion_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -1417,7 +1509,15 @@ if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } + result.methodName_ = methodName_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } result.request_ = request_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.clientProtocolVersion_ = clientProtocolVersion_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1437,9 +1537,15 @@ if (other.hasCallId()) { setCallId(other.getCallId()); } + if (other.hasMethodName()) { + setMethodName(other.getMethodName()); + } if (other.hasRequest()) { setRequest(other.getRequest()); } + if (other.hasClientProtocolVersion()) { + setClientProtocolVersion(other.getClientProtocolVersion()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1449,6 +1555,10 @@ return false; } + if (!hasMethodName()) { + + return false; + } return true; } @@ -1477,21 +1587,31 @@ } case 8: { bitField0_ |= 0x00000001; - callId_ = input.readInt32(); + callId_ = input.readUInt32(); break; } case 18: { bitField0_ |= 0x00000002; + methodName_ = input.readBytes(); + break; + } + case 26: { + bitField0_ |= 0x00000004; request_ = input.readBytes(); break; } + case 32: { + bitField0_ |= 0x00000008; + clientProtocolVersion_ = input.readUInt64(); + break; + } } } } private int bitField0_; - // required int32 callId = 1; + // required uint32 callId = 1; private int callId_ ; public boolean hasCallId() { return ((bitField0_ & 0x00000001) == 0x00000001); @@ -1512,10 +1632,46 @@ return this; } - // optional bytes request = 2; + // required string methodName = 2; + private java.lang.Object methodName_ = ""; + public boolean hasMethodName() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public String getMethodName() { + java.lang.Object ref = methodName_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + methodName_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setMethodName(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + methodName_ = value; + onChanged(); + return this; + } + public Builder clearMethodName() { + bitField0_ = (bitField0_ & ~0x00000002); + methodName_ = getDefaultInstance().getMethodName(); + onChanged(); + return this; + } + void setMethodName(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000002; + methodName_ = value; + onChanged(); + } + + // optional bytes request = 3; private com.google.protobuf.ByteString request_ = com.google.protobuf.ByteString.EMPTY; public boolean hasRequest() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000004) == 0x00000004); } public com.google.protobuf.ByteString getRequest() { return request_; @@ -1524,18 +1680,39 @@ if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000002; + bitField0_ |= 0x00000004; request_ = value; onChanged(); return this; } public Builder clearRequest() { - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000004); request_ = getDefaultInstance().getRequest(); onChanged(); return this; } + // optional uint64 clientProtocolVersion = 4; + private long clientProtocolVersion_ ; + public boolean hasClientProtocolVersion() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public long getClientProtocolVersion() { + return clientProtocolVersion_; + } + public Builder setClientProtocolVersion(long value) { + bitField0_ |= 0x00000008; + clientProtocolVersion_ = value; + onChanged(); + return this; + } + public Builder clearClientProtocolVersion() { + bitField0_ = (bitField0_ & ~0x00000008); + clientProtocolVersion_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:RpcRequest) } @@ -2871,16 +3048,17 @@ "iveUser\030\001 \002(\t\022\020\n\010realUser\030\002 \001(\t\"w\n\020Conne" + "ctionHeader\022\"\n\010userInfo\030\001 \001(\0132\020.UserInfo" + "rmation\022?\n\010protocol\030\002 \001(\t:-org.apache.ha" + - "doop.hbase.client.ClientProtocol\"-\n\nRpcR" + - "equest\022\016\n\006callId\030\001 \002(\005\022\017\n\007request\030\002 \001(\014\"" + - "9\n\014RpcException\022\025\n\rexceptionName\030\001 \002(\t\022\022" + - "\n\nstackTrace\030\002 \001(\t\"\243\001\n\013RpcResponse\022\016\n\006ca" + - "llId\030\001 \002(\005\022#\n\006status\030\002 \002(\0162\023.RpcResponse" + - ".Status\022\020\n\010response\030\003 \001(\014\022 \n\texception\030\004", - " \001(\0132\r.RpcException\"+\n\006Status\022\013\n\007SUCCESS" + - "\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002B<\n*org.apache.h" + - "adoop.hbase.protobuf.generatedB\tRPCProto" + - "sH\001\240\001\001" + "doop.hbase.client.ClientProtocol\"`\n\nRpcR" + + "equest\022\016\n\006callId\030\001 \002(\r\022\022\n\nmethodName\030\002 \002" + + "(\t\022\017\n\007request\030\003 \001(\014\022\035\n\025clientProtocolVer" + + "sion\030\004 \001(\004\"9\n\014RpcException\022\025\n\rexceptionN" + + "ame\030\001 \002(\t\022\022\n\nstackTrace\030\002 \001(\t\"\243\001\n\013RpcRes" + + "ponse\022\016\n\006callId\030\001 \002(\005\022#\n\006status\030\002 \002(\0162\023.", + "RpcResponse.Status\022\020\n\010response\030\003 \001(\014\022 \n\t" + + "exception\030\004 \001(\0132\r.RpcException\"+\n\006Status" + + "\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002B<\n*o" + + "rg.apache.hadoop.hbase.protobuf.generate" + + "dB\tRPCProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -2908,7 +3086,7 @@ internal_static_RpcRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RpcRequest_descriptor, - new java.lang.String[] { "CallId", "Request", }, + new java.lang.String[] { "CallId", "MethodName", "Request", "ClientProtocolVersion", }, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest.class, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest.Builder.class); internal_static_RpcException_descriptor = Index: hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (revision 1354768) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (working copy) @@ -41,6 +41,8 @@ public abstract void setRPC(String methodName, Object [] params, long queueTime); + public abstract void setRPC(String methodName, + long queueTime); public abstract void setRPCPacket(Writable param); public abstract void setConnection(String clientAddress, int remotePort); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (revision 1354768) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (working copy) @@ -226,6 +226,14 @@ this.packet = null; } + @Override + public void setRPC(String methodName, long queueTime) { + this.methodName = methodName; + this.rpcStartTime = System.currentTimeMillis(); + this.rpcQueueTime = queueTime; + this.state = State.RUNNING; + } + public synchronized Map toMap() { // only include RPC info if the Handler is actively servicing an RPC call Map map = super.toMap(); @@ -261,5 +269,4 @@ } return super.toString() + ", rpcMethod=" + getRPC(); } - } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (revision 0) @@ -0,0 +1,65 @@ +package org.apache.hadoop.hbase.ipc; + +import java.util.HashMap; +import java.util.Map; + +import javax.net.SocketFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.io.Writable; + +/* Cache a client using its socket factory as the hash key */ +class ClientCache { + private Map clients = + new HashMap(); + + protected ClientCache() {} + + /** + * Construct & cache an IPC client with the user-provided SocketFactory + * if no cached client exists. + * + * @param conf Configuration + * @param factory socket factory + * @return an IPC client + */ + protected synchronized HBaseClient getClient(Configuration conf, + SocketFactory factory) { + return getClient(conf,factory,HbaseObjectWritable.class); + } + protected synchronized HBaseClient getClient(Configuration conf, + SocketFactory factory, Class valueClass) { + // Construct & cache client. The configuration is only used for timeout, + // and Clients have connection pools. So we can either (a) lose some + // connection pooling and leak sockets, or (b) use the same timeout for + // all configurations. Since the IPC is usually intended globally, not + // per-job, we choose (a). + HBaseClient client = clients.get(factory); + if (client == null) { + // Make an hbase client instead of hadoop Client. + client = new HBaseClient(valueClass, conf, factory); + clients.put(factory, client); + } else { + client.incCount(); + } + return client; + } + + /** + * Stop a RPC client connection + * A RPC client is closed only when its reference count becomes zero. + * @param client client to stop + */ + protected void stopClient(HBaseClient client) { + synchronized (this) { + client.decCount(); + if (client.isZeroReference()) { + clients.remove(client.getSocketFactory()); + } + } + if (client.isZeroReference()) { + client.stop(); + } + } +} \ No newline at end of file Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (revision 1354768) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (working copy) @@ -69,57 +69,6 @@ // DEBUG log level does NOT emit RPC-level logging. private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine"); - /* Cache a client using its socket factory as the hash key */ - static private class ClientCache { - private Map clients = - new HashMap(); - - protected ClientCache() {} - - /** - * Construct & cache an IPC client with the user-provided SocketFactory - * if no cached client exists. - * - * @param conf Configuration - * @param factory socket factory - * @return an IPC client - */ - protected synchronized HBaseClient getClient(Configuration conf, - SocketFactory factory) { - // Construct & cache client. The configuration is only used for timeout, - // and Clients have connection pools. So we can either (a) lose some - // connection pooling and leak sockets, or (b) use the same timeout for - // all configurations. Since the IPC is usually intended globally, not - // per-job, we choose (a). - HBaseClient client = clients.get(factory); - if (client == null) { - // Make an hbase client instead of hadoop Client. - client = new HBaseClient(HbaseObjectWritable.class, conf, factory); - clients.put(factory, client); - } else { - client.incCount(); - } - return client; - } - - /** - * Stop a RPC client connection - * A RPC client is closed only when its reference count becomes zero. - * @param client client to stop - */ - protected void stopClient(HBaseClient client) { - synchronized (this) { - client.decCount(); - if (client.isZeroReference()) { - clients.remove(client.getSocketFactory()); - } - } - if (client.isZeroReference()) { - client.stop(); - } - } - } - protected final static ClientCache CLIENTS = new ClientCache(); private static class Invoker implements InvocationHandler { @@ -160,7 +109,7 @@ return value.get(); } catch (Throwable t) { // For protobuf protocols, ServiceException is expected - if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) { + if (ProtobufRpcEngine.PROTOBUF_PROTOCOLS.contains(protocol)) { if (t instanceof RemoteException) { Throwable cause = ((RemoteException)t).unwrapRemoteException(); throw new ServiceException(cause); @@ -261,7 +210,7 @@ private final int warnResponseTime; private final int warnResponseSize; - private static String classNameBase(String className) { + static String classNameBase(String className) { String[] names = className.split("\\.", -1); if (names == null || names.length == 0) { return className; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (revision 1354768) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (working copy) @@ -24,10 +24,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configurable; @@ -35,9 +31,6 @@ import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.io.HbaseObjectWritable; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.hbase.RegionServerStatusProtocol; import org.apache.hadoop.io.VersionMismatchException; import org.apache.hadoop.io.VersionedWritable; @@ -52,32 +45,6 @@ protected Configuration conf; private long clientVersion; private int clientMethodsHash; - - // For generated protocol classes which don't have VERSION field, - // such as protobuf interfaces. - private static final Map, Long> - PROTOCOL_VERSION = new HashMap, Long>(); - - static { - PROTOCOL_VERSION.put(ClientService.BlockingInterface.class, - Long.valueOf(ClientProtocol.VERSION)); - PROTOCOL_VERSION.put(AdminService.BlockingInterface.class, - Long.valueOf(AdminProtocol.VERSION)); - PROTOCOL_VERSION.put(RegionServerStatusService.BlockingInterface.class, - Long.valueOf(RegionServerStatusProtocol.VERSION)); - } - - // For protobuf protocols, which use ServiceException, instead of IOException - protected static final Set> - PROTOBUF_PROTOCOLS = new HashSet>(); - - static { - PROTOBUF_PROTOCOLS.add(ClientProtocol.class); - PROTOBUF_PROTOCOLS.add(AdminProtocol.class); - PROTOBUF_PROTOCOLS.add(RegionServerStatusProtocol.class); - PROTOBUF_PROTOCOLS.add(HMasterInterface.class); - } - private static byte RPC_VERSION = 1; public Invocation() {} @@ -93,7 +60,7 @@ clientMethodsHash = 0; } else { try { - Long version = PROTOCOL_VERSION.get(declaringClass); + Long version = ProtobufRpcEngine.PROTOCOL_VERSION.get(declaringClass); if (version != null) { this.clientVersion = version.longValue(); } else { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1354768) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -34,6 +34,7 @@ import javax.net.SocketFactory; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.Proxy; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -101,6 +102,13 @@ } }; + static long getProtocolVersion(Class protocol) + throws NoSuchFieldException, IllegalAccessException { + Field versionField = protocol.getField("VERSION"); + versionField.setAccessible(true); + return versionField.getLong(protocol); + } + // set a protocol to use a non-default RpcEngine static void setProtocolEngine(Configuration conf, Class protocol, Class engine) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (revision 0) @@ -0,0 +1,553 @@ +package org.apache.hadoop.hbase.ipc; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionServerStatusProtocol; +import org.apache.hadoop.hbase.client.AdminProtocol; +import org.apache.hadoop.hbase.client.ClientProtocol; +import org.apache.hadoop.hbase.client.Operation; +import org.apache.hadoop.hbase.io.DataOutputOutputStream; +import org.apache.hadoop.hbase.ipc.WritableRpcEngine.Server; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Objects; +import org.apache.hadoop.hbase.util.ProtoUtil; +import org.codehaus.jackson.map.ObjectMapper; + +import com.google.protobuf.Message; +import com.google.protobuf.ServiceException; + +public class ProtobufRpcEngine implements RpcEngine { + private static final Log LOG = + LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcEngine"); + protected final static ClientCache CLIENTS = new ClientCache(); + // For generated protocol classes which don't have VERSION field, + // such as protobuf interfaces. + static final Map, Long> + PROTOCOL_VERSION = new HashMap, Long>(); + static { + ProtobufRpcEngine.PROTOCOL_VERSION.put(ClientService.BlockingInterface.class, + Long.valueOf(ClientProtocol.VERSION)); + ProtobufRpcEngine.PROTOCOL_VERSION.put(AdminService.BlockingInterface.class, + Long.valueOf(AdminProtocol.VERSION)); + ProtobufRpcEngine.PROTOCOL_VERSION.put(RegionServerStatusService.BlockingInterface.class, + Long.valueOf(RegionServerStatusProtocol.VERSION)); + } + + // For protobuf protocols, which use ServiceException, instead of IOException + protected static final Set> + PROTOBUF_PROTOCOLS = new HashSet>(); + static { + ProtobufRpcEngine.PROTOBUF_PROTOCOLS.add(ClientProtocol.class); + ProtobufRpcEngine.PROTOBUF_PROTOCOLS.add(AdminProtocol.class); + ProtobufRpcEngine.PROTOBUF_PROTOCOLS.add(RegionServerStatusProtocol.class); + ProtobufRpcEngine.PROTOBUF_PROTOCOLS.add(HMasterInterface.class); + } + + @Override + public VersionedProtocol getProxy( + Class protocol, long clientVersion, + InetSocketAddress addr, User ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException { + final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, + rpcTimeout); + return (VersionedProtocol)Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[]{protocol}, invoker); + } + + @Override + public void stopProxy(VersionedProtocol proxy) { + if (proxy!=null) { + ((Invoker)Proxy.getInvocationHandler(proxy)).close(); + } + } + + @Override + public Server getServer(Class protocol, + Object instance, Class[] ifaces, String bindAddress, int port, + int numHandlers, int metaHandlerCount, boolean verbose, + Configuration conf, int highPriorityLevel) throws IOException { + return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, + metaHandlerCount, verbose, highPriorityLevel); + } + private static class Invoker implements InvocationHandler { + private final Map returnTypes = + new ConcurrentHashMap(); + private Class protocol; + private InetSocketAddress address; + private User ticket; + private HBaseClient client; + private boolean isClosed = false; + final private int rpcTimeout; + private final long clientProtocolVersion; + + public Invoker(Class protocol, + InetSocketAddress addr, User ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException { + this.protocol = protocol; + this.address = addr; + this.ticket = ticket; + this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class); + this.rpcTimeout = rpcTimeout; + Long version = PROTOCOL_VERSION.get(protocol); + if (version != null) { + this.clientProtocolVersion = version; + } else { + try { + this.clientProtocolVersion = HBaseRPC.getProtocolVersion(protocol); + } catch (NoSuchFieldException e) { + throw new RuntimeException("The " + protocol, e); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + + private RpcRequest constructRpcRequest(Method method, + Object[] params) throws ServiceException { + RpcRequest rpcRequest; + RpcRequest.Builder builder = RpcRequest.newBuilder(); + builder.setMethodName(method.getName()); + + if (params.length != 2) { // RpcController + Message + throw new ServiceException("Too many parameters for request. Method: [" + + method.getName() + "]" + ", Expected: 2, Actual: " + + params.length); + } + if (params[1] == null) { + throw new ServiceException("null param while calling Method: [" + + method.getName() + "]"); + } + + Message param = (Message) params[1]; + builder.setRequest(param.toByteString()); + builder.setClientProtocolVersion(clientProtocolVersion); + rpcRequest = builder.build(); + return rpcRequest; + } + + /** + * This is the client side invoker of RPC method. It only throws + * ServiceException, since the invocation proxy expects only + * ServiceException to be thrown by the method in case protobuf service. + * + * ServiceException has the following causes: + *
    + *
  1. Exceptions encountered on the client side in this method are + * set as cause in ServiceException as is.
  2. + *
  3. Exceptions from the server are wrapped in RemoteException and are + * set as cause in ServiceException
  4. + *
+ * + * Note that the client calling protobuf RPC methods, must handle + * ServiceException by getting the cause from the ServiceException. If the + * cause is RemoteException, then unwrap it to get the exception thrown by + * the server. + */ + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws ServiceException { + long startTime = 0; + if (LOG.isDebugEnabled()) { + startTime = System.currentTimeMillis(); + } + + RpcRequest rpcRequest = constructRpcRequest(method, args); + RpcResponseWritable val = null; + try { + val = (RpcResponseWritable) client.call( + new RpcRequestWritable(rpcRequest), address, ticket, rpcTimeout); + } catch (Throwable e) { + throw new ServiceException(e); + } + + if (LOG.isDebugEnabled()) { + long callTime = System.currentTimeMillis() - startTime; + LOG.debug("Call: " + method.getName() + " " + callTime); + } + + Message prototype = null; + try { + prototype = getReturnProtoType(method); + } catch (Exception e) { + throw new ServiceException(e); + } + Message returnMessage; + try { + returnMessage = prototype.newBuilderForType() + .mergeFrom(val.responseMessage).build(); + } catch (Throwable e) { + throw new ServiceException(e); + } + return returnMessage; + } + + synchronized protected void close() { + if (!isClosed) { + isClosed = true; + CLIENTS.stopClient(client); + } + } + + private Message getReturnProtoType(Method method) throws Exception { + if (returnTypes.containsKey(method.getName())) { + return returnTypes.get(method.getName()); + } + + Class returnType = method.getReturnType(); + Method newInstMethod = returnType.getMethod("getDefaultInstance"); + newInstMethod.setAccessible(true); + Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null); + returnTypes.put(method.getName(), prototype); + return prototype; + } + } + + /** + * Writable Wrapper for Protocol Buffer Requests + */ + private static class RpcRequestWritable implements Writable { + RpcRequest message; + + @SuppressWarnings("unused") + public RpcRequestWritable() { + } + + RpcRequestWritable(RpcRequest message) { + this.message = message; + } + + @Override + public void write(DataOutput out) throws IOException { + ((Message)message).writeDelimitedTo( + DataOutputOutputStream.constructOutputStream(out)); + } + + @Override + public void readFields(DataInput in) throws IOException { + int length = ProtoUtil.readRawVarint32(in); + byte[] bytes = new byte[length]; + in.readFully(bytes); + message = RpcRequest.parseFrom(bytes); + } + + @Override + public String toString() { + return "CallID: " + message.getCallId() + " Client Protocol Version: " + + message.getClientProtocolVersion() + " MethodName: " + + message.getMethodName(); + } + } + + /** + * Writable Wrapper for Protocol Buffer Responses + */ + private static class RpcResponseWritable implements Writable { + byte[] responseMessage; + + @SuppressWarnings("unused") + public RpcResponseWritable() { + } + + public RpcResponseWritable(Message message) { + this.responseMessage = message.toByteArray(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(responseMessage.length); + out.write(responseMessage); + } + + @Override + public void readFields(DataInput in) throws IOException { + int length = in.readInt(); + byte[] bytes = new byte[length]; + in.readFully(bytes); + responseMessage = bytes; + } + } + public static class Server extends HBaseServer { + boolean verbose; + Object instance; + Class implementation; + private static final String WARN_RESPONSE_TIME = + "hbase.ipc.warn.response.time"; + private static final String WARN_RESPONSE_SIZE = + "hbase.ipc.warn.response.size"; + + /** Default value for above params */ + private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds + private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; + + /** Names for suffixed metrics */ + private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec."; + + private final int warnResponseTime; + private final int warnResponseSize; + /** + * Construct an RPC server. + * + * @param protocolClass the class of protocol + * @param protocolImpl the protocolImpl whose methods will be called + * @param conf the configuration to use + * @param bindAddress the address to bind on to listen for connection + * @param port the port to listen for connections on + * @param numHandlers the number of method handler threads to run + * @param verbose whether each call should be logged + * @param portRangeConfig A config parameter that can be used to restrict + * the range of ports used when port is 0 (an ephemeral port) + */ + public Server(Object instance, final Class[] ifaces, + Configuration conf, String bindAddress, int port, + int numHandlers, int metaHandlerCount, boolean verbose, + int highPriorityLevel) + throws IOException { + super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, + conf, WritableRpcEngine.Server.classNameBase(instance.getClass().getName()), + highPriorityLevel); + this.verbose = verbose; + this.instance = instance; + this.implementation = instance.getClass(); + // create metrics for the advertised interfaces this server implements. + String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC}; + this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes); + + this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, + DEFAULT_WARN_RESPONSE_TIME); + this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, + DEFAULT_WARN_RESPONSE_SIZE); + } + private final Map methodArg = + new ConcurrentHashMap(); + @Override + /** + * This is a server side method, which is invoked over RPC. On success + * the return response has protobuf response payload. On failure, the + * exception name and the stack trace are return in the resposne. + * See {@link HadoopRpcResponseProto} + * + * In this method there three types of exceptions possible and they are + * returned in response as follows. + *
    + *
  1. Exceptions encountered in this method that are returned + * as {@link RpcServerException}
  2. + *
  3. Exceptions thrown by the service is wrapped in ServiceException. + * In that this method returns in response the exception thrown by the + * service.
  4. + *
  5. Other exceptions thrown by the service. They are returned as + * it is.
  6. + *
+ */ + public Writable call(Class protocol, + Writable writableRequest, long receiveTime, MonitoredRPCHandler status) + throws IOException { + try { + RpcRequestWritable request = (RpcRequestWritable) writableRequest; + RpcRequest rpcRequest = request.message; + String methodName = rpcRequest.getMethodName(); + + + /** + * RPCs for a particular interface (ie protocol) are done using a + * IPC connection that is setup using rpcProxy. + * The rpcProxy's has a declared protocol name that is + * sent form client to server at connection time. + */ + //TODO: use the clientVersion to do protocol compatibility checks + long clientVersion = rpcRequest.getClientProtocolVersion(); + + if (verbose) { + LOG.info("Call: protocol name=" + protocol.getName() + + ", method=" + methodName); + } + + status.setRPC(rpcRequest.getMethodName(), receiveTime); + status.setRPCPacket(writableRequest); + status.resume("Servicing call"); + //we don't allow RPCs with overloaded method names + Method method = + protocol.getMethod(rpcRequest.getMethodName()); + method.setAccessible(true); + + //get an instance of the first arg type + Message prototype = getMethodArgType(method); + Message param = prototype.newBuilderForType() + .mergeFrom(rpcRequest.getRequest()).build(); + Message result; + Object impl = null; + if (protocol.isAssignableFrom(this.implementation)) { + impl = this.instance; + } + else { + throw new HBaseRPC.UnknownProtocolException(protocol); + } + + long startTime = System.currentTimeMillis(); + result = (Message)method.invoke(impl, param); + int processingTime = (int) (System.currentTimeMillis() - startTime); + int qTime = (int) (startTime-receiveTime); + if (TRACELOG.isDebugEnabled()) { + TRACELOG.debug("Call #" + CurCall.get().id + + "; Served: " + protocol.getSimpleName()+"#"+method.getName() + + " queueTime=" + qTime + + " processingTime=" + processingTime + + " contents=" + Objects.describeQuantity(param)); + } + rpcMetrics.rpcQueueTime.inc(qTime); + rpcMetrics.rpcProcessingTime.inc(processingTime); + rpcMetrics.inc(method.getName(), processingTime); + if (verbose) log("Return: "+result); + long responseSize = result.getSerializedSize(); + // log any RPC responses that are slower than the configured warn + // response time or larger than configured warning size + boolean tooSlow = (processingTime > warnResponseTime + && warnResponseTime > -1); + boolean tooLarge = (responseSize > warnResponseSize + && warnResponseSize > -1); + if (tooSlow || tooLarge) { + // when tagging, we let TooLarge trump TooSmall to keep output simple + // note that large responses will often also be slow. + logResponse(rpcRequest, (tooLarge ? "TooLarge" : "TooSlow"), + status.getClient(), startTime, processingTime, qTime, + responseSize); + // provides a count of log-reported slow responses + if (tooSlow) { + rpcMetrics.rpcSlowResponseTime.inc(processingTime); + } + } + if (processingTime > 1000) { + // we use a hard-coded one second period so that we can clearly + // indicate the time period we're warning about in the name of the + // metric itself + rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC, + processingTime); + } + return new RpcResponseWritable(result); + } catch (InvocationTargetException e) { + Throwable target = e.getTargetException(); + if (target instanceof IOException) { + throw (IOException)target; + } + if (target instanceof ServiceException) { + throw ProtobufUtil.getRemoteException((ServiceException)target); + } + IOException ioe = new IOException(target.toString()); + ioe.setStackTrace(target.getStackTrace()); + throw ioe; + } catch (Throwable e) { + if (!(e instanceof IOException)) { + LOG.error("Unexpected throwable object ", e); + } + IOException ioe = new IOException(e.toString()); + ioe.setStackTrace(e.getStackTrace()); + throw ioe; + } + } + + private Message getMethodArgType(Method method) throws Exception { + if (methodArg.containsKey(method.getName())) { + return methodArg.get(method.getName()); + } + + Class[] args = method.getParameterTypes(); + //in the protobuf methods, args[0] is the only significant argument + Method newInstMethod = args[0].getMethod("getDefaultInstance"); + newInstMethod.setAccessible(true); + Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null); + methodArg.put(method.getName(), prototype); + return prototype; + } + /** + * Logs an RPC response to the LOG file, producing valid JSON objects for + * client Operations. + * @param call The call to log. + * @param tag The tag that will be used to indicate this event in the log. + * @param client The address of the client who made this call. + * @param startTime The time that the call was initiated, in ms. + * @param processingTime The duration that the call took to run, in ms. + * @param qTime The duration that the call spent on the queue + * prior to being initiated, in ms. + * @param responseSize The size in bytes of the response buffer. + */ + private void logResponse(RpcRequest call, String tag, String clientAddress, + long startTime, int processingTime, int qTime, long responseSize) + throws IOException { + Object params[] = new Object[]{call.getRequest()}; + // for JSON encoding + ObjectMapper mapper = new ObjectMapper(); + // base information that is reported regardless of type of call + Map responseInfo = new HashMap(); + responseInfo.put("starttimems", startTime); + responseInfo.put("processingtimems", processingTime); + responseInfo.put("queuetimems", qTime); + responseInfo.put("responsesize", responseSize); + responseInfo.put("client", clientAddress); + responseInfo.put("class", instance.getClass().getSimpleName()); + responseInfo.put("method", call.getMethodName()); + if (params.length == 2 && instance instanceof HRegionServer && + params[0] instanceof byte[] && + params[1] instanceof Operation) { + // if the slow process is a query, we want to log its table as well + // as its own fingerprint + byte [] tableName = + HRegionInfo.parseRegionName((byte[]) params[0])[0]; + responseInfo.put("table", Bytes.toStringBinary(tableName)); + // annotate the response map with operation details + responseInfo.putAll(((Operation) params[1]).toMap()); + // report to the log file + LOG.warn("(operation" + tag + "): " + + mapper.writeValueAsString(responseInfo)); + } else if (params.length == 1 && instance instanceof HRegionServer && + params[0] instanceof Operation) { + // annotate the response map with operation details + responseInfo.putAll(((Operation) params[0]).toMap()); + // report to the log file + LOG.warn("(operation" + tag + "): " + + mapper.writeValueAsString(responseInfo)); + } else { + // can't get JSON details, so just report call.toString() along with + // a more generic tag. + responseInfo.put("call", call.toString()); + LOG.warn("(response" + tag + "): " + + mapper.writeValueAsString(responseInfo)); + } + } + } + + protected static void log(String value) { + String v = value; + if (v != null && v.length() > 55) + v = v.substring(0, 55)+"..."; + LOG.info(v); + } +}