Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (working copy)
@@ -54,14 +54,10 @@
*/
protected synchronized HBaseClient getClient(Configuration conf,
SocketFactory factory) {
- return getClient(conf, factory, HbaseObjectWritable.class);
- }
- protected synchronized HBaseClient getClient(Configuration conf,
- SocketFactory factory, Class<? extends Writable> valueClass) {
HBaseClient client = clients.get(factory);
if (client == null) {
// Make an hbase client instead of hadoop Client.
- client = new HBaseClient(valueClass, conf, factory);
+ client = new HBaseClient(conf, factory);
clients.put(factory, client);
} else {
client.incCount();
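
For reference, a minimal sketch of the reference counting ClientCache relies on (the patch only drops the valueClass overload; the pooling itself is unchanged). SimpleClientCache and CachedClient are illustrative names, with incCount()/decCount() assumed to behave like HBaseClient's:

    import java.util.HashMap;
    import java.util.Map;

    class CachedClient {
      private int refCount = 1;
      void incCount() { refCount++; }         // another proxy shares this client
      int decCount() { return --refCount; }   // returns remaining users
    }

    class SimpleClientCache<K> {
      private final Map<K, CachedClient> clients = new HashMap<K, CachedClient>();

      synchronized CachedClient getClient(K key) {
        CachedClient client = clients.get(key);
        if (client == null) {
          client = new CachedClient();        // first user of this key
          clients.put(key, client);
        } else {
          client.incCount();                  // reuse the pooled client
        }
        return client;
      }

      synchronized void stopClient(K key, CachedClient client) {
        if (client.decCount() == 0) {
          clients.remove(key);                // last user released it
        }
      }
    }
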
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy)
@@ -55,6 +55,8 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@@ -69,10 +71,8 @@
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -80,11 +80,14 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.util.ReflectionUtils;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
-/** A client for an IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
+
+/** A client for an IPC service. IPC calls take a single Protobuf message as a
+ * parameter, and return a single Protobuf message as their value. A service runs on
* a port and is defined by a parameter class and a value class.
*
*
* <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
@@ -99,7 +102,6 @@
.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final PoolMap<ConnectionId, Connection> connections;
- protected final Class<? extends Writable> valueClass; // class of call values
protected int counter; // counter for call ids
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
final protected Configuration conf;
@@ -187,13 +189,13 @@
/** A call waiting for a value. */
protected class Call {
final int id; // call id
- final Writable param; // parameter
- Writable value; // value, null if error
+ final RpcRequestBody param; // rpc request object
+ Message value; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
long startTime;
- protected Call(Writable param) {
+ protected Call(RpcRequestBody param) {
this.param = param;
this.startTime = System.currentTimeMillis();
synchronized (HBaseClient.this) {
@@ -223,7 +225,7 @@
*
* @param value return value of the call.
*/
- public synchronized void setValue(Writable value) {
+ public synchronized void setValue(Message value) {
this.value = value;
callComplete();
}
@@ -825,15 +827,18 @@
try {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
- RpcRequestHeader.Builder builder = RPCProtos.RpcRequestHeader.newBuilder();
- builder.setCallId(call.id);
- DataOutputBuffer d = new DataOutputBuffer();
- builder.build().writeDelimitedTo(d);
- call.param.write(d);
+ RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder();
+ headerBuilder.setCallId(call.id);
//noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
- this.out.writeInt(d.getLength());
- this.out.write(d.getData(), 0, d.getLength());
+ int serializedHeaderSize = headerBuilder.build().getSerializedSize();
+ int requestSerializedSize = call.param.getSerializedSize();
+ this.out.writeInt(serializedHeaderSize +
+ CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) +
+ requestSerializedSize +
+ CodedOutputStream.computeRawVarint32Size(requestSerializedSize));
+ headerBuilder.build().writeDelimitedTo(this.out);
+ call.param.writeDelimitedTo(this.out);
this.out.flush();
}
} catch(IOException e) {
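
The arithmetic in the new sendParam is worth spelling out: writeDelimitedTo() prefixes each message with a varint length, so the outer writeInt() must count both varint prefixes as well as both payloads. A standalone sketch of the same framing, assuming protobuf-java 2.x (RpcFraming is an illustrative name):

    import com.google.protobuf.CodedOutputStream;
    import com.google.protobuf.Message;

    import java.io.DataOutputStream;
    import java.io.IOException;

    final class RpcFraming {
      // Frame layout: int32 total | varint(h) | header | varint(b) | body
      static void writeFrame(DataOutputStream out, Message header, Message body)
          throws IOException {
        int h = header.getSerializedSize();
        int b = body.getSerializedSize();
        int total = CodedOutputStream.computeRawVarint32Size(h) + h
                  + CodedOutputStream.computeRawVarint32Size(b) + b;
        out.writeInt(total);           // everything after these four bytes
        header.writeDelimitedTo(out);  // varint(h) + header bytes
        body.writeDelimitedTo(out);    // varint(b) + body bytes
        out.flush();
      }
    }
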
@@ -870,8 +875,17 @@
Status status = response.getStatus();
if (status == Status.SUCCESS) {
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
- value.readFields(in); // read value
+ Message rpcResponseType;
+ try {
+ rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
+ ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
+ call.param.getMethodName()));
+ } catch (Exception e) {
+ throw new RuntimeException(e); //local exception
+ }
+ Builder builder = rpcResponseType.newBuilderForType();
+ builder.mergeDelimitedFrom(in);
+ Message value = builder.build();
// it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value.
if (call != null) {
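
Because the response no longer carries type information, the client resolves the expected return type from the method named in the request (getReturnProtoType) and decodes with a delimited merge. A hedged sketch of the decode step, where prototype stands in for the resolved return type:

    import com.google.protobuf.Message;

    import java.io.IOException;
    import java.io.InputStream;

    final class ResponseReader {
      static Message readResponse(Message prototype, InputStream in)
          throws IOException {
        Message.Builder builder = prototype.newBuilderForType();
        if (!builder.mergeDelimitedFrom(in)) {  // false: EOF before any bytes
          throw new IOException("stream closed before response body");
        }
        return builder.build();
      }
    }
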
@@ -983,7 +997,7 @@
private final ParallelResults results;
protected final int index;
- public ParallelCall(Writable param, ParallelResults results, int index) {
+ public ParallelCall(RpcRequestBody param, ParallelResults results, int index) {
super(param);
this.results = results;
this.index = index;
@@ -998,12 +1012,12 @@
/** Result collector for parallel calls. */
protected static class ParallelResults {
- protected final Writable[] values;
+ protected final Message[] values;
protected int size;
protected int count;
public ParallelResults(int size) {
- this.values = new Writable[size];
+ this.values = new RpcResponseBody[size];
this.size = size;
}
@@ -1020,15 +1034,13 @@
}
/**
- * Construct an IPC client whose values are of the given {@link Writable}
+ * Construct an IPC client whose values are of the {@link Message}
* class.
* @param valueClass value class
* @param conf configuration
* @param factory socket factory
*/
- public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
- SocketFactory factory) {
- this.valueClass = valueClass;
+ public HBaseClient(Configuration conf, SocketFactory factory) {
this.maxIdleTime =
conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
@@ -1051,8 +1063,8 @@
* @param valueClass value class
* @param conf configuration
*/
- public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
- this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
+ public HBaseClient(Configuration conf) {
+ this(conf, NetUtils.getDefaultSocketFactory(conf));
}
/**
@@ -1124,17 +1136,17 @@
/** Make a call, passing param, to the IPC server running at
* address, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception.
- * @param param writable parameter
+ * @param param RpcRequestBody parameter
* @param address network address
- * @return Writable
+ * @return Message
* @throws IOException e
*/
- public Writable call(Writable param, InetSocketAddress address)
+ public Message call(RpcRequestBody param, InetSocketAddress address)
throws IOException, InterruptedException {
return call(param, address, null, 0);
}
- public Writable call(Writable param, InetSocketAddress addr,
+ public Message call(RpcRequestBody param, InetSocketAddress addr,
User ticket, int rpcTimeout)
throws IOException, InterruptedException {
return call(param, addr, null, ticket, rpcTimeout);
@@ -1145,7 +1157,7 @@
* with the ticket credentials, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
- public Writable call(Writable param, InetSocketAddress addr,
+ public Message call(RpcRequestBody param, InetSocketAddress addr,
Class<? extends VersionedProtocol> protocol,
User ticket, int rpcTimeout)
throws InterruptedException, IOException {
@@ -1217,14 +1229,14 @@
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored.
- * @param params writable parameters
+ * @param params RpcRequestBody parameters
* @param addresses socket addresses
- * @return Writable[]
+ * @return RpcResponseBody[]
* @throws IOException e
- * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, User)} instead
+ * @deprecated Use {@link #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead
*/
@Deprecated
- public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+ public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses)
throws IOException, InterruptedException {
return call(params, addresses, null, null);
}
@@ -1233,11 +1245,11 @@
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored. */
- public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
+ public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
Class<? extends VersionedProtocol> protocol,
User ticket)
throws IOException, InterruptedException {
- if (addresses.length == 0) return new Writable[0];
+ if (addresses.length == 0) return new RpcResponseBody[0];
ParallelResults results = new ParallelResults(params.length);
// TODO this synchronization block doesnt make any sense, we should possibly fix it
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy)
@@ -28,7 +28,6 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
@@ -46,24 +45,13 @@
*
* <p>This is a local hbase copy of the hadoop RPC so we can do things like
* address HADOOP-414 for hbase-only and try other hbase-specific
- * optimizations like using our own version of ObjectWritable. Class has been
- * renamed to avoid confusing it w/ hadoop versions.
+ * optimizations. Class has been renamed to avoid confusing it w/ hadoop
+ * versions.
*
*
*
* A protocol is a Java interface. All parameters and return types must
- * be one of:
- *
- * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
- * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
- * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
- *
- * <li>a {@link String}; or</li>
- *
- * <li>a {@link Writable}; or</li>
- *
- * <li>an array of the above types</li> </ul>
- *
+ * be Protobuf objects.
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*/
@@ -122,7 +110,7 @@
if (engine == null) {
// check for a configured default engine
Class<?> defaultEngine =
- conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);
+ conf.getClass(RPC_ENGINE_PROP, ProtobufRpcEngine.class);
// check for a per interface override
Class<?> impl = conf.getClass(RPC_ENGINE_PROP+"."+protocol.getName(),
@@ -345,16 +333,6 @@
VersionedProtocol proxy = engine
.getProxy(protocol, clientVersion, addr, ticket, conf, factory,
Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
- if (engine instanceof WritableRpcEngine) {
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
- clientVersion);
- if (serverVersion == clientVersion) {
- return proxy;
- }
-
- throw new VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
return proxy;
}
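
The key change in this file is the default engine: with nothing configured, getProtocolEngine now falls back to ProtobufRpcEngine, and the Writable-era version handshake below it is dropped. A sketch of the lookup order; the "hbase.rpc.engine" key mirrors RPC_ENGINE_PROP but should be treated as an assumption here:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;

    final class EngineLookup {
      static final String RPC_ENGINE_PROP = "hbase.rpc.engine"; // assumed key

      static Object getEngine(Class<?> protocol, Configuration conf,
          Class<?> defaultEngine) {
        // per-interface override first, then the global default
        Class<?> impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(),
            conf.getClass(RPC_ENGINE_PROP, defaultEngine));
        return ReflectionUtils.newInstance(impl, conf);
      }
    }
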
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy)
@@ -68,12 +68,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
@@ -87,9 +85,7 @@
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -104,17 +100,16 @@
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
import org.cliffc.high_scale_lib.Counter;
-/** An abstract IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
+/** An abstract IPC service. IPC calls take a single Protobuf message as a
+ * parameter, and return a single Protobuf message as their value. A service runs on
* a port and is defined by a parameter class and a value class.
*
*
@@ -193,8 +188,8 @@
}
/** Returns the server instance called under or null. May be called under
- * {@link #call(Class, Writable, long, MonitoredRPCHandler)} implementations,
- * and under {@link Writable} methods of paramters and return values.
+ * {@link #call(Class, RpcRequestBody, long, MonitoredRPCHandler)} implementations,
+ * and under protobuf methods of parameters and return values.
* Permits applications to access the server context.
* @return HBaseServer
*/
@@ -235,7 +230,6 @@
private int handlerCount; // number of handler threads
private int priorityHandlerCount;
private int readThreads; // number of read threads
- protected Class<? extends Writable> paramClass; // class of call parameters
protected int maxIdleTime; // the maximum idle time after
// which a client may be
// disconnected
@@ -312,7 +306,7 @@
/** A call queued for handling. */
protected class Call implements RpcCallContext {
protected int id; // the client's call id
- protected Writable param; // the parameter passed
+ protected RpcRequestBody param; // the parameter passed
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
@@ -324,7 +318,7 @@
protected long size; // size of current call
protected boolean isError;
- public Call(int id, Writable param, Connection connection,
+ public Call(int id, RpcRequestBody param, Connection connection,
Responder responder, long size) {
this.id = id;
this.param = param;
@@ -353,34 +347,13 @@
if (errorClass != null) {
this.isError = true;
}
- Writable result = null;
- if (value instanceof Writable) {
- result = (Writable) value;
+
+ ByteBufferOutputStream buf = null;
+ if (value != null) {
+ buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
} else {
- /* We might have a null value and errors. Avoid creating a
- * HbaseObjectWritable, because the constructor fails on null. */
- if (value != null) {
- result = new HbaseObjectWritable(value);
- }
+ buf = new ByteBufferOutputStream(BUFFER_INITIAL_SIZE);
}
-
- int size = BUFFER_INITIAL_SIZE;
- if (result instanceof WritableWithSize) {
- // get the size hint.
- WritableWithSize ohint = (WritableWithSize) result;
- long hint = ohint.getWritableSize() + 2*Bytes.SIZEOF_INT;
- if (hint > Integer.MAX_VALUE) {
- // oops, new problem.
- IOException ioe =
- new IOException("Result buffer size too large: " + hint);
- errorClass = ioe.getClass().getName();
- error = StringUtils.stringifyException(ioe);
- } else {
- size = (int)hint;
- }
- }
-
- ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
try {
RpcResponseHeader.Builder builder = RpcResponseHeader.newBuilder();
@@ -394,7 +367,9 @@
b.setStackTrace(error);
b.build().writeDelimitedTo(out);
} else {
- result.write(out);
+ if (value != null) {
+ ((Message)value).writeDelimitedTo(out);
+ }
}
if (connection.useWrap) {
wrapWithSasl(buf);
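
Taken together, Call.setResponse now pre-sizes its buffer from getSerializedSize() and writes the status header followed by either an RpcException or the result, all varint-delimited. A self-contained sketch, substituting a plain ByteArrayOutputStream for HBase's ByteBufferOutputStream:

    import com.google.protobuf.Message;

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    final class ResponseWriter {
      static byte[] serialize(Message header, Message result, Message error)
          throws IOException {
        int initial = (result != null) ? result.getSerializedSize() : 64;
        ByteArrayOutputStream buf = new ByteArrayOutputStream(initial);
        DataOutputStream out = new DataOutputStream(buf);
        header.writeDelimitedTo(out);    // RpcResponseHeader carrying Status
        if (error != null) {
          error.writeDelimitedTo(out);   // RpcException on failure
        } else if (result != null) {
          result.writeDelimitedTo(out);  // result payload on success
        }
        out.flush();
        return buf.toByteArray();
      }
    }
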
@@ -709,7 +684,7 @@
closeCurrentConnection(key, e);
cleanupConnections(true);
try { Thread.sleep(60000); } catch (Exception ignored) {}
- }
+ }
} catch (Exception e) {
closeCurrentConnection(key, e);
}
@@ -1418,7 +1393,7 @@
AccessControlException ae = new AccessControlException(
"Authentication is required");
setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
- null, ae.getClass().getName(), ae.getMessage());
+ ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
throw ae;
}
@@ -1506,7 +1481,7 @@
// Versions 3 and greater can interpret this exception
// response in the same manner
setupResponse(buffer, fakeCall, Status.FATAL,
- null, VersionMismatch.class.getName(), errMsg);
+ VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
}
@@ -1623,23 +1598,21 @@
if (LOG.isDebugEnabled()) {
LOG.debug(" got call #" + id + ", " + callSize + " bytes");
}
-
// Enforcing the call queue size, this triggers a retry in the client
if ((callSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, null, this, responder, callSize);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, callTooBig, Status.FATAL, null,
+ setupResponse(responseBuffer, callTooBig, Status.FATAL,
IOException.class.getName(),
"Call queue is full, is ipc.server.max.callqueue.size too small?");
responder.doRespond(callTooBig);
return;
}
- Writable param;
+ RpcRequestBody param;
try {
- param = ReflectionUtils.newInstance(paramClass, conf);//read param
- param.readFields(dis);
+ param = RpcRequestBody.parseDelimitedFrom(dis);
} catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " +
getHostAddress(), t);
@@ -1647,7 +1620,7 @@
new Call(id, null, this, responder, callSize);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
+ setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL,
t.getClass().getName(),
"IPC server unable to read call parameters: " + t.getMessage());
responder.doRespond(readParamsFailedCall);
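
The parameter read collapses to a single parseDelimitedFrom call, which returns null on a clean EOF; a caller probably wants to surface that explicitly. A minimal sketch (the null guard is an assumption, not part of the patch):

    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;

    import java.io.IOException;
    import java.io.InputStream;

    final class ParamReader {
      static RpcRequestBody read(InputStream dis) throws IOException {
        RpcRequestBody param = RpcRequestBody.parseDelimitedFrom(dis);
        if (param == null) {  // null means EOF at a message boundary
          throw new IOException("connection closed before call parameters");
        }
        return param;
      }
    }
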
@@ -1683,7 +1656,7 @@
} catch (AuthorizationException ae) {
LOG.debug("Connection authorization failed: "+ae.getMessage(), ae);
rpcMetrics.authorizationFailures.inc();
- setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null,
+ setupResponse(authFailedResponse, authFailedCall, Status.FATAL,
ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
return false;
@@ -1785,7 +1758,7 @@
String errorClass = null;
String error = null;
- Writable value = null;
+ Message value = null;
CurCall.set(call);
try {
@@ -1855,7 +1828,7 @@
}
- private Function<Writable, Integer> qosFunction = null;
+ private Function<RpcRequestBody, Integer> qosFunction = null;
/**
* Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
@@ -1864,11 +1837,11 @@
* @param newFunc
*/
@Override
- public void setQosFunction(Function<Writable, Integer> newFunc) {
+ public void setQosFunction(Function<RpcRequestBody, Integer> newFunc) {
qosFunction = newFunc;
}
- protected int getQosLevel(Writable param) {
+ protected int getQosLevel(RpcRequestBody param) {
if (qosFunction == null) {
return 0;
}
@@ -1886,14 +1859,13 @@
*
*/
protected HBaseServer(String bindAddress, int port,
- Class<? extends Writable> paramClass, int handlerCount,
+ int handlerCount,
int priorityHandlerCount, Configuration conf, String serverName,
int highPriorityLevel)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.port = port;
- this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.priorityHandlerCount = priorityHandlerCount;
this.socketSendBufferSize = 0;
@@ -1963,26 +1935,10 @@
*/
private void setupResponse(ByteArrayOutputStream response,
Call call, Status status,
- Writable rv, String errorClass, String error)
+ String errorClass, String error)
throws IOException {
response.reset();
- DataOutputStream out = new DataOutputStream(response);
-
- if (status == Status.SUCCESS) {
- try {
- rv.write(out);
- call.setResponse(rv, status, null, null);
- } catch (Throwable t) {
- LOG.warn("Error serializing call response for call " + call, t);
- // Call back to same function - this is OK since the
- // buffer is reset at the top, and since status is changed
- // to ERROR it won't infinite loop.
- call.setResponse(null, status.ERROR, t.getClass().getName(),
- StringUtils.stringifyException(t));
- }
- } else {
- call.setResponse(rv, status, errorClass, error);
- }
+ call.setResponse(null, status, errorClass, error);
}
protected void closeConnection(Connection connection) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcEngine.java (working copy)
@@ -18,14 +18,13 @@
package org.apache.hadoop.hbase.ipc;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,14 +34,20 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.DataOutputOutputStream;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Objects;
-import org.apache.hadoop.hbase.util.ProtoUtil;
+import org.codehaus.jackson.map.ObjectMapper;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
@@ -80,8 +85,9 @@
return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
metaHandlerCount, verbose, highPriorityLevel);
}
- private static class Invoker implements InvocationHandler {
- private final Map<String, Message> returnTypes =
+
+ static class Invoker implements InvocationHandler {
+ private static final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private Class<? extends VersionedProtocol> protocol;
private InetSocketAddress address;
@@ -97,7 +103,7 @@
this.protocol = protocol;
this.address = addr;
this.ticket = ticket;
- this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
+ this.client = CLIENTS.getClient(conf, factory);
this.rpcTimeout = rpcTimeout;
Long version = Invocation.PROTOCOL_VERSION.get(protocol);
if (version != null) {
@@ -133,6 +139,7 @@
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
+ builder.setRequestClassName(param.getClass().getName());
builder.setRequest(param.toByteString());
builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
@@ -166,24 +173,20 @@
}
RpcRequestBody rpcRequest = constructRpcRequest(method, args);
- RpcResponseWritable val = null;
+ Message val = null;
try {
- val = (RpcResponseWritable) client.call(
- new RpcRequestWritable(rpcRequest), address, protocol, ticket,
- rpcTimeout);
+ val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
-
- Message protoType = null;
- protoType = getReturnProtoType(method);
- Message returnMessage;
- returnMessage = protoType.newBuilderForType()
- .mergeFrom(val.responseMessage).build();
- return returnMessage;
+ return val;
} catch (Throwable e) {
+ if (e instanceof RemoteException) {
+ Throwable cause = ((RemoteException)e).unwrapRemoteException();
+ throw new ServiceException(cause);
+ }
throw new ServiceException(e);
}
}
@@ -195,7 +198,7 @@
}
}
- private Message getReturnProtoType(Method method) throws Exception {
+ static Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
}
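
getReturnProtoType (made static above so HBaseClient can reach it) needs only the method's declared return type, since every generated protobuf class exposes a static getDefaultInstance(). A sketch of that resolution with the same cache shape as the patch:

    import com.google.protobuf.Message;

    import java.lang.reflect.Method;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class ReturnTypes {
      private static final Map<String, Message> returnTypes =
          new ConcurrentHashMap<String, Message>();

      static Message getReturnProtoType(Method method) throws Exception {
        Message cached = returnTypes.get(method.getName());
        if (cached != null) {
          return cached;
        }
        Class<?> returnType = method.getReturnType(); // a generated Message class
        Method getDefault = returnType.getMethod("getDefaultInstance");
        Message prototype = (Message) getDefault.invoke(null);
        returnTypes.put(method.getName(), prototype);
        return prototype;
      }
    }
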
@@ -209,78 +212,11 @@
}
}
- /**
- * Writable Wrapper for Protocol Buffer Requests
- */
- private static class RpcRequestWritable implements Writable {
- RpcRequestBody message;
-
- @SuppressWarnings("unused")
- public RpcRequestWritable() {
- }
-
- RpcRequestWritable(RpcRequestBody message) {
- this.message = message;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- ((Message)message).writeDelimitedTo(
- DataOutputOutputStream.constructOutputStream(out));
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int length = ProtoUtil.readRawVarint32(in);
- byte[] bytes = new byte[length];
- in.readFully(bytes);
- message = RpcRequestBody.parseFrom(bytes);
- }
-
- public int getSerializedSize() {
- return message.getSerializedSize();
- }
-
- @Override
- public String toString() {
- return " Client Protocol Version: " +
- message.getClientProtocolVersion() + " MethodName: " +
- message.getMethodName();
- }
- }
-
- /**
- * Writable Wrapper for Protocol Buffer Responses
- */
- private static class RpcResponseWritable implements Writable {
- byte[] responseMessage;
-
- @SuppressWarnings("unused")
- public RpcResponseWritable() {
- }
-
- public RpcResponseWritable(Message message) {
- this.responseMessage = message.toByteArray();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(responseMessage.length);
- out.write(responseMessage);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int length = in.readInt();
- byte[] bytes = new byte[length];
- in.readFully(bytes);
- responseMessage = bytes;
- }
- }
- public static class Server extends WritableRpcEngine.Server {
+ public static class Server extends HBaseServer {
boolean verbose;
Object instance;
Class<?> implementation;
+ private Class<?>[] ifaces;
private static final String WARN_RESPONSE_TIME =
"hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE =
@@ -295,41 +231,84 @@
private final int warnResponseTime;
private final int warnResponseSize;
+ private static String classNameBase(String className) {
+ String[] names = className.split("\\.", -1);
+ if (names == null || names.length == 0) {
+ return className;
+ }
+ return names[names.length-1];
+ }
public Server(Object instance, final Class<?>[] ifaces,
Configuration conf, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
int highPriorityLevel)
throws IOException {
- super(instance, ifaces, RpcRequestWritable.class, conf, bindAddress, port,
- numHandlers, metaHandlerCount, verbose, highPriorityLevel);
- this.verbose = verbose;
+ super(bindAddress, port, numHandlers, metaHandlerCount,
+ conf, classNameBase(instance.getClass().getName()),
+ highPriorityLevel);
this.instance = instance;
this.implementation = instance.getClass();
+ this.verbose = verbose;
+
+ this.ifaces = ifaces;
+
// create metrics for the advertised interfaces this server implements.
String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
- this.rpcMetrics.createMetrics(ifaces, false, metricSuffixes);
+ this.rpcMetrics.createMetrics(this.ifaces, false, metricSuffixes);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
DEFAULT_WARN_RESPONSE_TIME);
this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
DEFAULT_WARN_RESPONSE_SIZE);
+ this.verbose = verbose;
+ this.instance = instance;
+ this.implementation = instance.getClass();
}
- private final Map<String, Message> methodArg =
+ private static final Map<String, Message> methodArg =
new ConcurrentHashMap<String, Message>();
- private final Map<String, Method> methodInstances =
+ private static final Map<String, Method> methodInstances =
new ConcurrentHashMap<String, Method>();
+
+ private AuthenticationTokenSecretManager createSecretManager(){
+ if (!User.isSecurityEnabled() ||
+ !(instance instanceof org.apache.hadoop.hbase.Server)) {
+ return null;
+ }
+ org.apache.hadoop.hbase.Server server =
+ (org.apache.hadoop.hbase.Server)instance;
+ Configuration conf = server.getConfiguration();
+ long keyUpdateInterval =
+ conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
+ long maxAge =
+ conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
+ return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
+ server.getServerName().toString(), keyUpdateInterval, maxAge);
+ }
+
@Override
+ public void startThreads() {
+ AuthenticationTokenSecretManager mgr = createSecretManager();
+ if (mgr != null) {
+ setSecretManager(mgr);
+ mgr.start();
+ }
+ this.authManager = new ServiceAuthorizationManager();
+ HBasePolicyProvider.init(conf, authManager);
+
+ // continue with base startup
+ super.startThreads();
+ }
+
+ @Override
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the protobuf response.
*/
- public Writable call(Class<? extends VersionedProtocol> protocol,
- Writable writableRequest, long receiveTime, MonitoredRPCHandler status)
+ public Message call(Class<? extends VersionedProtocol> protocol,
+ RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
throws IOException {
try {
- RpcRequestWritable request = (RpcRequestWritable) writableRequest;
- RpcRequestBody rpcRequest = request.message;
String methodName = rpcRequest.getMethodName();
Method method = getMethod(protocol, methodName);
if (method == null) {
@@ -358,7 +337,7 @@
status.setRPC(rpcRequest.getMethodName(),
new Object[]{rpcRequest.getRequest()}, receiveTime);
- status.setRPCPacket(writableRequest);
+ status.setRPCPacket(rpcRequest);
status.resume("Servicing call");
//get an instance of the method arg type
Message protoType = getMethodArgType(method);
@@ -398,7 +377,7 @@
rpcMetrics.rpcProcessingTime.inc(processingTime);
rpcMetrics.inc(method.getName(), processingTime);
if (verbose) {
- WritableRpcEngine.log("Return: "+result, LOG);
+ log("Return: "+result, LOG);
}
long responseSize = result.getSerializedSize();
// log any RPC responses that are slower than the configured warn
@@ -432,7 +411,7 @@
rpcMetrics.inc(method.getName() + ABOVE_ONE_SEC_METRIC,
processingTime);
}
- return new RpcResponseWritable(result);
+ return result;
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
if (target instanceof IOException) {
@@ -454,7 +433,7 @@
}
}
- private Method getMethod(Class<? extends VersionedProtocol> protocol,
+ static Method getMethod(Class<? extends VersionedProtocol> protocol,
String methodName) {
Method method = methodInstances.get(methodName);
if (method != null) {
@@ -472,7 +451,7 @@
return null;
}
- private Message getMethodArgType(Method method) throws Exception {
+ static Message getMethodArgType(Method method) throws Exception {
Message protoType = methodArg.get(method.getName());
if (protoType != null) {
return protoType;
@@ -497,5 +476,68 @@
methodArg.put(method.getName(), protoType);
return protoType;
}
+ /**
+ * Logs an RPC response to the LOG file, producing valid JSON objects for
+ * client Operations.
+ * @param params The parameters received in the call.
+ * @param methodName The name of the method invoked
+ * @param call The string representation of the call
+ * @param tag The tag that will be used to indicate this event in the log.
+ * @param client The address of the client who made this call.
+ * @param startTime The time that the call was initiated, in ms.
+ * @param processingTime The duration that the call took to run, in ms.
+ * @param qTime The duration that the call spent on the queue
+ * prior to being initiated, in ms.
+ * @param responseSize The size in bytes of the response buffer.
+ */
+ void logResponse(Object[] params, String methodName, String call, String tag,
+ String clientAddress, long startTime, int processingTime, int qTime,
+ long responseSize)
+ throws IOException {
+ // for JSON encoding
+ ObjectMapper mapper = new ObjectMapper();
+ // base information that is reported regardless of type of call
+ Map<String, Object> responseInfo = new HashMap<String, Object>();
+ responseInfo.put("starttimems", startTime);
+ responseInfo.put("processingtimems", processingTime);
+ responseInfo.put("queuetimems", qTime);
+ responseInfo.put("responsesize", responseSize);
+ responseInfo.put("client", clientAddress);
+ responseInfo.put("class", instance.getClass().getSimpleName());
+ responseInfo.put("method", methodName);
+ if (params.length == 2 && instance instanceof HRegionServer &&
+ params[0] instanceof byte[] &&
+ params[1] instanceof Operation) {
+ // if the slow process is a query, we want to log its table as well
+ // as its own fingerprint
+ byte [] tableName =
+ HRegionInfo.parseRegionName((byte[]) params[0])[0];
+ responseInfo.put("table", Bytes.toStringBinary(tableName));
+ // annotate the response map with operation details
+ responseInfo.putAll(((Operation) params[1]).toMap());
+ // report to the log file
+ LOG.warn("(operation" + tag + "): " +
+ mapper.writeValueAsString(responseInfo));
+ } else if (params.length == 1 && instance instanceof HRegionServer &&
+ params[0] instanceof Operation) {
+ // annotate the response map with operation details
+ responseInfo.putAll(((Operation) params[0]).toMap());
+ // report to the log file
+ LOG.warn("(operation" + tag + "): " +
+ mapper.writeValueAsString(responseInfo));
+ } else {
+ // can't get JSON details, so just report call.toString() along with
+ // a more generic tag.
+ responseInfo.put("call", call);
+ LOG.warn("(response" + tag + "): " +
+ mapper.writeValueAsString(responseInfo));
+ }
+ }
+ protected static void log(String value, Log LOG) {
+ String v = value;
+ if (v != null && v.length() > 55)
+ v = v.substring(0, 55)+"...";
+ LOG.info(v);
+ }
}
}
\ No newline at end of file
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (working copy)
@@ -21,10 +21,12 @@
package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Function;
-import org.apache.hadoop.io.Writable;
+import com.google.protobuf.Message;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -47,16 +49,16 @@
/** Called for each call.
* @param param writable parameter
* @param receiveTime time
- * @return Writable
+ * @return Message
* @throws java.io.IOException e
*/
- Writable call(Class<? extends VersionedProtocol> protocol,
- Writable param, long receiveTime, MonitoredRPCHandler status)
+ Message call(Class<? extends VersionedProtocol> protocol,
+ RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
throws IOException;
void setErrorHandler(HBaseRPCErrorHandler handler);
- void setQosFunction(Function<Writable, Integer> newFunc);
+ void setQosFunction(Function<RpcRequestBody, Integer> newFunc);
void openServer();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (working copy)
@@ -1,468 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.UndeclaredThrowableException;
-
-import java.net.InetSocketAddress;
-import java.io.*;
-import java.util.Map;
-import java.util.HashMap;
-
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Operation;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Objects;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hadoop.hbase.ipc.VersionedProtocol;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.*;
-
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.protobuf.ServiceException;
-
-/** An RpcEngine implementation for Writable data. */
-@InterfaceAudience.Private
-class WritableRpcEngine implements RpcEngine {
- // LOG is NOT in hbase subpackage intentionally so that the default HBase
- // DEBUG log level does NOT emit RPC-level logging.
- private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
-
- protected final static ClientCache CLIENTS = new ClientCache();
-
- private static class Invoker implements InvocationHandler {
- private Class<? extends VersionedProtocol> protocol;
- private InetSocketAddress address;
- private User ticket;
- private HBaseClient client;
- private boolean isClosed = false;
- final private int rpcTimeout;
-
- public Invoker(Class<? extends VersionedProtocol> protocol,
- InetSocketAddress address, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout) {
- this.protocol = protocol;
- this.address = address;
- this.ticket = ticket;
- this.client = CLIENTS.getClient(conf, factory);
- this.rpcTimeout = rpcTimeout;
- }
-
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- final boolean logDebug = LOG.isDebugEnabled();
- long startTime = 0;
- if (logDebug) {
- startTime = System.currentTimeMillis();
- }
-
- try {
- HbaseObjectWritable value = (HbaseObjectWritable)
- client.call(new Invocation(method, args), address, protocol, ticket,
- rpcTimeout);
- if (logDebug) {
- // FIGURE HOW TO TURN THIS OFF!
- long callTime = System.currentTimeMillis() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
- }
- return value.get();
- } catch (Throwable t) {
- // For protobuf protocols, ServiceException is expected
- if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
- if (t instanceof RemoteException) {
- Throwable cause = ((RemoteException)t).unwrapRemoteException();
- throw new ServiceException(cause);
- }
- throw new ServiceException(t);
- }
- throw t;
- }
- }
-
- /* close the IPC client that's responsible for this invoker's RPCs */
- synchronized protected void close() {
- if (!isClosed) {
- isClosed = true;
- CLIENTS.stopClient(client);
- }
- }
- }
-
- /** Construct a client-side proxy object that implements the named protocol,
- * talking to a server at the named address. */
- public VersionedProtocol getProxy(
- Class<? extends VersionedProtocol> protocol, long clientVersion,
- InetSocketAddress addr, User ticket,
- Configuration conf, SocketFactory factory, int rpcTimeout)
- throws IOException {
-
- VersionedProtocol proxy =
- (VersionedProtocol) Proxy.newProxyInstance(
- protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
- try {
- long serverVersion = ((VersionedProtocol)proxy)
- .getProtocolVersion(protocol.getName(), clientVersion);
- if (serverVersion != clientVersion) {
- throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- } catch (Throwable t) {
- if (t instanceof UndeclaredThrowableException) {
- t = t.getCause();
- }
- if (t instanceof ServiceException) {
- throw ProtobufUtil.getRemoteException((ServiceException)t);
- }
- if (!(t instanceof IOException)) {
- LOG.error("Unexpected throwable object ", t);
- throw new IOException(t);
- }
- throw (IOException)t;
- }
- return proxy;
- }
-
- /**
- * Stop this proxy and release its invoker's resource
- * @param proxy the proxy to be stopped
- */
- public void stopProxy(VersionedProtocol proxy) {
- if (proxy!=null) {
- ((Invoker)Proxy.getInvocationHandler(proxy)).close();
- }
- }
-
- /** Construct a server for a protocol implementation instance listening on a
- * port and address. */
- public Server getServer(Class<? extends VersionedProtocol> protocol,
- Object instance,
- Class<?>[] ifaces,
- String bindAddress, int port,
- int numHandlers,
- int metaHandlerCount, boolean verbose,
- Configuration conf, int highPriorityLevel)
- throws IOException {
- return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
- metaHandlerCount, verbose, highPriorityLevel);
- }
-
- /** An RPC Server. */
- public static class Server extends HBaseServer {
- private Object instance;
- private Class<?> implementation;
- private Class<?>[] ifaces;
- private boolean verbose;
-
- private static final String WARN_RESPONSE_TIME =
- "hbase.ipc.warn.response.time";
- private static final String WARN_RESPONSE_SIZE =
- "hbase.ipc.warn.response.size";
-
- /** Default value for above params */
- private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
- private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
-
- /** Names for suffixed metrics */
- private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
-
- private final int warnResponseTime;
- private final int warnResponseSize;
-
- private static String classNameBase(String className) {
- String[] names = className.split("\\.", -1);
- if (names == null || names.length == 0) {
- return className;
- }
- return names[names.length-1];
- }
-
- /** Construct an RPC server.
- * @param instance the instance whose methods will be called
- * @param ifaces the interfaces the server supports
- * @param paramClass an instance of this class is used to read the RPC requests
- * @param conf the configuration to use
- * @param bindAddress the address to bind on to listen for connection
- * @param port the port to listen for connections on
- * @param numHandlers the number of method handler threads to run
- * @param metaHandlerCount the number of meta handlers desired
- * @param verbose whether each call should be logged
- * @param highPriorityLevel the priority level this server treats as high priority RPCs
- * @throws IOException e
- */
- public Server(Object instance, final Class<?>[] ifaces,
- Class<? extends Writable> paramClass,
- Configuration conf, String bindAddress, int port,
- int numHandlers, int metaHandlerCount, boolean verbose,
- int highPriorityLevel) throws IOException {
- super(bindAddress, port, paramClass, numHandlers, metaHandlerCount,
- conf, classNameBase(instance.getClass().getName()),
- highPriorityLevel);
- this.instance = instance;
- this.implementation = instance.getClass();
- this.verbose = verbose;
-
- this.ifaces = ifaces;
-
- // create metrics for the advertised interfaces this server implements.
- String [] metricSuffixes = new String [] {ABOVE_ONE_SEC_METRIC};
- this.rpcMetrics.createMetrics(this.ifaces, false, metricSuffixes);
-
- this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME,
- DEFAULT_WARN_RESPONSE_TIME);
- this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
- DEFAULT_WARN_RESPONSE_SIZE);
- }
-
- public Server(Object instance, final Class<?>[] ifaces,
- Configuration conf, String bindAddress, int port,
- int numHandlers, int metaHandlerCount, boolean verbose,
- int highPriorityLevel) throws IOException {
- this(instance, ifaces, Invocation.class, conf, bindAddress, port,
- numHandlers, metaHandlerCount, verbose, highPriorityLevel);
- }
-
- public AuthenticationTokenSecretManager createSecretManager(){
- if (!User.isSecurityEnabled() ||
- !(instance instanceof org.apache.hadoop.hbase.Server)) {
- return null;
- }
- org.apache.hadoop.hbase.Server server =
- (org.apache.hadoop.hbase.Server)instance;
- Configuration conf = server.getConfiguration();
- long keyUpdateInterval =
- conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
- long maxAge =
- conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
- return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
- server.getServerName().toString(), keyUpdateInterval, maxAge);
- }
-
- @Override
- public void startThreads() {
- AuthenticationTokenSecretManager mgr = createSecretManager();
- if (mgr != null) {
- setSecretManager(mgr);
- mgr.start();
- }
- this.authManager = new ServiceAuthorizationManager();
- HBasePolicyProvider.init(conf, authManager);
-
- // continue with base startup
- super.startThreads();
- }
-
- @Override
- public Writable call(Class<? extends VersionedProtocol> protocol,
- Writable param, long receivedTime, MonitoredRPCHandler status)
- throws IOException {
- try {
- Invocation call = (Invocation)param;
- if(call.getMethodName() == null) {
- throw new IOException("Could not find requested method, the usual " +
- "cause is a version mismatch between client and server.");
- }
- if (verbose) log("Call: " + call, LOG);
- status.setRPC(call.getMethodName(), call.getParameters(), receivedTime);
- status.setRPCPacket(param);
- status.resume("Servicing call");
-
- Method method =
- protocol.getMethod(call.getMethodName(),
- call.getParameterClasses());
- method.setAccessible(true);
-
- //Verify protocol version.
- //Bypass the version check for VersionedProtocol
- if (!method.getDeclaringClass().equals(VersionedProtocol.class)) {
- long clientVersion = call.getProtocolVersion();
- ProtocolSignature serverInfo = ((VersionedProtocol) instance)
- .getProtocolSignature(protocol.getCanonicalName(), call
- .getProtocolVersion(), call.getClientMethodsHash());
- long serverVersion = serverInfo.getVersion();
- if (serverVersion != clientVersion) {
- LOG.warn("Version mismatch: client version=" + clientVersion
- + ", server version=" + serverVersion);
- throw new RPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- }
- Object impl = null;
- if (protocol.isAssignableFrom(this.implementation)) {
- impl = this.instance;
- }
- else {
- throw new HBaseRPC.UnknownProtocolException(protocol);
- }
-
- long startTime = System.currentTimeMillis();
- Object[] params = call.getParameters();
- Object value = method.invoke(impl, params);
- int processingTime = (int) (System.currentTimeMillis() - startTime);
- int qTime = (int) (startTime-receivedTime);
- if (TRACELOG.isDebugEnabled()) {
- TRACELOG.debug("Call #" + CurCall.get().id +
- "; Served: " + protocol.getSimpleName()+"#"+call.getMethodName() +
- " queueTime=" + qTime +
- " processingTime=" + processingTime +
- " contents=" + Objects.describeQuantity(params));
- }
- rpcMetrics.rpcQueueTime.inc(qTime);
- rpcMetrics.rpcProcessingTime.inc(processingTime);
- rpcMetrics.inc(call.getMethodName(), processingTime);
- if (verbose) log("Return: "+value, LOG);
-
- HbaseObjectWritable retVal =
- new HbaseObjectWritable(method.getReturnType(), value);
- long responseSize = retVal.getWritableSize();
- // log any RPC responses that are slower than the configured warn
- // response time or larger than configured warning size
- boolean tooSlow = (processingTime > warnResponseTime
- && warnResponseTime > -1);
- boolean tooLarge = (responseSize > warnResponseSize
- && warnResponseSize > -1);
- if (tooSlow || tooLarge) {
- // when tagging, we let TooLarge trump TooSmall to keep output simple
- // note that large responses will often also be slow.
- logResponse(call.getParameters(), call.getMethodName(),
- call.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
- status.getClient(), startTime, processingTime, qTime,
- responseSize);
- // provides a count of log-reported slow responses
- if (tooSlow) {
- rpcMetrics.rpcSlowResponseTime.inc(processingTime);
- }
- }
- if (processingTime > 1000) {
- // we use a hard-coded one second period so that we can clearly
- // indicate the time period we're warning about in the name of the
- // metric itself
- rpcMetrics.inc(call.getMethodName() + ABOVE_ONE_SEC_METRIC,
- processingTime);
- }
-
- return retVal;
- } catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
- if (target instanceof IOException) {
- throw (IOException)target;
- }
- if (target instanceof ServiceException) {
- throw ProtobufUtil.getRemoteException((ServiceException)target);
- }
- IOException ioe = new IOException(target.toString());
- ioe.setStackTrace(target.getStackTrace());
- throw ioe;
- } catch (Throwable e) {
- if (!(e instanceof IOException)) {
- LOG.error("Unexpected throwable object ", e);
- }
- IOException ioe = new IOException(e.toString());
- ioe.setStackTrace(e.getStackTrace());
- throw ioe;
- }
- }
-
- /**
- * Logs an RPC response to the LOG file, producing valid JSON objects for
- * client Operations.
- * @param params The parameters received in the call.
- * @param methodName The name of the method invoked
- * @param call The string representation of the call
- * @param tag The tag that will be used to indicate this event in the log.
- * @param client The address of the client who made this call.
- * @param startTime The time that the call was initiated, in ms.
- * @param processingTime The duration that the call took to run, in ms.
- * @param qTime The duration that the call spent on the queue
- * prior to being initiated, in ms.
- * @param responseSize The size in bytes of the response buffer.
- */
- void logResponse(Object[] params, String methodName, String call, String tag,
- String clientAddress, long startTime, int processingTime, int qTime,
- long responseSize)
- throws IOException {
- // for JSON encoding
- ObjectMapper mapper = new ObjectMapper();
- // base information that is reported regardless of type of call
- Map<String, Object> responseInfo = new HashMap<String, Object>();
- responseInfo.put("starttimems", startTime);
- responseInfo.put("processingtimems", processingTime);
- responseInfo.put("queuetimems", qTime);
- responseInfo.put("responsesize", responseSize);
- responseInfo.put("client", clientAddress);
- responseInfo.put("class", instance.getClass().getSimpleName());
- responseInfo.put("method", methodName);
- if (params.length == 2 && instance instanceof HRegionServer &&
- params[0] instanceof byte[] &&
- params[1] instanceof Operation) {
- // if the slow process is a query, we want to log its table as well
- // as its own fingerprint
- byte [] tableName =
- HRegionInfo.parseRegionName((byte[]) params[0])[0];
- responseInfo.put("table", Bytes.toStringBinary(tableName));
- // annotate the response map with operation details
- responseInfo.putAll(((Operation) params[1]).toMap());
- // report to the log file
- LOG.warn("(operation" + tag + "): " +
- mapper.writeValueAsString(responseInfo));
- } else if (params.length == 1 && instance instanceof HRegionServer &&
- params[0] instanceof Operation) {
- // annotate the response map with operation details
- responseInfo.putAll(((Operation) params[0]).toMap());
- // report to the log file
- LOG.warn("(operation" + tag + "): " +
- mapper.writeValueAsString(responseInfo));
- } else {
- // can't get JSON details, so just report call.toString() along with
- // a more generic tag.
- responseInfo.put("call", call);
- LOG.warn("(response" + tag + "): " +
- mapper.writeValueAsString(responseInfo));
- }
- }
- }
-
- protected static void log(String value, Log LOG) {
- String v = value;
- if (v != null && v.length() > 55)
- v = v.substring(0, 55)+"...";
- LOG.info(v);
- }
-}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (working copy)
@@ -20,8 +20,7 @@
package org.apache.hadoop.hbase.monitoring;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
/**
* A MonitoredTask implementation optimized for use with RPC Handlers
@@ -38,9 +37,9 @@
public abstract long getRPCQueueTime();
public abstract boolean isRPCRunning();
public abstract boolean isOperationRunning();
-
+
public abstract void setRPC(String methodName, Object [] params,
long queueTime);
- public abstract void setRPCPacket(Writable param);
+ public abstract void setRPCPacket(RpcRequestBody param);
public abstract void setConnection(String clientAddress, int remotePort);
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@@ -46,7 +47,7 @@
private long rpcStartTime;
private String methodName = "";
private Object [] params = {};
- private Writable packet;
+ private RpcRequestBody packet;
public MonitoredRPCHandlerImpl() {
super();
@@ -141,11 +142,7 @@
// no RPC is currently running, or we don't have an RPC's packet info
return -1L;
}
- if (!(packet instanceof WritableWithSize)) {
- // the packet passed to us doesn't expose size information
- return -1L;
- }
- return ((WritableWithSize) packet).getWritableSize();
+ return packet.getSerializedSize();
}
/**
@@ -201,11 +198,11 @@
}
/**
- * Gives this instance a reference to the Writable received by the RPC, so
+ * Gives this instance a reference to the protobuf received by the RPC, so
* that it can later compute its size if asked for it.
- * @param param The Writable received by the RPC for this call
+ * @param param The protobuf received by the RPC for this call
*/
- public void setRPCPacket(Writable param) {
+ public void setRPCPacket(RpcRequestBody param) {
this.packet = param;
}
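
With the packet held as an RpcRequestBody rather than a Writable, the monitor can always report a size: every generated protobuf exposes getSerializedSize(), so the old WritableWithSize instanceof check goes away. A minimal sketch of the new behavior (the builder call is illustrative; methodName is the only required field of the message):

    RpcRequestBody packet = RpcRequestBody.newBuilder()
        .setMethodName("get")
        .build();
    long rpcPacketLength = packet.getSerializedSize(); // always available, no instanceof check
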
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java (working copy)
@@ -1492,6 +1492,10 @@
// optional bytes request = 3;
boolean hasRequest();
com.google.protobuf.ByteString getRequest();
+
+ // optional string requestClassName = 4;
+ boolean hasRequestClassName();
+ String getRequestClassName();
}
public static final class RpcRequestBody extends
com.google.protobuf.GeneratedMessage
@@ -1574,10 +1578,43 @@
return request_;
}
+ // optional string requestClassName = 4;
+ public static final int REQUESTCLASSNAME_FIELD_NUMBER = 4;
+ private java.lang.Object requestClassName_;
+ public boolean hasRequestClassName() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getRequestClassName() {
+ java.lang.Object ref = requestClassName_;
+ if (ref instanceof String) {
+ return (String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ String s = bs.toStringUtf8();
+ if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+ requestClassName_ = s;
+ }
+ return s;
+ }
+ }
+ private com.google.protobuf.ByteString getRequestClassNameBytes() {
+ java.lang.Object ref = requestClassName_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+ requestClassName_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
methodName_ = "";
clientProtocolVersion_ = 0L;
request_ = com.google.protobuf.ByteString.EMPTY;
+ requestClassName_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -1604,6 +1641,9 @@
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, request_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeBytes(4, getRequestClassNameBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -1625,6 +1665,10 @@
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, request_);
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(4, getRequestClassNameBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1663,6 +1707,11 @@
result = result && getRequest()
.equals(other.getRequest());
}
+ result = result && (hasRequestClassName() == other.hasRequestClassName());
+ if (hasRequestClassName()) {
+ result = result && getRequestClassName()
+ .equals(other.getRequestClassName());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -1684,6 +1733,10 @@
hash = (37 * hash) + REQUEST_FIELD_NUMBER;
hash = (53 * hash) + getRequest().hashCode();
}
+ if (hasRequestClassName()) {
+ hash = (37 * hash) + REQUESTCLASSNAME_FIELD_NUMBER;
+ hash = (53 * hash) + getRequestClassName().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@@ -1806,6 +1859,8 @@
bitField0_ = (bitField0_ & ~0x00000002);
request_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
+ requestClassName_ = "";
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -1856,6 +1911,10 @@
to_bitField0_ |= 0x00000004;
}
result.request_ = request_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.requestClassName_ = requestClassName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1881,6 +1940,9 @@
if (other.hasRequest()) {
setRequest(other.getRequest());
}
+ if (other.hasRequestClassName()) {
+ setRequestClassName(other.getRequestClassName());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1931,6 +1993,11 @@
request_ = input.readBytes();
break;
}
+ case 34: {
+ bitField0_ |= 0x00000008;
+ requestClassName_ = input.readBytes();
+ break;
+ }
}
}
}
@@ -2018,6 +2085,42 @@
return this;
}
+ // optional string requestClassName = 4;
+ private java.lang.Object requestClassName_ = "";
+ public boolean hasRequestClassName() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public String getRequestClassName() {
+ java.lang.Object ref = requestClassName_;
+ if (!(ref instanceof String)) {
+ String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+ requestClassName_ = s;
+ return s;
+ } else {
+ return (String) ref;
+ }
+ }
+ public Builder setRequestClassName(String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000008;
+ requestClassName_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearRequestClassName() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ requestClassName_ = getDefaultInstance().getRequestClassName();
+ onChanged();
+ return this;
+ }
+ void setRequestClassName(com.google.protobuf.ByteString value) {
+ bitField0_ |= 0x00000008;
+ requestClassName_ = value;
+ onChanged();
+ }
+
// @@protoc_insertion_point(builder_scope:RpcRequestBody)
}
@@ -2032,7 +2135,7 @@
public interface RpcResponseHeaderOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required int32 callId = 1;
+ // required uint32 callId = 1;
boolean hasCallId();
int getCallId();
@@ -2141,7 +2244,7 @@
}
private int bitField0_;
- // required int32 callId = 1;
+ // required uint32 callId = 1;
public static final int CALLID_FIELD_NUMBER = 1;
private int callId_;
public boolean hasCallId() {
@@ -2186,7 +2289,7 @@
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
- output.writeInt32(1, callId_);
+ output.writeUInt32(1, callId_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeEnum(2, status_.getNumber());
@@ -2202,7 +2305,7 @@
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
- .computeInt32Size(1, callId_);
+ .computeUInt32Size(1, callId_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
@@ -2487,7 +2590,7 @@
}
case 8: {
bitField0_ |= 0x00000001;
- callId_ = input.readInt32();
+ callId_ = input.readUInt32();
break;
}
case 16: {
@@ -2507,7 +2610,7 @@
private int bitField0_;
- // required int32 callId = 1;
+ // required uint32 callId = 1;
private int callId_ ;
public boolean hasCallId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
@@ -3505,16 +3608,17 @@
"ctionHeader\022\"\n\010userInfo\030\001 \001(\0132\020.UserInfo" +
"rmation\022?\n\010protocol\030\002 \001(\t:-org.apache.ha" +
"doop.hbase.client.ClientProtocol\"\"\n\020RpcR" +
- "equestHeader\022\016\n\006callId\030\001 \002(\r\"T\n\016RpcReque" +
+ "equestHeader\022\016\n\006callId\030\001 \002(\r\"n\n\016RpcReque" +
"stBody\022\022\n\nmethodName\030\001 \002(\t\022\035\n\025clientProt" +
- "ocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\"{\n\021Rp" +
- "cResponseHeader\022\016\n\006callId\030\001 \002(\005\022)\n\006statu" +
- "s\030\002 \002(\0162\031.RpcResponseHeader.Status\"+\n\006St",
- "atus\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"" +
- "#\n\017RpcResponseBody\022\020\n\010response\030\001 \001(\014\"9\n\014" +
- "RpcException\022\025\n\rexceptionName\030\001 \002(\t\022\022\n\ns" +
- "tackTrace\030\002 \001(\tB<\n*org.apache.hadoop.hba" +
- "se.protobuf.generatedB\tRPCProtosH\001\240\001\001"
+ "ocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\022\030\n\020re" +
+ "questClassName\030\004 \001(\t\"{\n\021RpcResponseHeade" +
+ "r\022\016\n\006callId\030\001 \002(\r\022)\n\006status\030\002 \002(\0162\031.RpcR",
+ "esponseHeader.Status\"+\n\006Status\022\013\n\007SUCCES" +
+ "S\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017RpcResponse" +
+ "Body\022\020\n\010response\030\001 \001(\014\"9\n\014RpcException\022\025" +
+ "\n\rexceptionName\030\001 \002(\t\022\022\n\nstackTrace\030\002 \001(" +
+ "\tB<\n*org.apache.hadoop.hbase.protobuf.ge" +
+ "neratedB\tRPCProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3550,7 +3654,7 @@
internal_static_RpcRequestBody_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RpcRequestBody_descriptor,
- new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", },
+ new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", "RequestClassName", },
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.class,
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.Builder.class);
internal_static_RpcResponseHeader_descriptor =
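
The new field rides along as optional field 4 (wire tag 34), so old clients and servers stay compatible; a peer that does not understand it simply carries it in unknown fields. A hedged sketch of a client filling in the metadata with the generated builder above (the classname string and the empty request bytes are placeholders, not taken from this patch):

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;

    ByteString requestBytes = ByteString.EMPTY; // stands in for the serialized argument
    RpcRequestBody body = RpcRequestBody.newBuilder()
        .setMethodName("get")
        .setClientProtocolVersion(1L)
        .setRequest(requestBytes)
        .setRequestClassName(
            "org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest")
        .build();

The server can then decide from getRequestClassName() alone whether the payload is worth deserializing for prioritization.
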
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1371363)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -27,10 +27,13 @@
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -40,11 +43,13 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.SortedSet;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -168,6 +173,7 @@
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -224,6 +230,8 @@
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
/**
@@ -436,7 +444,12 @@
*/
private final int scannerLeaseTimeoutPeriod;
+ /**
+ * The reference to the QosFunction
+ */
+ private final QosFunction qosFunction;
+
/**
* Starts a HRegionServer at the default location
*
@@ -513,7 +526,7 @@
this.isa = this.rpcServer.getListenerAddress();
this.rpcServer.setErrorHandler(this);
- this.rpcServer.setQosFunction(new QosFunction());
+ this.rpcServer.setQosFunction((qosFunction = new QosFunction()));
this.startcode = System.currentTimeMillis();
// login the server principal (if using secure Hadoop)
@@ -545,13 +558,59 @@
int priority() default 0;
}
+ QosFunction getQosFunction() {
+ return qosFunction;
+ }
+
+ RegionScanner getScanner(long scannerId) {
+ String scannerIdString = Long.toString(scannerId);
+ return scanners.get(scannerIdString);
+ }
+
/**
* Utility used ensuring higher quality of service for priority rpcs; e.g.
* rpcs to .META. and -ROOT-, etc.
*/
- class QosFunction implements Function<Writable, Integer> {
+ class QosFunction implements Function<RpcRequestBody, Integer> {
private final Map<String, Integer> annotatedQos;
+ private HRegionServer hRegionServer = HRegionServer.this;
+ //The logic for figuring out high priority RPCs is as follows:
+ //1. if the method is annotated with a QosPriority of QOS_HIGH,
+ // that is honored
+ //2. parse out the protobuf message and see if the request is for meta
+ // region, and if so, treat it as a high priority RPC
+ //Some optimizations for (2) are done here -
+ //Clients send the argument classname as part of making the RPC. The server
+ //decides whether to deserialize the proto argument message based on the
+ //pre-established set of argument classes (knownArgumentClasses below).
+ //This prevents the server from having to deserialize all proto argument
+ //messages prematurely.
+ //All the argument classes declare a 'getRegion' method that returns a
+ //RegionSpecifier object. Methods can be invoked on the returned object
+ //to figure out whether it is a meta region or not.
+ @SuppressWarnings("unchecked")
+ private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
+ GetRegionInfoRequest.class,
+ GetStoreFileRequest.class,
+ CloseRegionRequest.class,
+ FlushRegionRequest.class,
+ SplitRegionRequest.class,
+ CompactRegionRequest.class,
+ GetRequest.class,
+ MutateRequest.class,
+ ScanRequest.class,
+ LockRowRequest.class,
+ UnlockRowRequest.class,
+ MultiRequest.class
+ };
+
+ //Some caches for helping performance
+ private final Map<String, Class<? extends Message>> argumentToClassMap =
+ new HashMap<String, Class<? extends Message>>();
+ private final Map<String, Map<Class<? extends Message>, Method>>
+ methodMap = new HashMap<String, Map<Class<? extends Message>, Method>>();
+
public QosFunction() {
Map<String, Integer> qosMap = new HashMap<String, Integer>();
for (Method m : HRegionServer.class.getMethods()) {
@@ -562,12 +621,34 @@
}
annotatedQos = qosMap;
+ if (methodMap.get("parseFrom") == null) {
+ methodMap.put("parseFrom",
+ new HashMap<Class<? extends Message>, Method>());
+ }
+ if (methodMap.get("getRegion") == null) {
+ methodMap.put("getRegion",
+ new HashMap<Class<? extends Message>, Method>());
+ }
+ for (Class<? extends Message> cls : knownArgumentClasses) {
+ argumentToClassMap.put(cls.getCanonicalName(), cls);
+ try {
+ methodMap.get("parseFrom").put(cls,
+ cls.getDeclaredMethod("parseFrom",ByteString.class));
+ methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
+ void setRegionServer(HRegionServer server) {
+ this.hRegionServer = server;
+ }
+
public boolean isMetaRegion(byte[] regionName) {
HRegion region;
try {
- region = getRegion(regionName);
+ region = hRegionServer.getRegion(regionName);
} catch (NotServingRegionException ignored) {
return false;
}
@@ -575,64 +656,69 @@
}
@Override
- public Integer apply(Writable from) {
- if (!(from instanceof Invocation)) return NORMAL_QOS;
+ public Integer apply(RpcRequestBody from) {
+ String methodName = from.getMethodName();
+ Class<? extends Message> rpcArgClass = null;
+ if (from.hasRequestClassName()) {
+ String cls = from.getRequestClassName();
+ rpcArgClass = argumentToClassMap.get(cls);
+ }
- Invocation inv = (Invocation) from;
- String methodName = inv.getMethodName();
-
Integer priorityByAnnotation = annotatedQos.get(methodName);
if (priorityByAnnotation != null) {
return priorityByAnnotation;
}
- // scanner methods...
- if (methodName.equals("next") || methodName.equals("close")) {
- // translate!
- Long scannerId;
+ if (rpcArgClass == null || from.getRequest().isEmpty()) {
+ return NORMAL_QOS;
+ }
+
+ if (isMetaRegionOperation(rpcArgClass, from.getRequest())) {
+ return HIGH_QOS;
+ }
+ if (methodName.equals("scan")) { // scanner methods...
+ ScanRequest request;
try {
- scannerId = (Long) inv.getParameters()[0];
- } catch (ClassCastException ignored) {
- // LOG.debug("Low priority: " + from);
- return NORMAL_QOS; // doh.
+ request = ScanRequest.parseFrom(from.getRequest());
+ } catch (InvalidProtocolBufferException e) {
+ LOG.warn("Couldn't parse protobuf packet " + e);
+ throw new RuntimeException(e);
}
- String scannerIdString = Long.toString(scannerId);
- RegionScanner scanner = scanners.get(scannerIdString);
+ if (!request.hasScannerId()) {
+ return NORMAL_QOS;
+ }
+ RegionScanner scanner = hRegionServer.getScanner(request.getScannerId());
if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
- // LOG.debug("High priority scanner request: " + scannerId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("High priority scanner request: " + request.getScannerId());
+ }
return HIGH_QOS;
}
- } else if (inv.getParameterClasses().length == 0) {
- // Just let it through. This is getOnlineRegions, etc.
- } else if (inv.getParameterClasses()[0] == byte[].class) {
- // first arg is byte array, so assume this is a regionname:
- if (isMetaRegion((byte[]) inv.getParameters()[0])) {
- // LOG.debug("High priority with method: " + methodName +
- // " and region: "
- // + Bytes.toString((byte[]) inv.getParameters()[0]));
- return HIGH_QOS;
- }
- } else if (inv.getParameterClasses()[0] == MultiAction.class) {
- MultiAction<?> ma = (MultiAction<?>) inv.getParameters()[0];
- Set<byte[]> regions = ma.getRegions();
- // ok this sucks, but if any single of the actions touches a meta, the
- // whole
- // thing gets pingged high priority. This is a dangerous hack because
- // people
- // can get their multi action tagged high QOS by tossing a Get(.META.)
- // AND this
- // regionserver hosts META/-ROOT-
- for (byte[] region : regions) {
- if (isMetaRegion(region)) {
- // LOG.debug("High priority multi with region: " +
- // Bytes.toString(region));
- return HIGH_QOS; // short circuit for the win.
- }
- }
}
- // LOG.debug("Low priority: " + from.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Low priority: " + from.toString());
+ }
return NORMAL_QOS;
}
+
+ private boolean isMetaRegionOperation(Class<? extends Message> argumentClass,
+ ByteString argument) {
+ try {
+ Method parseFrom = methodMap.get("parseFrom").get(argumentClass);
+ Object deserializedRequestObj = parseFrom.invoke(null, argument);
+ Method getRegion = methodMap.get("getRegion").get(argumentClass);
+ RegionSpecifier regionSpecifier =
+ (RegionSpecifier)getRegion.invoke(deserializedRequestObj,
+ (Object[])null);
+ HRegion region = hRegionServer.getRegion(regionSpecifier);
+ if (region.getRegionInfo().isMetaRegion()) {
+ return true;
+ }
+ } catch (Exception ex) {
+ return false;
+ }
+ return false;
+ }
}
/**
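
The heart of the QosFunction change is the classname-keyed dispatch: resolve each known argument class's static parseFrom(ByteString) once in the constructor, then deserialize only requests whose advertised classname is in the whitelist. A stripped-down, hypothetical sketch of that pattern (class and field names here are generic, not the patch's):

    import java.lang.reflect.Method;
    import java.util.HashMap;
    import java.util.Map;

    import com.google.protobuf.ByteString;
    import com.google.protobuf.Message;

    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;

    public class ArgParserSketch {
      private final Map<String, Method> parsers = new HashMap<String, Method>();

      // Resolve parseFrom once per class, as QosFunction's constructor does.
      public void register(Class<? extends Message> cls) throws Exception {
        parsers.put(cls.getCanonicalName(),
            cls.getDeclaredMethod("parseFrom", ByteString.class));
      }

      // Returns the deserialized argument, or null when the classname is
      // unknown or no payload was sent (callers fall back to NORMAL_QOS).
      public Message parse(RpcRequestBody body) throws Exception {
        if (!body.hasRequestClassName() || body.getRequest().isEmpty()) {
          return null;
        }
        Method parseFrom = parsers.get(body.getRequestClassName());
        if (parseFrom == null) {
          return null;
        }
        return (Message) parseFrom.invoke(null, body.getRequest());
      }
    }
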
Index: hbase-server/src/main/protobuf/RPC.proto
===================================================================
--- hbase-server/src/main/protobuf/RPC.proto (revision 1371363)
+++ hbase-server/src/main/protobuf/RPC.proto (working copy)
@@ -26,20 +26,21 @@
*
* As part of setting up a connection to a server, the client needs to send
* the ConnectionHeader header. At the data level, this looks like
- * <"hrpc"-bytearray><'5'-byte>
+ * <"hrpc"-bytearray><'5'[byte]>
*
* For every RPC that the client makes it needs to send the following
* RpcRequestHeader and the RpcRequestBody. At the data level this looks like:
- *
+ *
*
- *
+ *
*
* On a success, the server's protobuf response looks like
*
- *
+ *
* On a failure, the server's protobuf response looks like
*
- *
+ *
*
* There is one special message that's sent from client to server -
* the Ping message. At the data level, this is just the bytes corresponding
@@ -84,8 +85,16 @@
/** protocol version of class declaring the called method */
optional uint64 clientProtocolVersion = 2;
- /** Bytes corresponding to the client protobuf request */
+ /** Bytes corresponding to the client protobuf request. This is the actual
+ * bytes corresponding to the RPC request argument.
+ */
optional bytes request = 3;
+
+ /** Some metainfo about the request. Helps us to treat RPCs with
+ * different priorities. For now this is just the classname of the request
+ * proto object.
+ */
+ optional string requestClassName = 4;
}
/**
@@ -93,7 +102,7 @@
*/
message RpcResponseHeader {
/** Echo back the callId the client sent */
- required int32 callId = 1;
+ required uint32 callId = 1;
/** Did the RPC execution encounter an error at the server */
enum Status {
SUCCESS = 0;
@@ -106,7 +115,9 @@
* The RPC response body
*/
message RpcResponseBody {
- /** Optional response bytes */
+ /** Optional response bytes. This is the actual bytes corresponding to the
+ * return value of the invoked RPC.
+ */
optional bytes response = 1;
}
/**
@@ -122,4 +133,4 @@
/** Exception stack trace from the server side */
optional string stackTrace = 2;
-}
\ No newline at end of file
+}
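
Read together with HBaseClient's new CodedOutputStream import, the framing described in the comment above suggests a client write path like the following sketch. This is an assumption-laden illustration, not code from this patch: the method, the stream, and the exact length arithmetic are hypothetical.

    import java.io.DataOutputStream;
    import java.io.IOException;

    import com.google.protobuf.CodedOutputStream;

    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
    import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;

    class RequestFramingSketch {
      void writeRequest(DataOutputStream out, int callId, RpcRequestBody body)
          throws IOException {
        RpcRequestHeader header =
            RpcRequestHeader.newBuilder().setCallId(callId).build();
        int headerLen = header.getSerializedSize();
        int total = CodedOutputStream.computeRawVarint32Size(headerLen)
            + headerLen + body.getSerializedSize();
        out.writeInt(total);          // <total length of the two serialized objects>
        header.writeDelimitedTo(out); // varint length + RpcRequestHeader
        body.writeTo(out);            // raw RpcRequestBody bytes
        out.flush();
      }
    }
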
Index: hbase-server/src/main/resources/hbase-default.xml
===================================================================
--- hbase-server/src/main/resources/hbase-default.xml (revision 1371363)
+++ hbase-server/src/main/resources/hbase-default.xml (working copy)
@@ -523,7 +523,7 @@
<name>hbase.rpc.engine</name>
- <value>org.apache.hadoop.hbase.ipc.WritableRpcEngine</value>
+ <value>org.apache.hadoop.hbase.ipc.ProtobufRpcEngine</value>
<description>Implementation of org.apache.hadoop.hbase.ipc.RpcEngine to be
used for client / server RPC call marshalling.
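
Since hbase-default.xml now ships with the protobuf engine, pinning the engine explicitly (for example in a test) is a one-line configuration override. A small sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Explicitly pin the engine; omit this line to pick up the new default.
    conf.set("hbase.rpc.engine", "org.apache.hadoop.hbase.ipc.ProtobufRpcEngine");
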
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (revision 1371363)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (working copy)
@@ -34,12 +34,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mortbay.log.Log;
/**
* Test that delayed RPCs work. Fire up three calls, the first of which should
@@ -163,7 +166,7 @@
public interface TestRpc extends VersionedProtocol {
public static final long VERSION = 1L;
- int test(boolean delay);
+ TestResponse test(TestArg delay);
}
private static class TestRpcImpl implements TestRpc {
@@ -183,9 +186,12 @@
}
@Override
- public int test(final boolean delay) {
+ public TestResponse test(final TestArg testArg) {
+ boolean delay = testArg.getDelay();
+ TestResponse.Builder responseBuilder = TestResponse.newBuilder();
if (!delay) {
- return UNDELAYED;
+ responseBuilder.setResponse(UNDELAYED);
+ return responseBuilder.build();
}
final Delayable call = HBaseServer.getCurrentCall();
call.startDelay(delayReturnValue);
@@ -193,7 +199,9 @@
public void run() {
try {
Thread.sleep(500);
- call.endDelay(delayReturnValue ? DELAYED : null);
+ TestResponse.Builder responseBuilder = TestResponse.newBuilder();
+ call.endDelay(delayReturnValue ?
+ responseBuilder.setResponse(DELAYED).build() : null);
} catch (Exception e) {
e.printStackTrace();
}
@@ -201,7 +209,8 @@
}.start();
// This value should go back to client only if the response is set
// immediately at delay time.
- return 0xDEADBEEF;
+ responseBuilder.setResponse(0xDEADBEEF);
+ return responseBuilder.build();
}
@Override
@@ -235,7 +244,9 @@
@Override
public void run() {
try {
- Integer result = new Integer(server.test(delay));
+ Integer result =
+ new Integer(server.test(TestArg.newBuilder()
+ .setDelay(delay).build()).getResponse());
if (results != null) {
synchronized (results) {
results.add(result);
@@ -263,7 +274,7 @@
int result = 0xDEADBEEF;
try {
- result = client.test(false);
+ result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse();
} catch (Exception e) {
fail("No exception should have been thrown.");
}
@@ -271,12 +282,13 @@
boolean caughtException = false;
try {
- result = client.test(true);
+ result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse();
} catch(Exception e) {
// Exception thrown by server is enclosed in a RemoteException.
- if (e.getCause().getMessage().startsWith(
+ if (e.getCause().getMessage().contains(
"java.lang.Exception: Something went wrong"))
caughtException = true;
+ Log.warn(e);
}
assertTrue(caughtException);
}
@@ -286,9 +298,9 @@
*/
private static class FaultyTestRpc implements TestRpc {
@Override
- public int test(boolean delay) {
- if (!delay)
- return UNDELAYED;
+ public TestResponse test(TestArg arg) {
+ if (!arg.getDelay())
+ return TestResponse.newBuilder().setResponse(UNDELAYED).build();
Delayable call = HBaseServer.getCurrentCall();
call.startDelay(true);
try {
@@ -297,7 +309,7 @@
e.printStackTrace();
}
// Client will receive the Exception, not this value.
- return DELAYED;
+ return TestResponse.newBuilder().setResponse(DELAYED).build();
}
@Override
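
The delayed-RPC shape the updated test exercises: the handler parks the call, answers immediately with a placeholder, and a background thread later completes it with a real protobuf. Condensed from TestRpcImpl above (DELAYED and the 500 ms sleep are the test's own values):

    final Delayable call = HBaseServer.getCurrentCall();
    call.startDelay(true);        // true: the delayed value is the RPC result
    new Thread() {
      public void run() {
        try {
          Thread.sleep(500);
          call.endDelay(TestResponse.newBuilder().setResponse(DELAYED).build());
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }.start();
    // Returned now, but superseded once the delayed value is set.
    return TestResponse.newBuilder().setResponse(0xDEADBEEF).build();
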
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java (revision 1371363)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPBOnWritableRpc.java (working copy)
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.SmallTests;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.junit.Test;
-
-import com.google.protobuf.DescriptorProtos;
-import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
-
-import org.junit.experimental.categories.Category;
-
-/** Unit tests to test PB-based types on WritableRpcEngine. */
-@Category(SmallTests.class)
-public class TestPBOnWritableRpc {
-
- private static Configuration conf = new Configuration();
-
- public interface TestProtocol extends VersionedProtocol {
- public static final long VERSION = 1L;
-
- String echo(String value) throws IOException;
- Writable echo(Writable value) throws IOException;
-
- DescriptorProtos.EnumDescriptorProto exchangeProto(
- DescriptorProtos.EnumDescriptorProto arg);
- }
-
- public static class TestImpl implements TestProtocol {
- public long getProtocolVersion(String protocol, long clientVersion) {
- return TestProtocol.VERSION;
- }
-
- public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
- int hashcode) {
- return new ProtocolSignature(TestProtocol.VERSION, null);
- }
-
- @Override
- public String echo(String value) throws IOException { return value; }
-
- @Override
- public Writable echo(Writable writable) {
- return writable;
- }
-
- @Override
- public EnumDescriptorProto exchangeProto(EnumDescriptorProto arg) {
- return arg;
- }
- }
-
- @Test(timeout=10000)
- public void testCalls() throws Exception {
- testCallsInternal(conf);
- }
-
- private void testCallsInternal(Configuration conf) throws Exception {
- RpcServer rpcServer = HBaseRPC.getServer(new TestImpl(),
- new Class<?>[] {TestProtocol.class},
- "localhost", // BindAddress is IP we got for this server.
- 9999, // port number
- 2, // number of handlers
- 0, // we dont use high priority handlers in master
- conf.getBoolean("hbase.rpc.verbose", false), conf,
- 0);
- TestProtocol proxy = null;
- try {
- rpcServer.start();
-
- InetSocketAddress isa =
- new InetSocketAddress("localhost", 9999);
- proxy = (TestProtocol) HBaseRPC.waitForProxy(
- TestProtocol.class, TestProtocol.VERSION,
- isa, conf, -1, 8000, 8000);
-
- String stringResult = proxy.echo("foo");
- assertEquals(stringResult, "foo");
-
- stringResult = proxy.echo((String)null);
- assertEquals(stringResult, null);
-
- Text utf8Result = (Text)proxy.echo(new Text("hello world"));
- assertEquals(utf8Result, new Text("hello world"));
-
- utf8Result = (Text)proxy.echo((Text)null);
- assertEquals(utf8Result, null);
-
- // Test protobufs
- EnumDescriptorProto sendProto =
- EnumDescriptorProto.newBuilder().setName("test").build();
- EnumDescriptorProto retProto = proxy.exchangeProto(sendProto);
- assertEquals(sendProto, retProto);
- assertNotSame(sendProto, retProto);
- } finally {
- rpcServer.stop();
- if(proxy != null) {
- HBaseRPC.stopProxy(proxy);
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
- new TestPBOnWritableRpc().testCallsInternal(conf);
- }
-
- @org.junit.Rule
- public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
- new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
-}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestDelayedRpcProtos.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestDelayedRpcProtos.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestDelayedRpcProtos.java (revision 0)
@@ -0,0 +1,825 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: test_delayed_rpc.proto
+
+package org.apache.hadoop.hbase.ipc.protobuf.generated;
+
+public final class TestDelayedRpcProtos {
+ private TestDelayedRpcProtos() {}
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ }
+ public interface TestArgOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required bool delay = 1;
+ boolean hasDelay();
+ boolean getDelay();
+ }
+ public static final class TestArg extends
+ com.google.protobuf.GeneratedMessage
+ implements TestArgOrBuilder {
+ // Use TestArg.newBuilder() to construct.
+ private TestArg(Builder builder) {
+ super(builder);
+ }
+ private TestArg(boolean noInit) {}
+
+ private static final TestArg defaultInstance;
+ public static TestArg getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public TestArg getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestArg_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestArg_fieldAccessorTable;
+ }
+
+ private int bitField0_;
+ // required bool delay = 1;
+ public static final int DELAY_FIELD_NUMBER = 1;
+ private boolean delay_;
+ public boolean hasDelay() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public boolean getDelay() {
+ return delay_;
+ }
+
+ private void initFields() {
+ delay_ = false;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasDelay()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBool(1, delay_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(1, delay_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg other = (org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg) obj;
+
+ boolean result = true;
+ result = result && (hasDelay() == other.hasDelay());
+ if (hasDelay()) {
+ result = result && (getDelay()
+ == other.getDelay());
+ }
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasDelay()) {
+ hash = (37 * hash) + DELAY_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getDelay());
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArgOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestArg_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestArg_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ delay_ = false;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.getDescriptor();
+ }
+
+ public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg build() {
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg buildPartial() {
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg result = new org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.delay_ = delay_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg) {
+ return mergeFrom((org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg other) {
+ if (other == org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.getDefaultInstance()) return this;
+ if (other.hasDelay()) {
+ setDelay(other.getDelay());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasDelay()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ delay_ = input.readBool();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required bool delay = 1;
+ private boolean delay_ ;
+ public boolean hasDelay() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public boolean getDelay() {
+ return delay_;
+ }
+ public Builder setDelay(boolean value) {
+ bitField0_ |= 0x00000001;
+ delay_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearDelay() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ delay_ = false;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:TestArg)
+ }
+
+ static {
+ defaultInstance = new TestArg(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:TestArg)
+ }
+
+ public interface TestResponseOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required int32 response = 1;
+ boolean hasResponse();
+ int getResponse();
+ }
+ public static final class TestResponse extends
+ com.google.protobuf.GeneratedMessage
+ implements TestResponseOrBuilder {
+ // Use TestResponse.newBuilder() to construct.
+ private TestResponse(Builder builder) {
+ super(builder);
+ }
+ private TestResponse(boolean noInit) {}
+
+ private static final TestResponse defaultInstance;
+ public static TestResponse getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public TestResponse getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestResponse_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestResponse_fieldAccessorTable;
+ }
+
+ private int bitField0_;
+ // required int32 response = 1;
+ public static final int RESPONSE_FIELD_NUMBER = 1;
+ private int response_;
+ public boolean hasResponse() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public int getResponse() {
+ return response_;
+ }
+
+ private void initFields() {
+ response_ = 0;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasResponse()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeInt32(1, response_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(1, response_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse other = (org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse) obj;
+
+ boolean result = true;
+ result = result && (hasResponse() == other.hasResponse());
+ if (hasResponse()) {
+ result = result && (getResponse()
+ == other.getResponse());
+ }
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasResponse()) {
+ hash = (37 * hash) + RESPONSE_FIELD_NUMBER;
+ hash = (53 * hash) + getResponse();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponseOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestResponse_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.internal_static_TestResponse_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ response_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.getDescriptor();
+ }
+
+ public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse build() {
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse buildPartial() {
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse result = new org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.response_ = response_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse) {
+ return mergeFrom((org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse other) {
+ if (other == org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.getDefaultInstance()) return this;
+ if (other.hasResponse()) {
+ setResponse(other.getResponse());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasResponse()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 8: {
+ bitField0_ |= 0x00000001;
+ response_ = input.readInt32();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required int32 response = 1;
+ private int response_ ;
+ public boolean hasResponse() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public int getResponse() {
+ return response_;
+ }
+ public Builder setResponse(int value) {
+ bitField0_ |= 0x00000001;
+ response_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearResponse() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ response_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:TestResponse)
+ }
+
+ static {
+ defaultInstance = new TestResponse(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:TestResponse)
+ }
+
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_TestArg_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_TestArg_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_TestResponse_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_TestResponse_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor
+ getDescriptor() {
+ return descriptor;
+ }
+ private static com.google.protobuf.Descriptors.FileDescriptor
+ descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n\026test_delayed_rpc.proto\"\030\n\007TestArg\022\r\n\005d" +
+ "elay\030\001 \002(\010\" \n\014TestResponse\022\020\n\010response\030\001" +
+ " \002(\005BL\n.org.apache.hadoop.hbase.ipc.prot" +
+ "obuf.generatedB\024TestDelayedRpcProtos\210\001\001\240" +
+ "\001\001"
+ };
+ com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+ new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+ public com.google.protobuf.ExtensionRegistry assignDescriptors(
+ com.google.protobuf.Descriptors.FileDescriptor root) {
+ descriptor = root;
+ internal_static_TestArg_descriptor =
+ getDescriptor().getMessageTypes().get(0);
+ internal_static_TestArg_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_TestArg_descriptor,
+ new java.lang.String[] { "Delay", },
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.class,
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg.Builder.class);
+ internal_static_TestResponse_descriptor =
+ getDescriptor().getMessageTypes().get(1);
+ internal_static_TestResponse_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_TestResponse_descriptor,
+ new java.lang.String[] { "Response", },
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.class,
+ org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse.Builder.class);
+ return null;
+ }
+ };
+ com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ }, assigner);
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (revision 0)
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.regionserver.HRegionServer.QosFunction;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Tests that verify certain RPCs get a higher QoS.
+ */
+@Category(SmallTests.class)
+public class TestPriorityRpc {
+
+ @Test
+ public void testQosFunction() throws IOException {
+ HRegionServer regionServer =
+ HRegionServer.constructRegionServer(HRegionServer.class, new Configuration());
+ QosFunction qosFunc = regionServer.getQosFunction();
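+    //the QosFunction maps an RpcRequestBody to a priority: requests that
+    //touch a meta region get HIGH_QOS, everything else NORMAL_QOS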
+ RpcRequestBody.Builder rpcRequestBuilder = RpcRequestBody.newBuilder();
+ rpcRequestBuilder.setMethodName("scan");
+
+ //build an empty scan request
+ ScanRequest.Builder scanBuilder = ScanRequest.newBuilder();
+ ByteString requestBody = scanBuilder.build().toByteString();
+ rpcRequestBuilder.setRequest(requestBody);
+ RpcRequestBody rpcRequest = rpcRequestBuilder.build();
+    assertEquals(HRegionServer.NORMAL_QOS, qosFunc.apply(rpcRequest));
+
+ //build a scan request with scannerID
+ scanBuilder.setScannerId(12345);
+ requestBody = scanBuilder.build().toByteString();
+ rpcRequestBuilder.setRequest(requestBody);
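+    //the request class name tells the QosFunction which protobuf type to
+    //deserialize the request bytes into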
+ rpcRequestBuilder.setRequestClassName(ScanRequest.class.getCanonicalName());
+ rpcRequest = rpcRequestBuilder.build();
+    //mock out handling of a high-priority request and check the QoS returned
+ HRegionServer mockRS = Mockito.mock(HRegionServer.class);
+ RegionScanner mockRegionScanner = Mockito.mock(RegionScanner.class);
+ HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class);
+ Mockito.when(mockRS.getScanner(12345)).thenReturn(mockRegionScanner);
+ Mockito.when(mockRegionScanner.getRegionInfo()).thenReturn(mockRegionInfo);
+ Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true);
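+    //scanner 12345 now resolves to a scanner over a meta region, so the
+    //scan request should be classified as high priority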
+
+ qosFunc.setRegionServer(mockRS);
+
+    assertEquals(HRegionServer.HIGH_QOS, qosFunc.apply(rpcRequest));
+
+    //the same as above but with a non-meta region
+ Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
+    assertEquals(HRegionServer.NORMAL_QOS, qosFunc.apply(rpcRequest));
+
+    //create an RPC request that references the META region and also
+    //uses one of the known argument classes (known argument classes are
+    //listed in HRegionServer.QosFunction.knownArgumentClasses)
+ rpcRequestBuilder = RpcRequestBody.newBuilder();
+ rpcRequestBuilder.setMethodName("foo");
+ rpcRequestBuilder.setRequestClassName(GetRequest.class.getCanonicalName());
+ GetRequest.Builder getRequestBuilder = GetRequest.newBuilder();
+ RegionSpecifier.Builder regionSpecifierBuilder = RegionSpecifier.newBuilder();
+ regionSpecifierBuilder.setType(RegionSpecifierType.REGION_NAME);
+ ByteString name =
+ ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
+ regionSpecifierBuilder.setValue(name);
+ RegionSpecifier regionSpecifier = regionSpecifierBuilder.build();
+ getRequestBuilder.setRegion(regionSpecifier);
+ Get.Builder getBuilder = Get.newBuilder();
+    getBuilder.setRow(ByteString.copyFrom(Bytes.toBytes("somerow")));
+ getRequestBuilder.setGet(getBuilder.build());
+ rpcRequestBuilder.setRequest(getRequestBuilder.build().toByteString());
+ rpcRequest = rpcRequestBuilder.build();
+ HRegion mockRegion = Mockito.mock(HRegion.class);
+ Mockito.when(mockRS.getRegion(regionSpecifier)).thenReturn(mockRegion);
+ Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
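+    //a GetRequest against the META region should be high priority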
+    assertEquals(HRegionServer.HIGH_QOS, qosFunc.apply(rpcRequest));
+
+    //the same as above except that the request does not use any of the
+    //known argument classes (it uses an unrelated request class instead);
+    //known argument classes are listed in
+    //HRegionServer.QosFunction.knownArgumentClasses
+ rpcRequestBuilder.setRequestClassName(GetOnlineRegionRequest.class.getCanonicalName());
+ rpcRequest = rpcRequestBuilder.build();
+    assertEquals(HRegionServer.NORMAL_QOS, qosFunc.apply(rpcRequest));
+ }
+}
\ No newline at end of file
Index: hbase-server/src/test/protobuf/test_delayed_rpc.proto
===================================================================
--- hbase-server/src/test/protobuf/test_delayed_rpc.proto (revision 0)
+++ hbase-server/src/test/protobuf/test_delayed_rpc.proto (revision 0)
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestDelayedRpcProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
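+// Messages used by TestDelayedRpc: TestArg carries a flag asking the test
+// server to delay its reply; TestResponse carries a single int32 result.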
+message TestArg {
+ required bool delay = 1;
+}
+
+message TestResponse {
+ required int32 response = 1;
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java (revision 1371525)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java (working copy)
@@ -20,21 +20,23 @@
package org.apache.hadoop.hbase.master;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.protobuf.ServiceException;
+
@Category(MediumTests.class)
public class TestHMasterRPCException {
@@ -49,16 +51,33 @@
ServerName sm = hm.getServerName();
InetSocketAddress isa = new InetSocketAddress(sm.getHostname(), sm.getPort());
- try {
- MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseRPC.getProxy(
- MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION, isa, conf, 100);
- fail();
- } catch (ServerNotRunningYetException ex) {
- assertTrue(ex.getMessage().startsWith(
- "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet"));
- } catch (Throwable t) {
- fail("Unexpected throwable: " + t);
+    //retry the RPC a few times; we have seen SocketTimeoutExceptions if we
+    //try to connect too soon, so retry on SocketTimeoutException
+    for (int i = 0; i < 20; i++) {
+ try {
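+        //the master is not running yet, so the isMasterRunning call below
+        //should throw rather than fall through to fail()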
+ MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseRPC.getProxy(
+ MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION, isa, conf, 100);
+ inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance());
+ fail();
+ } catch (ServiceException ex) {
+ IOException ie = ProtobufUtil.getRemoteException(ex);
+ if (!(ie instanceof SocketTimeoutException)) {
+ if (ie.getMessage().startsWith(
+ "org.apache.hadoop.hbase.ipc.ServerNotRunningYetException: Server is not running yet")) {
+ return;
+ }
+ fail("Unexpected exception: " + ie);
+ } else {
+          System.err.println("Got SocketTimeoutException. Will retry.");
+ }
+ } catch (Throwable t) {
+ fail("Unexpected throwable: " + t);
+ }
+      Thread.sleep(100);
}
+    fail("ServerNotRunningYetException was never thrown");
}
@org.junit.Rule