diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java index a320095..eee6d5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java @@ -20,134 +20,78 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.lang.reflect.Proxy; import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import javax.net.SocketFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.IpcProtocol; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import com.google.protobuf.ServiceException; +/** + * Engine to start, run, and stop protobuf Services. 
+ */ public class ProtobufRpcClientEngine implements RpcClientEngine { - private static final Log LOG = LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine"); - ProtobufRpcClientEngine() { - super(); - } - protected final static ClientCache CLIENTS = new ClientCache(); - @Override - public IpcProtocol getProxy( - Class protocol, - InetSocketAddress addr, User ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException { - final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, - rpcTimeout); - return (IpcProtocol) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[]{protocol}, invoker); - } - @Override - public void stopProxy(IpcProtocol proxy) { - if (proxy!=null) { - ((Invoker)Proxy.getInvocationHandler(proxy)).close(); - } - } - - static class Invoker implements InvocationHandler { - private static final Map returnTypes = - new ConcurrentHashMap(); - private Class protocol; - private InetSocketAddress address; - private User ticket; - private HBaseClient client; + /** + * Replaces old Invoker class. Implements pb Service BlockingRpcChannel.
+ */ + private static class HBlockingRpcChannel implements BlockingRpcChannel { + private final InetSocketAddress address; + private final User user; + private final HBaseClient client; private boolean isClosed = false; - final private int rpcTimeout; + private final int rpcTimeout; - public Invoker(Class protocol, - InetSocketAddress addr, User ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException { - this.protocol = protocol; - this.address = addr; - this.ticket = ticket; + HBlockingRpcChannel(final InetSocketAddress address, final User user, + final Configuration conf, final SocketFactory factory, + final int rpcTimeout) { + this.address = address; + this.user = user; this.client = CLIENTS.getClient(conf, factory); this.rpcTimeout = rpcTimeout; } - private RpcRequestBody constructRpcRequest(Method method, - Object[] params) throws ServiceException { - RpcRequestBody rpcRequest; - RpcRequestBody.Builder builder = RpcRequestBody.newBuilder(); - builder.setMethodName(method.getName()); - Message param; - int length = params.length; - if (length == 2) { - // RpcController + Message in the method args - // (generated code from RPC bits in .proto files have RpcController) - param = (Message)params[1]; - } else if (length == 1) { // Message - param = (Message)params[0]; - } else { - throw new ServiceException("Too many parameters for request. Method: [" - + method.getName() + "]" + ", Expected: 2, Actual: " - + params.length); - } - builder.setRequestClassName(param.getClass().getName()); - builder.setRequest(param.toByteString()); - rpcRequest = builder.build(); - return rpcRequest; - } - - /** - * This is the client side invoker of RPC method. It only throws - * ServiceException, since the invocation proxy expects only - * ServiceException to be thrown by the method in case protobuf service. - * - * ServiceException has the following causes: - *
<ol> - * <li>Exceptions encountered on the client side in this method are - * set as cause in ServiceException as is.</li>
- * <li>Exceptions from the server are wrapped in RemoteException and are - * set as cause in ServiceException</li>
- * </ol>
- * - * Note that the client calling protobuf RPC methods, must handle - * ServiceException by getting the cause from the ServiceException. If the - * cause is RemoteException, then unwrap it to get the exception thrown by - * the server. - */ @Override - public Object invoke(Object proxy, Method method, Object[] args) - throws ServiceException { + public Message callBlockingMethod(MethodDescriptor md, + RpcController controller, Message request, Message responseType) + throws ServiceException { long startTime = 0; if (LOG.isDebugEnabled()) { startTime = System.currentTimeMillis(); } - - RpcRequestBody rpcRequest = constructRpcRequest(method, args); - Message val = null; + Message response = null; try { - val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout); - + RpcRequestBody.Builder builder = RpcRequestBody.newBuilder(); + builder.setMethodName(md.getName()); + builder.setRequestClassName(request.getClass().getName()); + builder.setRequest(request.toByteString()); + response = client.call(builder.build(), address, null, user, rpcTimeout); if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; - if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime); + if (LOG.isTraceEnabled()) LOG.trace("Call: " + md.getName() + " " + callTime); } - return val; + return response; } catch (Throwable e) { if (e instanceof RemoteException) { Throwable cause = ((RemoteException)e).unwrapRemoteException(); @@ -158,23 +102,62 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { } synchronized protected void close() { + // Currently unused but could remove this client stop stuff, this + // client caching and leave it instead over in HConnection. TODO. 
if (!isClosed) { isClosed = true; CLIENTS.stopClient(client); } } + } - static Message getReturnProtoType(Method method) throws Exception { - if (returnTypes.containsKey(method.getName())) { - return returnTypes.get(method.getName()); - } + ProtobufRpcClientEngine() { + super(); + } - Class returnType = method.getReturnType(); - Method newInstMethod = returnType.getMethod("getDefaultInstance"); - newInstMethod.setAccessible(true); - Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null); - returnTypes.put(method.getName(), protoType); - return protoType; + @Override + public BlockingInterface start(Class service, + InetSocketAddress addr, User ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) + throws IOException { + BlockingRpcChannel channel = new HBlockingRpcChannel(addr, ticket, conf, factory, rpcTimeout); + Class [] param = new Class[1]; + param[0] = com.google.protobuf.BlockingRpcChannel.class; + BlockingService stub; + Method m; + try { + m = service.getDeclaredMethod("newBlockingStub", param); + stub = (BlockingService)m.invoke(service, channel); + } catch (SecurityException e) { + throw new IOException(e); + } catch (NoSuchMethodException e) { + throw new IOException(e); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } catch (InvocationTargetException e) { + throw new IOException(e); } + return stub; + } + + @Override + public void stop(BlockingService service) { + if (service != null) { + // TODO: Need to have it so I do not have to shutdown a service.... Currently + // need to call close on the backing BlockingRpcChannel. Need to stuff + // the BRC into the service or keep a local map so can get to the BRC from + // the passed in instance. 
+ } + } + + public static void main(String[] args) throws IOException { + Configuration conf = HBaseConfiguration.create(); + ProtobufRpcClientEngine e = new ProtobufRpcClientEngine(); + BlockingService service = e.start(MasterProtos.MasterService.class, + InetSocketAddress.createUnresolved("example.org", 1234), + null, conf, NetUtils.getDefaultSocketFactory(conf), -1); + e.stop(service); } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java index 2e24e76..02da78a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java @@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.security.User; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Service; + import javax.net.SocketFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -32,10 +34,10 @@ import java.net.InetSocketAddress; @InterfaceAudience.Private public interface RpcClientEngine { /** Construct a client-side proxy object. */ - IpcProtocol getProxy(Class protocol, + BlockingService start(Class service, InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException; /** Stop this proxy. */ - void stopProxy(IpcProtocol proxy); + void stop(BlockingService service); } \ No newline at end of file