diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java index 57d83ef..95304ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java @@ -21,52 +21,9 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService; -import org.apache.hadoop.hbase.security.TokenInfo; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest; -import 
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse; -import 
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; - - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.security.TokenInfo; /** * Protocol that a client uses to communicate with the Master (for admin purposes). @@ -77,273 +34,6 @@ import com.google.protobuf.ServiceException; @InterfaceAudience.Private @InterfaceStability.Evolving public interface MasterAdminProtocol extends - MasterAdminService.BlockingInterface, MasterProtocol { + MasterAdminService.BlockingInterface, MasterService.BlockingInterface { public static final long VERSION = 1L; - - /* Column-level */ - - /** - * Adds a column to the specified table - * @param controller Unused (set to null). - * @param req AddColumnRequest that contains:
- * - tableName: table to modify
- * - column: column descriptor - * @throws ServiceException - */ - @Override - public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req) - throws ServiceException; - - /** - * Deletes a column from the specified table. Table must be disabled. - * @param controller Unused (set to null). - * @param req DeleteColumnRequest that contains:
- * - tableName: table to alter
- * - columnName: column family to remove - * @throws ServiceException - */ - @Override - public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req) - throws ServiceException; - - /** - * Modifies an existing column on the specified table - * @param controller Unused (set to null). - * @param req ModifyColumnRequest that contains:
- * - tableName: table name
- * - descriptor: new column descriptor - * @throws ServiceException e - */ - @Override - public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req) - throws ServiceException; - - /* Region-level */ - - /** - * Move a region to a specified destination server. - * @param controller Unused (set to null). - * @param req The request that contains:
- * - region: The encoded region name; i.e. the hash that makes - * up the region name suffix: e.g. if regionname is - * TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396., - * then the encoded region name is: 527db22f95c8a9e0116f0cc13c680396.
- * - destServerName: The servername of the destination regionserver. If - * passed the empty byte array we'll assign to a random server. A server name - * is made of host, port and startcode. Here is an example: - * host187.example.com,60020,1289493121758. - * @throws ServiceException that wraps a UnknownRegionException if we can't find a - * region named encodedRegionName - */ - @Override - public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req) - throws ServiceException; - - /** - * Assign a region to a server chosen at random. - * @param controller Unused (set to null). - * @param req contains the region to assign. Will use existing RegionPlan if one - * found. - * @throws ServiceException - */ - @Override - public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req) - throws ServiceException; - - /** - * Unassign a region from current hosting regionserver. Region will then be - * assigned to a regionserver chosen at random. Region could be reassigned - * back to the same server. Use {@link #moveRegion} if you want to - * control the region movement. - * @param controller Unused (set to null). - * @param req The request that contains:
- * - region: Region to unassign. Will clear any existing RegionPlan - * if one found.
- * - force: If true, force unassign (Will remove region from - * regions-in-transition too if present as well as from assigned regions -- - * radical!.If results in double assignment use hbck -fix to resolve. - * @throws ServiceException - */ - @Override - public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req) - throws ServiceException; - - /** - * Offline a region from the assignment manager's in-memory state. The - * region should be in a closed state and there will be no attempt to - * automatically reassign the region as in unassign. This is a special - * method, and should only be used by experts or hbck. - * @param controller Unused (set to null). - * @param request OfflineRegionRequest that contains:
- * - region: Region to offline. Will clear any existing RegionPlan - * if one found. - * @throws ServiceException - */ - @Override - public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request) - throws ServiceException; - - /* Table-level */ - - /** - * Creates a new table asynchronously. If splitKeys are specified, then the - * table will be created with an initial set of multiple regions. - * If splitKeys is null, the table will be created with a single region. - * @param controller Unused (set to null). - * @param req CreateTableRequest that contains:
- * - tablesSchema: table descriptor
- * - splitKeys - * @throws ServiceException - */ - @Override - public CreateTableResponse createTable(RpcController controller, CreateTableRequest req) - throws ServiceException; - - /** - * Deletes a table - * @param controller Unused (set to null). - * @param req DeleteTableRequest that contains:
- * - tableName: table to delete - * @throws ServiceException - */ - @Override - public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest req) - throws ServiceException; - - /** - * Puts the table on-line (only needed if table has been previously taken offline) - * @param controller Unused (set to null). - * @param req EnableTableRequest that contains:
- * - tableName: table to enable - * @throws ServiceException - */ - @Override - public EnableTableResponse enableTable(RpcController controller, EnableTableRequest req) - throws ServiceException; - - /** - * Take table offline - * - * @param controller Unused (set to null). - * @param req DisableTableRequest that contains:
- * - tableName: table to take offline - * @throws ServiceException - */ - @Override - public DisableTableResponse disableTable(RpcController controller, DisableTableRequest req) - throws ServiceException; - - /** - * Modify a table's metadata - * - * @param controller Unused (set to null). - * @param req ModifyTableRequest that contains:
- * - tableName: table to modify
- * - tableSchema: new descriptor for table - * @throws ServiceException - */ - @Override - public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req) - throws ServiceException; - - /* Cluster-level */ - - /** - * Shutdown an HBase cluster. - * @param controller Unused (set to null). - * @param request ShutdownRequest - * @return ShutdownResponse - * @throws ServiceException - */ - @Override - public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request) - throws ServiceException; - - /** - * Stop HBase Master only. - * Does not shutdown the cluster. - * @param controller Unused (set to null). - * @param request StopMasterRequest - * @return StopMasterResponse - * @throws ServiceException - */ - @Override - public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request) - throws ServiceException; - - /** - * Run the balancer. Will run the balancer and if regions to move, it will - * go ahead and do the reassignments. Can NOT run for various reasons. Check - * logs. - * @param c Unused (set to null). - * @param request BalanceRequest - * @return BalanceResponse that contains:
- * - balancerRan: True if balancer ran and was able to tell the region servers to - * unassign all the regions to balance (the re-assignment itself is async), - * false otherwise. - */ - @Override - public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException; - - /** - * Turn the load balancer on or off. - * @param controller Unused (set to null). - * @param req SetBalancerRunningRequest that contains:
- * - on: If true, enable balancer. If false, disable balancer.
- * - synchronous: if true, wait until current balance() call, if outstanding, to return. - * @return SetBalancerRunningResponse that contains:
- * - prevBalanceValue: Previous balancer value - * @throws ServiceException - */ - @Override - public SetBalancerRunningResponse setBalancerRunning( - RpcController controller, SetBalancerRunningRequest req) throws ServiceException; - - /** - * @param c Unused (set to null). - * @param req IsMasterRunningRequest - * @return IsMasterRunningRequest that contains:
- * isMasterRunning: true if master is available - * @throws ServiceException - */ - @Override - public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req) - throws ServiceException; - - /** - * Run a scan of the catalog table - * @param c Unused (set to null). - * @param req CatalogScanRequest - * @return CatalogScanResponse that contains the int return code corresponding - * to the number of entries cleaned - * @throws ServiceException - */ - @Override - public CatalogScanResponse runCatalogScan(RpcController c, - CatalogScanRequest req) throws ServiceException; - - /** - * Enable/Disable the catalog janitor - * @param c Unused (set to null). - * @param req EnableCatalogJanitorRequest that contains:
- * - enable: If true, enable catalog janitor. If false, disable janitor.
- * @return EnableCatalogJanitorResponse that contains:
- * - prevValue: true, if it was enabled previously; false, otherwise - * @throws ServiceException - */ - @Override - public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c, - EnableCatalogJanitorRequest req) throws ServiceException; - - /** - * Query whether the catalog janitor is enabled - * @param c Unused (set to null). - * @param req IsCatalogJanitorEnabledRequest - * @return IsCatalogCatalogJanitorEnabledResponse that contains:
- * - value: true, if it is enabled; false, otherwise - * @throws ServiceException - */ - @Override - public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c, - IsCatalogJanitorEnabledRequest req) throws ServiceException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java index d8cff7d..10723fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java @@ -21,19 +21,9 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService; -import org.apache.hadoop.hbase.security.TokenInfo; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.security.TokenInfo; /** * Protocol that a 
client uses to communicate with the Master (for monitoring purposes). @@ -44,56 +34,6 @@ import com.google.protobuf.ServiceException; @InterfaceAudience.Public @InterfaceStability.Evolving public interface MasterMonitorProtocol extends - MasterMonitorService.BlockingInterface, MasterProtocol { + MasterMonitorService.BlockingInterface, MasterService.BlockingInterface { public static final long VERSION = 1L; - - /** - * Used by the client to get the number of regions that have received the - * updated schema - * - * @param controller Unused (set to null). - * @param req GetSchemaAlterStatusRequest that contains:
- * - tableName - * @return GetSchemaAlterStatusResponse indicating the number of regions updated. - * yetToUpdateRegions is the regions that are yet to be updated totalRegions - * is the total number of regions of the table - * @throws ServiceException - */ - @Override - public GetSchemaAlterStatusResponse getSchemaAlterStatus( - RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException; - - /** - * Get list of TableDescriptors for requested tables. - * @param controller Unused (set to null). - * @param req GetTableDescriptorsRequest that contains:
- * - tableNames: requested tables, or if empty, all are requested - * @return GetTableDescriptorsResponse - * @throws ServiceException - */ - @Override - public GetTableDescriptorsResponse getTableDescriptors( - RpcController controller, GetTableDescriptorsRequest req) throws ServiceException; - - /** - * Return cluster status. - * @param controller Unused (set to null). - * @param req GetClusterStatusRequest - * @return status object - * @throws ServiceException - */ - @Override - public GetClusterStatusResponse getClusterStatus(RpcController controller, GetClusterStatusRequest req) - throws ServiceException; - - /** - * @param c Unused (set to null). - * @param req IsMasterRunningRequest - * @return IsMasterRunningRequest that contains:
- * isMasterRunning: true if master is available - * @throws ServiceException - */ - @Override - public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req) - throws ServiceException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java deleted file mode 100644 index 62b3b84..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Functions implemented by all the master protocols (e.g. MasterAdminProtocol, -// MasterMonitorProtocol). Currently, this is only isMasterRunning, which is used, -// on proxy creation, to check if the master has been stopped. If it has, -// a MasterNotRunningException is thrown back to the client, and the client retries. 
- -package org.apache.hadoop.hbase; - -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - -public interface MasterProtocol extends VersionedProtocol, MasterService.BlockingInterface { - - /** - * @param c Unused (set to null). - * @param req IsMasterRunningRequest - * @return IsMasterRunningRequest that contains:
- * isMasterRunning: true if master is available - * @throws ServiceException - */ - public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req) - throws ServiceException; -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index e8942a8..8a2c1e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterAdminProtocol; import org.apache.hadoop.hbase.MasterMonitorProtocol; import org.apache.hadoop.hbase.MasterNotRunningException; -import org.apache.hadoop.hbase.MasterProtocol; import org.apache.hadoop.hbase.RegionMovedException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; @@ -73,12 +72,15 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.HBaseClientRPC; +import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine; import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.security.User; import 
org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -94,6 +96,12 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; /** @@ -672,14 +680,14 @@ public class HConnectionManager { } private static class MasterProtocolState { - public MasterProtocol protocol; + public MasterService.BlockingInterface protocol; public int userCount; public long keepAliveUntil = Long.MAX_VALUE; - public final Class protocolClass; + public final Class protocolClass; public long version; public MasterProtocolState ( - final Class protocolClass, long version) { + final Class protocolClass, long version) { this.protocolClass = protocolClass; this.version = version; } @@ -688,7 +696,7 @@ public class HConnectionManager { /** * Create a new Master proxy. Try once only. 
*/ - private MasterProtocol createMasterInterface( + private MasterService.BlockingInterface createMasterInterface( MasterProtocolState masterProtocolState) throws IOException, KeeperException, ServiceException { @@ -700,29 +708,69 @@ public class HConnectionManager { } try { - checkIfBaseNodeAvailable(zkw); - ServerName sn = MasterAddressTracker.getMasterAddress(zkw); + final ServerName sn = MasterAddressTracker.getMasterAddress(zkw); if (sn == null) { String msg = "ZooKeeper available but no active master location found"; LOG.info(msg); throw new MasterNotRunningException(msg); } + final InetSocketAddress isa = + new InetSocketAddress(sn.getHostname(), sn.getPort()); + + // pb service setup + RpcController controller = new RpcController() { + @Override + public String errorText() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean failed() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isCanceled() { + // TODO Auto-generated method stub + return false; + } + @Override + public void notifyOnCancel(RpcCallback arg0) { + // TODO Auto-generated method stub + + } - InetSocketAddress isa = - new InetSocketAddress(sn.getHostname(), sn.getPort()); - MasterProtocol tryMaster = (MasterProtocol) HBaseClientRPC.getProxy( - masterProtocolState.protocolClass, - masterProtocolState.version, - isa, this.conf, this.rpcTimeout); + @Override + public void reset() { + // TODO Auto-generated method stub + + } - if (tryMaster.isMasterRunning( - null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) { - return tryMaster; + @Override + public void setFailed(String arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void startCancel() { + // TODO Auto-generated method stub + + } + }; + BlockingRpcChannel channel = HBaseClientRPC.getChannel(isa, conf, rpcTimeout); + MasterProtos.MasterService.BlockingInterface stub = + 
MasterProtos.MasterService.newBlockingStub(channel); + if (stub.isMasterRunning(controller, + RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) { + return stub; } else { - HBaseClientRPC.stopProxy(tryMaster); + // TODO HBaseClientRPC.stopProxy(tryMaster); String msg = "Can create a proxy to master, but it is not running"; LOG.info(msg); throw new MasterNotRunningException(msg); @@ -736,7 +784,7 @@ public class HConnectionManager { * Create a master, retries if necessary. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings (value="SWL_SLEEP_WITH_LOCK_HELD") - private MasterProtocol createMasterWithRetries( + private MasterService.BlockingInterface createMasterWithRetries( MasterProtocolState masterProtocolState) throws MasterNotRunningException { // The lock must be at the beginning to prevent multiple master creation @@ -744,7 +792,7 @@ public class HConnectionManager { synchronized (this.masterAndZKLock) { Exception exceptionCaught = null; - MasterProtocol master = null; + MasterService.BlockingInterface master = null; int tries = 0; while ( !this.closed && master == null @@ -1601,7 +1649,7 @@ public class HConnectionManager { synchronized (masterAndZKLock) { if (!isKeepAliveMasterConnectedAndRunning(protocolState)) { if (protocolState.protocol != null) { - HBaseClientRPC.stopProxy(protocolState.protocol); + // TODO HBaseClientRPC.stopProxy(protocolState.protocol); } protocolState.protocol = null; protocolState.protocol = createMasterWithRetries(protocolState); @@ -1676,8 +1724,8 @@ public class HConnectionManager { private void closeMasterProtocol(MasterProtocolState protocolState) { if (protocolState.protocol != null){ - LOG.info("Closing master protocol: " + protocolState.protocolClass.getName()); - HBaseClientRPC.stopProxy(protocolState.protocol); + // TODO LOG.info("Closing master protocol: " + protocolStateprotocolClass.getName()); + // TODO HBaseClientRPC.stopProxy(protocolState.protocol); protocolState.protocol = null; } 
protocolState.userCount = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 1d280f6..78fcc71 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -190,6 +190,8 @@ public class HBaseClient { } public static class FailedServerException extends IOException { + private static final long serialVersionUID = -4744376109431464127L; + public FailedServerException(String s) { super(s); } @@ -1494,8 +1496,9 @@ public class HBaseClient { @Override // simply use the default Object#hashcode() ? public int hashCode() { + // TODO: For now allow a protocol of null while testing pb Service return (address.hashCode() + PRIME * ( - PRIME * System.identityHashCode(protocol) ^ + PRIME * (protocol == null? 1: System.identityHashCode(protocol)) ^ (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java index 1b4f20b..9eaf03b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.ReflectionUtils; +import com.google.protobuf.BlockingRpcChannel; + import javax.net.SocketFactory; import java.io.IOException; import java.lang.reflect.Field; @@ -84,18 +86,20 @@ public class HBaseClientRPC { // return the RpcEngine configured to handle a protocol static synchronized RpcClientEngine getProtocolEngine(Class protocol, Configuration conf) { - RpcClientEngine engine = PROTOCOL_ENGINES.get(protocol); + // TODO: Fix. 
Allowing null protocol + RpcClientEngine engine = PROTOCOL_ENGINES.get(protocol == null? Object.class: protocol); if (engine == null) { // check for a configured default engine Class defaultEngine = conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class); // check for a per interface override - Class impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(), + String name = protocol == null? "none": protocol.getName(); + Class impl = conf.getClass(RPC_ENGINE_PROP + "." + name, defaultEngine); - LOG.debug("Using " + impl.getName() + " for " + protocol.getName()); + LOG.debug("Using " + impl.getName() + " for " + name); engine = (RpcClientEngine) ReflectionUtils.newInstance(impl, conf); - if (protocol.isInterface()) + if (protocol != null && protocol.isInterface()) PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), protocol), engine); @@ -248,6 +252,14 @@ public class HBaseClientRPC { return proxy; } + public static BlockingRpcChannel getChannel(InetSocketAddress addr, Configuration conf, + int rpcTimeout) + throws IOException { + RpcClientEngine engine = getProtocolEngine(null, conf); + return engine.getBlockingChannel(addr, User.getCurrent(), conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout); + } + /** * Construct a client-side proxy object with the default SocketFactory * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 884db91..ae5603b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -46,18 +46,17 @@ import java.nio.channels.WritableByteChannel; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; 
-import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; @@ -68,28 +67,28 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.monitoring.TaskMonitor; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus; +import org.apache.hadoop.hbase.security.User; import 
org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.RPC.VersionMismatch; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -97,20 +96,19 @@ import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; - -import com.google.common.base.Function; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.Message; - import org.cliffc.high_scale_lib.Counter; import org.cloudera.htrace.Sampler; import org.cloudera.htrace.Span; +import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceInfo; import org.cloudera.htrace.impl.NullSpan; -import org.cloudera.htrace.Trace; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.Message; /** A client for an IPC service. IPC calls take a single Protobuf message as a * parameter, and return a single Protobuf message as their value. 
A service runs on @@ -170,9 +168,6 @@ public abstract class HBaseServer implements RpcServer { private volatile boolean started = false; // For generated protocol classes which doesn't have VERSION field - private static final Map, Long> - PROTOCOL_VERSION = new HashMap, Long>(); - private static final Map> PROTOCOL_CACHE = new ConcurrentHashMap>(); @@ -1504,9 +1499,13 @@ public abstract class HBaseServer implements RpcServer { new DataInputStream(new ByteArrayInputStream(buf)); header = ConnectionHeader.parseFrom(in); try { - String protocolClassName = header.getProtocol(); - if (protocolClassName != null) { + String protocolClassName = null; + if (header.hasProtocol()) protocolClassName = header.getProtocol(); + if (protocolClassName != null && protocolClassName.length() > 0) { protocol = getProtocolClass(header.getProtocol(), conf); + } else { + // TODO: This is pb Service indicator for now. + this.protocol = null; } } catch (ClassNotFoundException cnfe) { throw new IOException("Unknown protocol: " + header.getProtocol()); @@ -1791,8 +1790,7 @@ public abstract class HBaseServer implements RpcServer { call.connection.protocol); // make the call - value = call(call.connection.protocol, call.rpcRequestBody, call.timestamp, - status); + value = call(call.connection.protocol, call.rpcRequestBody, call.timestamp, status); } catch (Throwable e) { LOG.debug(getName()+", call "+call+": error: " + e, e); errorClass = e.getClass().getName(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServerRPC.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServerRPC.java index b5ee23d..c70e1df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServerRPC.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServerRPC.java @@ -146,6 +146,9 @@ public class HBaseServerRPC { /** * Construct a server for a protocol implementation instance.
*/ + // TODO: an opaque 'Class' as protocol is of no use, confuses actually. + // TODO: Remove. Rely on what is passed in as 'ifaces' instead. + // St.Ack 20130101 public static RpcServer getServer(Class protocol, final Object instance, final Class[] ifaces, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java index 46873ab..d5c1e27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java @@ -19,16 +19,6 @@ package org.apache.hadoop.hbase.ipc; -import com.google.protobuf.Message; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.ipc.RemoteException; - -import javax.net.SocketFactory; import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @@ -37,6 +27,21 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.ipc.RemoteException; + +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + public class ProtobufRpcClientEngine implements RpcClientEngine { 
private static final Log LOG = @@ -46,7 +51,67 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { super(); } + static class HBlockingRpcChannel implements BlockingRpcChannel { + private final InetSocketAddress address; + private final User user; + private final HBaseClient client; + private boolean isClosed = false; + private final int rpcTimeout; + + HBlockingRpcChannel(final InetSocketAddress address, final User user, + final Configuration conf, final SocketFactory factory, final int rpcTimeout) { + this.address = address; + this.user = user; + this.client = CLIENTS.getClient(conf, factory); + this.rpcTimeout = rpcTimeout; + } + + @Override + public Message callBlockingMethod(MethodDescriptor md, + RpcController controller, Message request, Message responseType) + throws ServiceException { + long startTime = 0; + if (LOG.isDebugEnabled()) { + startTime = System.currentTimeMillis(); + } + Message response = null; + try { + // TODO: Fix. Allowing protocol == null for now. Instead, pass name + // of the service. + response = client.call(constructRpcRequest(md.getName(), new Object [] {request}, 0), + address, null, user, rpcTimeout); + if (LOG.isDebugEnabled()) { + long callTime = System.currentTimeMillis() - startTime; + if (LOG.isTraceEnabled()) LOG.trace("Call: " + md.getName() + " " + callTime); + } + return response; + } catch (Throwable e) { + if (e instanceof RemoteException) { + Throwable cause = ((RemoteException)e).unwrapRemoteException(); + throw new ServiceException(cause); + } + throw new ServiceException(e); + } + } + + synchronized protected void close() { + if (!isClosed) { + isClosed = true; + CLIENTS.stopClient(client); + } + } + } + protected final static ClientCache CLIENTS = new ClientCache(); + + public BlockingRpcChannel getBlockingChannel(final InetSocketAddress addr, + final User user, final Configuration conf, final SocketFactory factory, + final int rpcTimeout) { + // EXPERIMENT!!!! 
I want to create a 'stub' but looks like you have to + // do the invocation against the actual class pb generates. + return new HBlockingRpcChannel(addr, user, conf, factory, rpcTimeout); + } + @Override public VersionedProtocol getProxy( Class protocol, long clientVersion, @@ -65,6 +130,33 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { } } + private static RpcRequestBody constructRpcRequest(final String methodName, + Object[] params, + final long clientProtocolVersion) + throws ServiceException { + // TODO: Refactor so method name if needed is in the header and + // distinct from the request Message so we don't have to copy + // the request into this RpcRequestBody. + RpcRequestBody.Builder builder = RpcRequestBody.newBuilder(); + builder.setMethodName(methodName); + Message param; + int length = params.length; + if (length == 2) { + // RpcController + Message in the method args + // (generated code from RPC bits in .proto files have RpcController) + param = (Message)params[1]; + } else if (length == 1) { // Message + param = (Message)params[0]; + } else { + throw new ServiceException("Too many parameters for request. 
Method: [" + + methodName + "]" + ", Expected: 2, Actual: " + params.length); + } + builder.setRequestClassName(param.getClass().getName()); + builder.setRequest(param.toByteString()); + builder.setClientProtocolVersion(clientProtocolVersion); + return builder.build(); +} + static class Invoker implements InvocationHandler { private static final Map returnTypes = new ConcurrentHashMap(); @@ -100,31 +192,6 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { } } - private RpcRequestBody constructRpcRequest(Method method, - Object[] params) throws ServiceException { - RpcRequestBody rpcRequest; - RpcRequestBody.Builder builder = RpcRequestBody.newBuilder(); - builder.setMethodName(method.getName()); - Message param; - int length = params.length; - if (length == 2) { - // RpcController + Message in the method args - // (generated code from RPC bits in .proto files have RpcController) - param = (Message)params[1]; - } else if (length == 1) { // Message - param = (Message)params[0]; - } else { - throw new ServiceException("Too many parameters for request. Method: [" - + method.getName() + "]" + ", Expected: 2, Actual: " - + params.length); - } - builder.setRequestClassName(param.getClass().getName()); - builder.setRequest(param.toByteString()); - builder.setClientProtocolVersion(clientProtocolVersion); - rpcRequest = builder.build(); - return rpcRequest; - } - /** * This is the client side invoker of RPC method. 
It only throws * ServiceException, since the invocation proxy expects only @@ -151,7 +218,8 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { startTime = System.currentTimeMillis(); } - RpcRequestBody rpcRequest = constructRpcRequest(method, args); + RpcRequestBody rpcRequest = constructRpcRequest(method.getName(), + args, this.clientProtocolVersion); Message val = null; try { val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java index 3317af3..94820dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java @@ -36,12 +36,13 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.HBasePolicyProvider; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; -import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.codehaus.jackson.map.ObjectMapper; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; /** @@ -69,7 +70,6 @@ class ProtobufRpcServerEngine implements RpcServerEngine { public static class Server extends HBaseServer { boolean verbose; Object instance; - Class implementation; private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; private static final String WARN_RESPONSE_SIZE = @@ -79,11 +79,12 
@@ class ProtobufRpcServerEngine implements RpcServerEngine { private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; - /** Names for suffixed metrics */ - private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec."; - private final int warnResponseTime; private final int warnResponseSize; + /** + * These are the Interfaces we will field requests for + */ + private final Class [] interfaces; private static String classNameBase(String className) { String[] names = className.split("\\.", -1); @@ -101,17 +102,13 @@ class ProtobufRpcServerEngine implements RpcServerEngine { super(bindAddress, port, numHandlers, metaHandlerCount, conf, classNameBase(instance.getClass().getName()), highPriorityLevel); - this.instance = instance; - this.implementation = instance.getClass(); - this.verbose = verbose; - this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME); this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE); this.verbose = verbose; this.instance = instance; - this.implementation = instance.getClass(); + this.interfaces = ifaces; } private static final Map methodArg = new ConcurrentHashMap(); @@ -155,81 +152,88 @@ class ProtobufRpcServerEngine implements RpcServerEngine { * exception name and the stack trace are returned in the protobuf response. 
*/ public Message call(Class protocol, - RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status) - throws IOException { + RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status) + throws IOException { try { - String methodName = rpcRequest.getMethodName(); - Method method = getMethod(protocol, methodName); - if (method == null) { - throw new UnknownProtocolException("Method " + methodName + - " doesn't exist in protocol " + protocol.getName()); + // Test we can handle this protocol at all + // TODO: Fix this so we get instance of the Service rather than just test + // this protocol is legit. + if (!isAssignable(protocol)) { + throw new UnknownProtocolException("Protocol=" + protocol + ", this.interfaces=" + + this.interfaces); } - - /** - * RPCs for a particular interface (ie protocol) are done using a - * IPC connection that is setup using rpcProxy. - * The rpcProxy's has a declared protocol name that is - * sent form client to server at connection time. - */ - //TODO: use the clientVersion to do protocol compatibility checks, and - //this could be used here to handle complex use cases like deciding - //which implementation of the protocol should be used to service the - //current request, etc. Ideally, we shouldn't land up in a situation - //where we need to support such a use case. 
- //For now the clientVersion field is simply ignored - long clientVersion = rpcRequest.getClientProtocolVersion(); - - if (verbose) { - LOG.info("Call: protocol name=" + protocol.getName() + - ", method=" + methodName); - } - - status.setRPC(rpcRequest.getMethodName(), - new Object[]{rpcRequest.getRequest()}, receiveTime); - status.setRPCPacket(rpcRequest); - status.resume("Servicing call"); - //get an instance of the method arg type - Message protoType = getMethodArgType(method); - Message param = protoType.newBuilderForType() - .mergeFrom(rpcRequest.getRequest()).build(); - Message result; - Object impl = null; - if (protocol.isAssignableFrom(this.implementation)) { - impl = this.instance; - } else { - throw new UnknownProtocolException(protocol); - } - long startTime = System.currentTimeMillis(); - if (method.getParameterTypes().length == 2) { - // RpcController + Message in the method args - // (generated code from RPC bits in .proto files have RpcController) - result = (Message)method.invoke(impl, null, param); - } else if (method.getParameterTypes().length == 1) { - // Message (hand written code usually has only a single argument) - result = (Message)method.invoke(impl, param); + String methodName = rpcRequest.getMethodName(); + String protocolName = protocol.getName(); + Message request = null; + Message response = null; + long clientVersion = 0; + BlockingService bs = null; + if (this.instance instanceof BlockingService) bs = (BlockingService)this.instance; + if (bs != null) { + // TODO: Check Descriptor or MethodDescriptor not created each time. + // TODO: IS THE ORIGINAL IMPLEMENTATION BLOCKING? + Descriptors.MethodDescriptor md = + bs.getDescriptorForType().findMethodByName(methodName); + checkMethod(md, protocolName, methodName); + setStatus(protocolName, methodName, status, rpcRequest, receiveTime); + request = bs.getRequestPrototype(md).getDefaultInstanceForType(). 
+ newBuilderForType().mergeFrom(rpcRequest.getRequest()).build(); + // Overriding RPCController to pass extra non-protobuf data in and out of method. + // Check for extra on wire and add it as payload. Ditto on way out. + PayloadCarryingRpcController c = new PayloadCarryingRpcController(); + response = bs.callBlockingMethod(md, c, request); } else { - throw new ServiceException("Too many parameters for method: [" - + method.getName() + "]" + ", allowed (at most): 2, Actual: " - + method.getParameterTypes().length); + Method method = getMethod(protocol, methodName); + checkMethod(method, protocolName, methodName); + /** + * RPCs for a particular interface (ie protocol) are done using a + * IPC connection that is set up using rpcProxy. + * The rpcProxy has a declared protocol name that is + * sent from client to server at connection time. + */ + //TODO: use the clientVersion to do protocol compatibility checks, and + //this could be used here to handle complex use cases like deciding + //which implementation of the protocol should be used to service the + //current request, etc. Ideally, we shouldn't land up in a situation + //where we need to support such a use case.
+ //For now the clientVersion field is simply ignored + clientVersion = rpcRequest.getClientProtocolVersion(); + setStatus(protocolName, methodName, status, rpcRequest, receiveTime); + // Get an instance of the method arg type + Message protoType = getMethodArgType(method); + request = protoType.newBuilderForType().mergeFrom(rpcRequest.getRequest()).build(); + if (method.getParameterTypes().length == 2) { + // RpcController + Message in the method args + // (generated code from RPC bits in .proto files have RpcController) + response = (Message)method.invoke(this.instance, null, request); + } else if (method.getParameterTypes().length == 1) { + // Message (hand written code usually has only a single argument) + response = (Message)method.invoke(this.instance, request); + } else { + throw new ServiceException("Too many parameters for method: [" + + method.getName() + "]" + ", allowed (at most): 2, Actual: " + + method.getParameterTypes().length); + } } + int processingTime = (int) (System.currentTimeMillis() - startTime); int qTime = (int) (startTime-receiveTime); if (TRACELOG.isDebugEnabled()) { TRACELOG.debug("Call #" + CurCall.get().id + - "; served=" + protocol.getSimpleName() + "#" + method.getName() + + "; served=" + protocol.getSimpleName() + "#" + methodName + ", queueTime=" + qTime + ", processingTime=" + processingTime + - ", request=" + param.toString() + - " response=" + result.toString()); + ", request=" + request.toString() + + " response=" + response.toString()); } metrics.dequeuedCall(qTime); metrics.processedCall(processingTime); if (verbose) { - log("Return: "+result, LOG); + log("Return: " + response, LOG); } - long responseSize = result.getSerializedSize(); + long responseSize = response.getSerializedSize(); // log any RPC responses that are slower than the configured warn // response time or larger than configured warning size boolean tooSlow = (processingTime > warnResponseTime @@ -242,15 +246,15 @@ class ProtobufRpcServerEngine implements 
RpcServerEngine { StringBuilder buffer = new StringBuilder(256); buffer.append(methodName); buffer.append("("); - buffer.append(param.getClass().getName()); + buffer.append(request.getClass().getName()); buffer.append(")"); - buffer.append(", client version="+clientVersion); + buffer.append(", client version=" + clientVersion); logResponse(new Object[]{rpcRequest.getRequest()}, methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"), status.getClient(), startTime, processingTime, qTime, responseSize); } - return result; + return response; } catch (InvocationTargetException e) { Throwable target = e.getTargetException(); if (target instanceof IOException) { @@ -272,6 +276,36 @@ class ProtobufRpcServerEngine implements RpcServerEngine { } } + private void setStatus(final String protocolName, final String methodName, + final MonitoredRPCHandler status, final RpcRequestBody rpcRequest, + final long receiveTime) { + if (verbose) { + LOG.info("Call protocol name=" + protocolName + ", method=" + methodName); + } + status.setRPC(methodName, new Object[]{rpcRequest.getRequest()}, receiveTime); + status.setRPCPacket(rpcRequest); + status.resume("Servicing call"); + } + + private static void checkMethod(final Object method, final String protocolName, + final String methodName) + throws UnknownProtocolException { + if (method != null) return; + throw new UnknownProtocolException("Method=" + methodName + + " doesn't exist in protocol=" + protocolName); + } + + private boolean isAssignable(final Class protocol) { + boolean result = false; + for (Class i: this.interfaces) { + if (protocol.isAssignableFrom(i)) { + result = true; + break; + } + } + return result; + } + static Method getMethod(Class protocol, String methodName) { Method method = methodInstances.get(methodName); @@ -314,6 +348,7 @@ class ProtobufRpcServerEngine implements RpcServerEngine { methodArg.put(method.getName(), protoType); return protoType; } + /** * Logs an RPC response to the LOG file, 
producing valid JSON objects for * client Operations. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java index f6dcbf9..9fdee0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java @@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.User; +import com.google.protobuf.BlockingRpcChannel; + import javax.net.SocketFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -30,6 +32,8 @@ import java.net.InetSocketAddress; /** An RPC implementation for the client */ @InterfaceAudience.Private public interface RpcClientEngine { + public BlockingRpcChannel getBlockingChannel(final InetSocketAddress addr, final User user, + final Configuration conf, final SocketFactory factory, final int rpcTimeout); /** Construct a client-side proxy object. */ VersionedProtocol getProxy(Class protocol, long clientVersion, InetSocketAddress addr, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerEngine.java index 466efed..535c400 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerEngine.java @@ -28,6 +28,10 @@ import java.io.IOException; interface RpcServerEngine { /** Construct a server for a protocol implementation instance. */ + // TODO: This method seems wrong. Why do we take 'ifaces' and 'protocol'? + // A server should be able to do more than one 'iface'... but we take a single + // 'protocol' only. + // TODO: Fix. 
St.Ack 20130101 RpcServer getServer(Class protocol, Object instance, Class[] ifaces, String bindAddress, int port, int numHandlers, int metaHandlerCount, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 0d8684e..57b7e17 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.After; import org.junit.experimental.categories.Category; +import com.google.protobuf.BlockingService; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -57,7 +58,6 @@ public class TestProtoBufRpc { } public static class PBServerImpl implements TestRpcService { - @Override public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) throws ServiceException { @@ -99,54 +99,53 @@ public class TestProtoBufRpc { HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class); HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class); - // Create server side implementation - PBServerImpl serverImpl = new PBServerImpl(); + // Create server side BlockingService implementation + BlockingService service = + TestProtobufRpcProto.newReflectiveBlockingService(new PBServerImpl()); // Get RPC server for server side implementation - server = HBaseServerRPC.getServer(TestRpcService.class,serverImpl, - new Class[]{TestRpcService.class}, - ADDRESS, PORT, 10, 10, true, conf, 0); - addr = server.getListenerAddress(); + server = HBaseServerRPC.getServer(TestRpcService.class, service, + new Class[]{TestRpcService.class}, ADDRESS, PORT, 10, 10, true, conf, 0); + TestProtoBufRpc.addr = server.getListenerAddress(); server.start(); } - - + @After public void tearDown() throws Exception { server.stop(); } - 
private static TestRpcService getClient() throws IOException { - // Set RPC engine to protobuf RPC engine - HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class); - HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class); - - return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class, 0, - addr, conf, 10000); + private static TestProtobufRpcProto.BlockingInterface getClient() throws IOException { + // Create client-side blockinginterface. + return TestProtobufRpcProto. + newBlockingStub(HBaseClientRPC.getChannel(TestProtoBufRpc.addr, conf, 60000)); } @Test public void testProtoBufRpc() throws Exception { - TestRpcService client = getClient(); + TestProtobufRpcProto.BlockingInterface client = getClient(); testProtoBufRpc(client); } - - // separated test out so that other tests can call it. - public static void testProtoBufRpc(TestRpcService client) throws Exception { + + // Separated test out so that other tests can call it. + public static void testProtoBufRpc(TestProtobufRpcProto.BlockingInterface client) + throws Exception { // Test ping method EmptyRequestProto emptyRequest = EmptyRequestProto.newBuilder().build(); client.ping(null, emptyRequest); - + // Test echo method - EchoRequestProto echoRequest = EchoRequestProto.newBuilder() - .setMessage("hello").build(); + EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoResponseProto echoResponse = client.echo(null, echoRequest); Assert.assertEquals(echoResponse.getMessage(), "hello"); - + // Test error method - error should be thrown as RemoteException + boolean exception = false; try { client.error(null, emptyRequest); Assert.fail("Expected exception is not thrown"); } catch (ServiceException e) { + exception = true; } + Assert.assertTrue(exception); } -} +} \ No newline at end of file