conf configuration instance.
* If no current connection exists, method creates a new connection and keys it using
* connection-specific properties from the passed {@link Configuration}; see
@@ -335,7 +172,7 @@ final class ConnectionManager {
* This is the recommended way to create HConnections.
* {@code
* ExecutorService pool = ...;
- * HConnection connection = HConnectionManager.createConnection(conf, pool);
+ * HConnection connection = ConnectionManager.createConnection(conf, pool);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
@@ -361,7 +198,7 @@ final class ConnectionManager {
* This is the recommended way to create HConnections.
* {@code
* ExecutorService pool = ...;
- * HConnection connection = HConnectionManager.createConnection(conf, pool);
+ * HConnection connection = ConnectionManager.createConnection(conf, pool);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
@@ -386,7 +223,7 @@ final class ConnectionManager {
* This is the recommended way to create HConnections.
* {@code
* ExecutorService pool = ...;
- * HConnection connection = HConnectionManager.createConnection(conf, pool);
+ * HConnection connection = ConnectionManager.createConnection(conf, pool);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
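The {@code} snippet above recurs for each createConnection overload; for concreteness, a fuller sketch of the same pattern including cleanup (a hedged illustration; table and row names are placeholders):

    ExecutorService pool = Executors.newFixedThreadPool(8);
    HConnection connection = ConnectionManager.createConnection(conf, pool);
    try {
      HTableInterface table = connection.getTable("mytable");
      try {
        table.get(new Get(Bytes.toBytes("row")));   // any table operations
      } finally {
        table.close();
      }
    } finally {
      connection.close();   // the caller owns the connection's life cycle
      pool.shutdown();      // the pool is caller-supplied, so the caller shuts it down
    }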
@@ -425,19 +262,6 @@ final class ConnectionManager {
}
/**
- * Delete connection information for the instance specified by passed configuration.
- * If there are no more references to the designated connection, this method will
- * then close the connection to the zookeeper ensemble and let go of all associated resources.
- *
- * @param conf configuration whose identity is used to find {@link HConnection} instance.
- * @deprecated connection caching is going away.
- */
- @Deprecated
- public static void deleteConnection(Configuration conf) {
- deleteConnection(new HConnectionKey(conf), false);
- }
-
- /**
* Cleanup a known stale connection.
* This will then close the connection to the zookeeper ensemble and let go of all resources.
*
@@ -450,37 +274,10 @@ final class ConnectionManager {
}
/**
- * Delete information for all connections. Whether each connection is actually closed depends
- * on the staleConnection boolean and its reference count. By default, you should call this with
- * staleConnection set to true.
- * @deprecated connection caching is going away.
- */
- @Deprecated
- public static void deleteAllConnections(boolean staleConnection) {
- synchronized (CONNECTION_INSTANCES) {
-      Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
-      connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
-      for (HConnectionKey connectionKey : connectionKeys) {
-        deleteConnection(connectionKey, staleConnection);
-      }
-      CONNECTION_INSTANCES.clear();
-    }
-  }
-
-  /**
-   * Create a stub against the master.  Retry if necessary.
-   * @return A stub to do <code>intf</code> against the master
-   * @throws MasterNotRunningException
-   */
- Object makeStub() throws IOException {
- // The lock must be at the beginning to prevent multiple master creations
- // (and leaks) in a multithread context
- synchronized (masterAndZKLock) {
- Exception exceptionCaught = null;
- if (!closed) {
- try {
- return makeStubNoRetries();
- } catch (IOException e) {
- exceptionCaught = e;
- } catch (KeeperException e) {
- exceptionCaught = e;
- } catch (ServiceException e) {
- exceptionCaught = e;
- }
-
- throw new MasterNotRunningException(exceptionCaught);
- } else {
- throw new DoNotRetryIOException("Connection was closed while trying to get master");
- }
- }
- }
- }
-
- /**
- * Class to make a MasterServiceStubMaker stub.
- */
- class MasterServiceStubMaker extends StubMaker {
- private MasterService.BlockingInterface stub;
- @Override
- protected String getServiceName() {
- return MasterService.getDescriptor().getName();
- }
-
- @Override
- MasterService.BlockingInterface makeStub() throws IOException {
- return (MasterService.BlockingInterface)super.makeStub();
- }
-
- @Override
- protected Object makeStub(BlockingRpcChannel channel) {
- this.stub = MasterService.newBlockingStub(channel);
- return this.stub;
- }
-
- @Override
- protected void isMasterRunning() throws ServiceException {
- this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
- }
- }
-
- @Override
- public AdminService.BlockingInterface getAdmin(final ServerName serverName)
- throws IOException {
- return getAdmin(serverName, false);
- }
-
- @Override
- // Nothing is done w/ the 'master' parameter. It is ignored.
- public AdminService.BlockingInterface getAdmin(final ServerName serverName,
- final boolean master)
- throws IOException {
- if (isDeadServer(serverName)) {
- throw new RegionServerStoppedException(serverName + " is dead.");
- }
- String key = getStubKey(AdminService.BlockingInterface.class.getName(),
- serverName.getHostname(), serverName.getPort());
- this.connectionLock.putIfAbsent(key, key);
- AdminService.BlockingInterface stub = null;
- synchronized (this.connectionLock.get(key)) {
- stub = (AdminService.BlockingInterface)this.stubs.get(key);
- if (stub == null) {
- BlockingRpcChannel channel =
- this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
- stub = AdminService.newBlockingStub(channel);
- this.stubs.put(key, stub);
- }
- }
- return stub;
- }
-
- @Override
- public ClientService.BlockingInterface getClient(final ServerName sn)
- throws IOException {
- if (isDeadServer(sn)) {
- throw new RegionServerStoppedException(sn + " is dead.");
- }
- String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
- sn.getPort());
- this.connectionLock.putIfAbsent(key, key);
- ClientService.BlockingInterface stub = null;
- synchronized (this.connectionLock.get(key)) {
- stub = (ClientService.BlockingInterface)this.stubs.get(key);
- if (stub == null) {
- BlockingRpcChannel channel =
- this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
- stub = ClientService.newBlockingStub(channel);
- // In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
- // Just fail on first actual call rather than in here on setup.
- this.stubs.put(key, stub);
- }
- }
- return stub;
- }
-
- static String getStubKey(final String serviceName, final String rsHostname, int port) {
- // Sometimes, servers go down and they come back up with the same hostname but a different
- // IP address. Force a resolution of the rsHostname by trying to instantiate an
- // InetSocketAddress, and this way we will rightfully get a new stubKey.
- // Also, include the hostname in the key so as to take care of those cases where the
- // DNS name is different but IP address remains the same.
- InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
- String address = rsHostname;
- if (i != null) {
- address = i.getHostAddress() + "-" + rsHostname;
- }
- return serviceName + "@" + address + ":" + port;
- }
-
- private ZooKeeperKeepAliveConnection keepAliveZookeeper;
- private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
- private boolean canCloseZKW = true;
-
- // keepAlive time, in ms. No reason to make it configurable.
- private static final long keepAlive = 5 * 60 * 1000;
-
- /**
-   * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
- * @return The shared instance. Never returns null.
- */
- ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
- throws IOException {
- synchronized (masterAndZKLock) {
- if (keepAliveZookeeper == null) {
- if (this.closed) {
- throw new IOException(toString() + " closed");
- }
- // We don't check that our link to ZooKeeper is still valid
- // But there is a retry mechanism in the ZooKeeperWatcher itself
- keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
- }
- keepAliveZookeeperUserCount.addAndGet(1);
- keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
- return keepAliveZookeeper;
- }
- }
-
- void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
- if (zkw == null){
- return;
- }
- if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0) {
- keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
- }
- }
-
- private void closeZooKeeperWatcher() {
- synchronized (masterAndZKLock) {
- if (keepAliveZookeeper != null) {
- LOG.info("Closing zookeeper sessionid=0x" +
- Long.toHexString(
- keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
- keepAliveZookeeper.internalClose();
- keepAliveZookeeper = null;
- }
- keepAliveZookeeperUserCount.set(0);
- }
- }
-
- final MasterServiceState masterServiceState = new MasterServiceState(this);
-
- @Override
- public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
- return getKeepAliveMasterService();
- }
-
- private void resetMasterServiceState(final MasterServiceState mss) {
- mss.userCount++;
- }
-
- @Override
- public MasterKeepAliveConnection getKeepAliveMasterService()
- throws MasterNotRunningException {
- synchronized (masterAndZKLock) {
- if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
- MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
- try {
- this.masterServiceState.stub = stubMaker.makeStub();
- } catch (MasterNotRunningException ex) {
- throw ex;
- } catch (IOException e) {
- // rethrow as MasterNotRunningException so that we can keep the method sig
- throw new MasterNotRunningException(e);
- }
- }
- resetMasterServiceState(this.masterServiceState);
- }
- // Ugly delegation just so we can add in a Close method.
- final MasterService.BlockingInterface stub = this.masterServiceState.stub;
- return new MasterKeepAliveConnection() {
- MasterServiceState mss = masterServiceState;
- @Override
- public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
- throws ServiceException {
- return stub.addColumn(controller, request);
- }
-
- @Override
- public DeleteColumnResponse deleteColumn(RpcController controller,
- DeleteColumnRequest request)
- throws ServiceException {
- return stub.deleteColumn(controller, request);
- }
-
- @Override
- public ModifyColumnResponse modifyColumn(RpcController controller,
- ModifyColumnRequest request)
- throws ServiceException {
- return stub.modifyColumn(controller, request);
- }
-
- @Override
- public MoveRegionResponse moveRegion(RpcController controller,
- MoveRegionRequest request) throws ServiceException {
- return stub.moveRegion(controller, request);
- }
-
- @Override
- public DispatchMergingRegionsResponse dispatchMergingRegions(
- RpcController controller, DispatchMergingRegionsRequest request)
- throws ServiceException {
- return stub.dispatchMergingRegions(controller, request);
- }
-
- @Override
- public AssignRegionResponse assignRegion(RpcController controller,
- AssignRegionRequest request) throws ServiceException {
- return stub.assignRegion(controller, request);
- }
-
- @Override
- public UnassignRegionResponse unassignRegion(RpcController controller,
- UnassignRegionRequest request) throws ServiceException {
- return stub.unassignRegion(controller, request);
- }
-
- @Override
- public OfflineRegionResponse offlineRegion(RpcController controller,
- OfflineRegionRequest request) throws ServiceException {
- return stub.offlineRegion(controller, request);
- }
-
- @Override
- public DeleteTableResponse deleteTable(RpcController controller,
- DeleteTableRequest request) throws ServiceException {
- return stub.deleteTable(controller, request);
- }
-
- @Override
- public TruncateTableResponse truncateTable(RpcController controller,
- TruncateTableRequest request) throws ServiceException {
- return stub.truncateTable(controller, request);
- }
-
- @Override
- public EnableTableResponse enableTable(RpcController controller,
- EnableTableRequest request) throws ServiceException {
- return stub.enableTable(controller, request);
- }
-
- @Override
- public DisableTableResponse disableTable(RpcController controller,
- DisableTableRequest request) throws ServiceException {
- return stub.disableTable(controller, request);
- }
-
- @Override
- public ModifyTableResponse modifyTable(RpcController controller,
- ModifyTableRequest request) throws ServiceException {
- return stub.modifyTable(controller, request);
- }
-
- @Override
- public CreateTableResponse createTable(RpcController controller,
- CreateTableRequest request) throws ServiceException {
- return stub.createTable(controller, request);
- }
-
- @Override
- public ShutdownResponse shutdown(RpcController controller,
- ShutdownRequest request) throws ServiceException {
- return stub.shutdown(controller, request);
- }
-
- @Override
- public StopMasterResponse stopMaster(RpcController controller,
- StopMasterRequest request) throws ServiceException {
- return stub.stopMaster(controller, request);
- }
-
- @Override
- public BalanceResponse balance(RpcController controller,
- BalanceRequest request) throws ServiceException {
- return stub.balance(controller, request);
- }
-
- @Override
- public SetBalancerRunningResponse setBalancerRunning(
- RpcController controller, SetBalancerRunningRequest request)
- throws ServiceException {
- return stub.setBalancerRunning(controller, request);
- }
-
- @Override
- public RunCatalogScanResponse runCatalogScan(RpcController controller,
- RunCatalogScanRequest request) throws ServiceException {
- return stub.runCatalogScan(controller, request);
- }
-
- @Override
- public EnableCatalogJanitorResponse enableCatalogJanitor(
- RpcController controller, EnableCatalogJanitorRequest request)
- throws ServiceException {
- return stub.enableCatalogJanitor(controller, request);
- }
-
- @Override
- public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
- RpcController controller, IsCatalogJanitorEnabledRequest request)
- throws ServiceException {
- return stub.isCatalogJanitorEnabled(controller, request);
- }
-
- @Override
- public CoprocessorServiceResponse execMasterService(
- RpcController controller, CoprocessorServiceRequest request)
- throws ServiceException {
- return stub.execMasterService(controller, request);
- }
-
- @Override
- public SnapshotResponse snapshot(RpcController controller,
- SnapshotRequest request) throws ServiceException {
- return stub.snapshot(controller, request);
- }
-
- @Override
- public GetCompletedSnapshotsResponse getCompletedSnapshots(
- RpcController controller, GetCompletedSnapshotsRequest request)
- throws ServiceException {
- return stub.getCompletedSnapshots(controller, request);
- }
-
- @Override
- public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
- DeleteSnapshotRequest request) throws ServiceException {
- return stub.deleteSnapshot(controller, request);
- }
-
- @Override
- public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
- IsSnapshotDoneRequest request) throws ServiceException {
- return stub.isSnapshotDone(controller, request);
- }
-
- @Override
- public RestoreSnapshotResponse restoreSnapshot(
- RpcController controller, RestoreSnapshotRequest request)
- throws ServiceException {
- return stub.restoreSnapshot(controller, request);
- }
-
- @Override
- public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
- RpcController controller, IsRestoreSnapshotDoneRequest request)
- throws ServiceException {
- return stub.isRestoreSnapshotDone(controller, request);
- }
-
- @Override
- public ExecProcedureResponse execProcedure(
- RpcController controller, ExecProcedureRequest request)
- throws ServiceException {
- return stub.execProcedure(controller, request);
- }
-
- @Override
- public ExecProcedureResponse execProcedureWithRet(
- RpcController controller, ExecProcedureRequest request)
- throws ServiceException {
- return stub.execProcedureWithRet(controller, request);
- }
-
- @Override
- public IsProcedureDoneResponse isProcedureDone(RpcController controller,
- IsProcedureDoneRequest request) throws ServiceException {
- return stub.isProcedureDone(controller, request);
- }
-
- @Override
- public IsMasterRunningResponse isMasterRunning(
- RpcController controller, IsMasterRunningRequest request)
- throws ServiceException {
- return stub.isMasterRunning(controller, request);
- }
-
- @Override
- public ModifyNamespaceResponse modifyNamespace(RpcController controller,
- ModifyNamespaceRequest request)
- throws ServiceException {
- return stub.modifyNamespace(controller, request);
- }
-
- @Override
- public CreateNamespaceResponse createNamespace(
- RpcController controller, CreateNamespaceRequest request) throws ServiceException {
- return stub.createNamespace(controller, request);
- }
-
- @Override
- public DeleteNamespaceResponse deleteNamespace(
- RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
- return stub.deleteNamespace(controller, request);
- }
-
- @Override
- public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
- GetNamespaceDescriptorRequest request) throws ServiceException {
- return stub.getNamespaceDescriptor(controller, request);
- }
-
- @Override
- public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
- ListNamespaceDescriptorsRequest request) throws ServiceException {
- return stub.listNamespaceDescriptors(controller, request);
- }
-
- @Override
- public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
- RpcController controller, ListTableDescriptorsByNamespaceRequest request)
- throws ServiceException {
- return stub.listTableDescriptorsByNamespace(controller, request);
- }
-
- @Override
- public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
- RpcController controller, ListTableNamesByNamespaceRequest request)
- throws ServiceException {
- return stub.listTableNamesByNamespace(controller, request);
- }
-
- @Override
- public GetTableStateResponse getTableState(
- RpcController controller, GetTableStateRequest request)
- throws ServiceException {
- return stub.getTableState(controller, request);
- }
-
- @Override
- public void close() {
- release(this.mss);
- }
-
- @Override
- public GetSchemaAlterStatusResponse getSchemaAlterStatus(
- RpcController controller, GetSchemaAlterStatusRequest request)
- throws ServiceException {
- return stub.getSchemaAlterStatus(controller, request);
- }
-
- @Override
- public GetTableDescriptorsResponse getTableDescriptors(
- RpcController controller, GetTableDescriptorsRequest request)
- throws ServiceException {
- return stub.getTableDescriptors(controller, request);
- }
-
- @Override
- public GetTableNamesResponse getTableNames(
- RpcController controller, GetTableNamesRequest request)
- throws ServiceException {
- return stub.getTableNames(controller, request);
- }
-
- @Override
- public GetClusterStatusResponse getClusterStatus(
- RpcController controller, GetClusterStatusRequest request)
- throws ServiceException {
- return stub.getClusterStatus(controller, request);
- }
-
- @Override
- public SetQuotaResponse setQuota(
- RpcController controller, SetQuotaRequest request)
- throws ServiceException {
- return stub.setQuota(controller, request);
- }
-
- @Override
- public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
- RpcController controller, MajorCompactionTimestampRequest request)
- throws ServiceException {
- return stub.getLastMajorCompactionTimestamp(controller, request);
- }
-
- @Override
- public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
- RpcController controller, MajorCompactionTimestampForRegionRequest request)
- throws ServiceException {
- return stub.getLastMajorCompactionTimestampForRegion(controller, request);
- }
- };
- }
-
-
- private static void release(MasterServiceState mss) {
- if (mss != null && mss.connection != null) {
- ((HConnectionImplementation)mss.connection).releaseMaster(mss);
- }
- }
-
- private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
- if (mss.getStub() == null){
- return false;
- }
- try {
- return mss.isMasterRunning();
- } catch (UndeclaredThrowableException e) {
-      // It's somewhat messy, but we can receive exceptions such as
-      // java.net.ConnectException even though they're not declared. So we catch them...
- LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
- return false;
- } catch (ServiceException se) {
- LOG.warn("Checking master connection", se);
- return false;
- }
- }
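The UndeclaredThrowableException catch above anticipates dynamic-proxy wrapping; a standalone JDK illustration (assumption: nothing HBase-specific, just how proxies surface undeclared checked exceptions):

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;

    Runnable r = (Runnable) Proxy.newProxyInstance(
        Runnable.class.getClassLoader(), new Class<?>[] { Runnable.class },
        new InvocationHandler() {
          @Override
          public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
            // a checked exception that run() does not declare
            throw new java.net.ConnectException("connection refused");
          }
        });
    r.run(); // surfaces as UndeclaredThrowableException wrapping the ConnectException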
-
- void releaseMaster(MasterServiceState mss) {
- if (mss.getStub() == null) return;
- synchronized (masterAndZKLock) {
- --mss.userCount;
- }
- }
-
- private void closeMasterService(MasterServiceState mss) {
- if (mss.getStub() != null) {
- LOG.info("Closing master protocol: " + mss);
- mss.clearStub();
- }
- mss.userCount = 0;
- }
-
- /**
-   * Immediate close of the shared master. Can be called by the delayed close or when closing the
- * connection itself.
- */
- private void closeMaster() {
- synchronized (masterAndZKLock) {
- closeMasterService(masterServiceState);
- }
- }
-
- void updateCachedLocation(HRegionInfo hri, ServerName source,
- ServerName serverName, long seqNum) {
- HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
- cacheLocation(hri.getTable(), source, newHrl);
- }
-
- @Override
- public void deleteCachedRegionLocation(final HRegionLocation location) {
- metaCache.clearCache(location);
- }
-
- @Override
- public void updateCachedLocations(final TableName tableName, byte[] rowkey,
- final Object exception, final HRegionLocation source) {
- assert source != null;
- updateCachedLocations(tableName, source.getRegionInfo().getRegionName()
- , rowkey, exception, source.getServerName());
- }
-
- /**
- * Update the location with the new value (if the exception is a RegionMovedException)
- * or delete it from the cache. Does nothing if we can be sure from the exception that
- * the location is still accurate, or if the cache has already been updated.
-   * @param exception an object (to simplify user code) in which we will look for a nested
-   *  or wrapped RegionMovedException (or both)
- * @param source server that is the source of the location update.
- */
- @Override
- public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
- final Object exception, final ServerName source) {
- if (rowkey == null || tableName == null) {
- LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
- ", tableName=" + (tableName == null ? "null" : tableName));
- return;
- }
-
- if (source == null) {
- // This should not happen, but let's secure ourselves.
- return;
- }
-
- if (regionName == null) {
- // we do not know which region, so just remove the cache entry for the row and server
- metaCache.clearCache(tableName, rowkey, source);
- return;
- }
-
- // Is it something we have already updated?
- final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
- HRegionLocation oldLocation = null;
- if (oldLocations != null) {
- oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
- }
- if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
- // There is no such location in the cache (it's been removed already) or
- // the cache has already been refreshed with a different location. => nothing to do
- return;
- }
-
- HRegionInfo regionInfo = oldLocation.getRegionInfo();
- Throwable cause = findException(exception);
- if (cause != null) {
- if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) {
- // We know that the region is still on this region server
- return;
- }
-
- if (cause instanceof RegionMovedException) {
- RegionMovedException rme = (RegionMovedException) cause;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
- rme.getHostname() + ":" + rme.getPort() +
- " according to " + source.getHostAndPort());
- }
- // We know that the region is not anymore on this region server, but we know
- // the new location.
- updateCachedLocation(
- regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
- return;
- }
- }
-
-    // If we're here, it means that we cannot be sure about the location, so we remove it from
-    // the cache. Do not pass the source, because the source can be a new server on the same host:port.
- metaCache.clearCache(regionInfo);
- }
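The cause analysis above depends on findException digging a RegionMovedException out of wrapped causes; a minimal sketch of that style of unwrapping (an assumption-laden simplification — the helper's exact behavior is not shown in this patch):

    static Throwable findRegionMovedCause(Object exception) {
      Throwable t = (exception instanceof Throwable) ? (Throwable) exception : null;
      while (t != null) {
        if (t instanceof RegionMovedException) {
          return t;
        }
        // unwrap Hadoop RPC RemoteExceptions to surface the server-side class
        Throwable next = (t instanceof RemoteException)
            ? ((RemoteException) t).unwrapRemoteException()
            : t.getCause();
        t = (next == t) ? null : next; // guard against unwrap returning itself
      }
      return null;
    }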
-
- @Override
- public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
- final Object exception, final HRegionLocation source) {
- updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
- }
-
- /**
- * @deprecated since 0.96 - Use {@link HTableInterface#batch} instead
- */
- @Override
- @Deprecated
-  public void processBatch(List<? extends Row> list,
- final TableName tableName,
- ExecutorService pool,
- Object[] results) throws IOException, InterruptedException {
- // This belongs in HTable!!! Not in here. St.Ack
-
- // results must be the same size as list
- if (results.length != list.size()) {
- throw new IllegalArgumentException(
- "argument results must be the same size as argument list");
- }
- processBatchCallback(list, tableName, pool, results, null);
- }
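As the deprecation note above says, HTableInterface#batch is the replacement; a hedged sketch of the equivalent call (assumes a 1.0-era Put#addColumn; names are placeholders):

    List<Row> actions = new ArrayList<Row>();
    actions.add(new Get(Bytes.toBytes("row1")));
    actions.add(new Put(Bytes.toBytes("row2"))
        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
    Object[] results = new Object[actions.size()];  // must match the list size, as above
    table.batch(actions, results);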
-
- /**
- * @deprecated Unsupported API
- */
- @Override
- @Deprecated
-  public void processBatch(List<? extends Row> list,
- final byte[] tableName,
- ExecutorService pool,
- Object[] results) throws IOException, InterruptedException {
- processBatch(list, TableName.valueOf(tableName), pool, results);
- }
-
- /**
- * Send the queries in parallel on the different region servers. Retries on failures.
- * If the method returns it means that there is no error, and the 'results' array will
- * contain no exception. On error, an exception is thrown, and the 'results' array will
- * contain results and exceptions.
- * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
- */
- @Override
- @Deprecated
-  public <R> void processBatchCallback(List<? extends Row> list,
@@ -49,11 +49,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
 * <p>This is NOT a connection to a particular server but to ALL servers in the cluster. Individual
 * connections are managed at a lower level.
* HConnection instances can be shared. Sharing
* is usually what you want because rather than each HConnection instance
* having to do its own discovery of regions out on the cluster, instead, all
- * clients get to share the one cache of locations. {@link HConnectionManager} does the
+ * clients get to share the one cache of locations. {@link ConnectionManager} does the
* sharing for you if you go by it getting connections. Sharing makes cleanup of
- * HConnections awkward. See {@link HConnectionManager} for cleanup discussion.
+ * HConnections awkward. See {@link ConnectionFactory} for cleanup discussion.
*
- * @see HConnectionManager
+ * @see ConnectionManager
+ * @see ConnectionFactory
* @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
*/
@InterfaceAudience.Public
@@ -79,7 +80,7 @@ public interface HConnection extends Connection {
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
- * (created with {@link HConnectionManager#createConnection(Configuration)}).
+ * (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
@@ -92,7 +93,7 @@ public interface HConnection extends Connection {
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
- * (created with {@link HConnectionManager#createConnection(Configuration)}).
+ * (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
@@ -105,7 +106,7 @@ public interface HConnection extends Connection {
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
- * (created with {@link HConnectionManager#createConnection(Configuration)}).
+ * (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @return an HTable to use for interactions with this table
*/
@@ -119,7 +120,7 @@ public interface HConnection extends Connection {
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
- * (created with {@link HConnectionManager#createConnection(Configuration)}).
+ * (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
@@ -133,7 +134,7 @@ public interface HConnection extends Connection {
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
- * (created with {@link HConnectionManager#createConnection(Configuration)}).
+ * (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
@@ -147,7 +148,7 @@ public interface HConnection extends Connection {
* This is a lightweight operation, pooling or caching of the returned HTableInterface
* is neither required nor desired.
* Note that the HConnection needs to be unmanaged
- * (created with {@link HConnectionManager#createConnection(Configuration)}).
+ * (created with {@link ConnectionFactory#createConnection(Configuration)}).
* @param tableName
* @param pool The thread pool to use for batch operations, null to use a default pool.
* @return an HTable to use for interactions with this table
@@ -163,7 +164,7 @@ public interface HConnection extends Connection {
* required nor desired.
*
* RegionLocator needs to be unmanaged
- * (created with {@link HConnectionManager#createConnection(Configuration)}).
+ * (created with {@link ConnectionFactory#createConnection(Configuration)}).
*
* @param tableName Name of the table whose region is to be examined
* @return A RegionLocator instance
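A hedged usage sketch of the unmanaged RegionLocator described above (table and row names are placeholders):

    RegionLocator locator = connection.getRegionLocator(TableName.valueOf("mytable"));
    try {
      HRegionLocation location = locator.getRegionLocation(Bytes.toBytes("some-row"));
      System.out.println("row served by " + location.getServerName());
    } finally {
      locator.close();
    }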
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionImplementation.java
new file mode 100644
index 0000000..73a0f1c
--- /dev/null
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionImplementation.java
@@ -0,0 +1,2132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.zookeeper.KeeperException;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Encapsulates connection to zookeeper and regionservers. */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
+    justification="Access to the concurrent hash map is under a lock so should be fine.")
+class HConnectionImplementation implements ClusterConnection, Closeable {
+ static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
+ private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
+
+ private final long pause;
+ private final boolean useMetaReplicas;
+ private final int numTries;
+ final int rpcTimeout;
+
+ /**
+   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
+ * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
+ */
+ private static volatile NonceGenerator nonceGenerator = null;
+ /** The nonce generator lock. Only taken when creating HConnection, which gets a private copy. */
+ private static Object nonceGeneratorCreateLock = new Object();
+
+ private final AsyncProcess asyncProcess;
+ // single tracker per connection
+ private final ServerStatisticTracker stats;
+
+ private volatile boolean closed;
+ private volatile boolean aborted;
+
+ // package protected for the tests
+ ClusterStatusListener clusterStatusListener;
+
+
+ private final Object metaRegionLock = new Object();
+
+ // We have a single lock for master & zk to prevent deadlocks. Having
+ // one lock for ZK and one lock for master is not possible:
+ // When creating a connection to master, we need a connection to ZK to get
+ // its address. But another thread could have taken the ZK lock, and could
+ // be waiting for the master lock => deadlock.
+ private final Object masterAndZKLock = new Object();
+
+ private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+
+ // thread executor shared by all HTableInterface instances created
+ // by this connection
+ private volatile ExecutorService batchPool = null;
+ // meta thread executor shared by all HTableInterface instances created
+ // by this connection
+ private volatile ExecutorService metaLookupPool = null;
+ private volatile boolean cleanupPool = false;
+
+ private final Configuration conf;
+
+ // cache the configuration value for tables so that we can avoid calling
+ // the expensive Configuration to fetch the value multiple times.
+ private final TableConfiguration tableConfig;
+
+ // Client rpc instance.
+ private RpcClient rpcClient;
+
+ private MetaCache metaCache = new MetaCache();
+
+ private int refCount;
+
+ // indicates whether this connection's life cycle is managed (by us)
+ private boolean managed;
+
+ private User user;
+
+ private RpcRetryingCallerFactory rpcCallerFactory;
+
+ private RpcControllerFactory rpcControllerFactory;
+
+ private final RetryingCallerInterceptor interceptor;
+
+ /**
+ * Cluster registry of basic info such as clusterid and meta region location.
+ */
+ Registry registry;
+
+ private final ClientBackoffPolicy backoffPolicy;
+
+ HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
+ this(conf, managed, null, null);
+ }
+
+ /**
+   * Constructor.
+   * @param conf Configuration object
+   * @param managed If true, does not do full shutdown on close; i.e. no cleanup of the
+   *   connection to zk and no shutdown of all services; we just close down the resources this
+   *   connection was responsible for and decrement usage counters. It is up to the caller to do
+   *   the full cleanup. It is set when we want connection sharing going on -- reuse of the zk
+   *   connection, cached region locations, established regionserver connections, etc. When
+   *   connections are shared, we do reference counting and only do full cleanup when there are
+   *   no more users of an HConnectionImplementation instance.
+ */
+ HConnectionImplementation(Configuration conf, boolean managed,
+ ExecutorService pool, User user) throws IOException {
+ this(conf);
+ this.user = user;
+ this.batchPool = pool;
+ this.managed = managed;
+ this.registry = setupRegistry();
+ retrieveClusterId();
+
+ this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
+ this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
+
+ // Do we publish the status?
+ boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
+ HConstants.STATUS_PUBLISHED_DEFAULT);
+    Class<? extends ClusterStatusListener.Listener> listenerClass =
+ conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
+ ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
+ ClusterStatusListener.Listener.class);
+ if (shouldListen) {
+ if (listenerClass == null) {
+ LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
+ ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
+ } else {
+ clusterStatusListener = new ClusterStatusListener(
+ new ClusterStatusListener.DeadServerHandler() {
+ @Override
+ public void newDead(ServerName sn) {
+ clearCaches(sn);
+ rpcClient.cancelConnections(sn);
+ }
+ }, conf, listenerClass);
+ }
+ }
+ }
+
+ /**
+ * For tests.
+ */
+ protected HConnectionImplementation(Configuration conf) {
+ this.conf = conf;
+ this.tableConfig = new TableConfiguration(conf);
+ this.closed = false;
+ this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
+ HConstants.DEFAULT_USE_META_REPLICAS);
+ this.numTries = tableConfig.getRetriesNumber();
+ this.rpcTimeout = conf.getInt(
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
+ synchronized (nonceGeneratorCreateLock) {
+ if (nonceGenerator == null) {
+ nonceGenerator = new PerClientRandomNonceGenerator();
+ }
+ }
+ } else {
+ nonceGenerator = new ConnectionManager.NoNonceGenerator();
+ }
+ stats = ServerStatisticTracker.create(conf);
+ this.asyncProcess = createAsyncProcess(this.conf);
+ this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
+ this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
+ this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
+ }
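A small, hedged example of steering the nonce branch in the constructor above from client configuration:

    Configuration conf = HBaseConfiguration.create();
    // CLIENT_NONCES_ENABLED_KEY, defined above; defaults to true
    conf.setBoolean("hbase.client.nonces.enabled", false);
    // connections created from this conf take the NoNonceGenerator branch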
+
+ /**
+ * @param conn The connection for which to replace the generator.
+ * @param cnm Replaces the nonce generator used, for testing.
+ * @return old nonce generator.
+ */
+ @VisibleForTesting
+ static NonceGenerator injectNonceGeneratorForTesting(
+ ClusterConnection conn, NonceGenerator cnm) {
+ HConnectionImplementation connImpl = (HConnectionImplementation)conn;
+ NonceGenerator ng = connImpl.getNonceGenerator();
+ ConnectionManager.LOG.warn("Nonce generator is being replaced by test code for "
+ + cnm.getClass().getName());
+ nonceGenerator = cnm;
+ return ng;
+ }
+
+ @Override
+ public HTableInterface getTable(String tableName) throws IOException {
+ return getTable(TableName.valueOf(tableName));
+ }
+
+ @Override
+ public HTableInterface getTable(byte[] tableName) throws IOException {
+ return getTable(TableName.valueOf(tableName));
+ }
+
+ @Override
+ public HTableInterface getTable(TableName tableName) throws IOException {
+ return getTable(tableName, getBatchPool());
+ }
+
+ @Override
+ public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
+ return getTable(TableName.valueOf(tableName), pool);
+ }
+
+ @Override
+ public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
+ return getTable(TableName.valueOf(tableName), pool);
+ }
+
+ @Override
+ public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
+ if (managed) {
+ throw new NeedUnmanagedConnectionException();
+ }
+ return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
+ }
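The managed check above is why tables must come from an unmanaged connection; a hedged sketch of both sides (illustrative only):

    // managed, cached connection: getTable() throws NeedUnmanagedConnectionException
    HConnection managed = HConnectionManager.getConnection(conf);
    // unmanaged connection: the caller owns its life cycle, so getTable() is allowed
    HConnection unmanaged = HConnectionManager.createConnection(conf);
    HTableInterface table = unmanaged.getTable(TableName.valueOf("mytable"));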
+
+ @Override
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
+ if (params.getTableName() == null) {
+ throw new IllegalArgumentException("TableName cannot be null.");
+ }
+ if (params.getPool() == null) {
+ params.pool(HTable.getDefaultExecutor(getConfiguration()));
+ }
+ if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
+ params.writeBufferSize(tableConfig.getWriteBufferSize());
+ }
+ if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
+ params.maxKeyValueSize(tableConfig.getMaxKeyValueSize());
+ }
+ return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
+ }
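Given the parameter defaulting above, a minimal hedged usage sketch (family/qualifier/table names are placeholders):

    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("mytable"))
        .writeBufferSize(4 * 1024 * 1024);  // override only what you need; the rest defaults
    BufferedMutator mutator = connection.getBufferedMutator(params);
    try {
      mutator.mutate(new Put(Bytes.toBytes("row"))
          .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
    } finally {
      mutator.close();  // flushes anything still buffered
    }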
+
+ @Override
+ public BufferedMutator getBufferedMutator(TableName tableName) {
+ return getBufferedMutator(new BufferedMutatorParams(tableName));
+ }
+
+ @Override
+ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+ return new HRegionLocator(tableName, this);
+ }
+
+ @Override
+ public Admin getAdmin() throws IOException {
+ if (managed) {
+ throw new NeedUnmanagedConnectionException();
+ }
+ return new HBaseAdmin(this);
+ }
+
+ private ExecutorService getBatchPool() {
+ if (batchPool == null) {
+ synchronized (this) {
+ if (batchPool == null) {
+ this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
+ conf.getInt("hbase.hconnection.threads.core", 256), "-shared-");
+ this.cleanupPool = true;
+ }
+ }
+ }
+ return this.batchPool;
+ }
+
+ private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint) {
+ // shared HTable thread executor not yet initialized
+ if (maxThreads == 0) {
+ maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ if (coreThreads == 0) {
+ coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+    LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(maxThreads *
+        conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+            HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
- * The simplest way to use this class is by using {@link #createConnection(Configuration)}.
- * This creates a new {@link HConnection} to the cluster that is managed by the caller.
- * From this {@link HConnection} {@link HTableInterface} implementations are retrieved
- * with {@link HConnection#getTable(byte[])}. Example:
- * This class has a static Map of {@link HConnection} instances keyed by
- * {@link HConnectionKey}; A {@link HConnectionKey} is identified by a set of
- * {@link Configuration} properties. Invocations of {@link #getConnection(Configuration)}
- * that pass the same {@link Configuration} instance will return the same
- * {@link HConnection} instance ONLY WHEN the set of properties are the same
- * (i.e. if you change properties in your {@link Configuration} instance, such as RPC timeout,
- * the codec used, HBase will create a new {@link HConnection} instance. For more details on
- * how this is done see {@link HConnectionKey}).
- * Sharing {@link HConnection} instances is usually what you want; all clients
- * of the {@link HConnection} instances share the HConnections' cache of Region
- * locations rather than each having to discover for itself the location of meta, etc.
- * But sharing connections makes clean up of {@link HConnection} instances a little awkward.
- * Currently, clients cleanup by calling {@link #deleteConnection(Configuration)}. This will
- * shutdown the zookeeper connection the HConnection was using and clean up all
- * HConnection resources as well as stopping proxies to servers out on the
- * cluster. Not running the cleanup will not end the world; it'll
- * just stall the closeup some and spew some zookeeper connection failed
- * messages into the log. Running the cleanup on a {@link HConnection} that is
- * subsequently used by another will cause breakage so be careful running
- * cleanup.
- * To create a {@link HConnection} that is not shared by others, you can
- * set property "hbase.client.instance.id" to a unique value for your {@link Configuration}
- * instance, like the following:
- * Cleanup used to be done inside a shutdown hook. On startup we'd
- * register a shutdown hook that called {@link #deleteAllConnections()}
- * on its way out, but the order in which shutdown hooks run is not defined, which
- * was problematic for clients of HConnection that wanted to register their
- * own shutdown hooks, so we removed ours, though this shifts the onus for
- * cleanup to the client.
- * @deprecated Please use ConnectionFactory instead
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-@Deprecated
-public final class HConnectionManager extends ConnectionFactory {
-
- /** @deprecated connection caching is going away */
- @Deprecated
- public static final String RETRIES_BY_SERVER_KEY =
- ConnectionManager.RETRIES_BY_SERVER_KEY;
-
- /** @deprecated connection caching is going away */
- @Deprecated
- public static final int MAX_CACHED_CONNECTION_INSTANCES =
- ConnectionManager.MAX_CACHED_CONNECTION_INSTANCES;
-
- /*
- * Non-instantiable.
- */
- private HConnectionManager() {
- super();
- }
-
- /**
-   * Get the connection that goes with the passed <code>conf</code> configuration instance.
-   * <p>Note: This bypasses the usual HConnection life cycle management done by
- * {@link #getConnection(Configuration)}. The caller is responsible for
- * calling {@link HConnection#close()} on the returned connection instance.
- *
- * This is the recommended way to create HConnections.
- * Note: This bypasses the usual HConnection life cycle management done by
- * {@link #getConnection(Configuration)}. The caller is responsible for
- * calling {@link HConnection#close()} on the returned connection instance.
- * This is the recommended way to create HConnections.
- * Note: This bypasses the usual HConnection life cycle management done by
- * {@link #getConnection(Configuration)}. The caller is responsible for
- * calling {@link HConnection#close()} on the returned connection instance.
- * This is the recommended way to create HConnections.
- * Note: This bypasses the usual HConnection life cycle management done by
- * {@link #getConnection(Configuration)}. The caller is responsible for
- * calling {@link HConnection#close()} on the returned connection instance.
- * This is the recommended way to create HConnections.
+  /**
+   * Create a stub against the master.  Retry if necessary.
+   * @return A stub to do <code>intf</code> against the master
+ * @throws org.apache.hadoop.hbase.MasterNotRunningException
+ */
+ Object makeStub() throws IOException {
+ // The lock must be at the beginning to prevent multiple master creations
+ // (and leaks) in a multithread context
+ synchronized (masterAndZKLock) {
+ Exception exceptionCaught = null;
+ if (!closed) {
+ try {
+ return makeStubNoRetries();
+ } catch (IOException e) {
+ exceptionCaught = e;
+ } catch (KeeperException e) {
+ exceptionCaught = e;
+ } catch (ServiceException e) {
+ exceptionCaught = e;
+ }
+
+ throw new MasterNotRunningException(exceptionCaught);
+ } else {
+ throw new DoNotRetryIOException("Connection was closed while trying to get master");
+ }
+ }
+ }
+ }
+
+ /**
+ * Class to make a MasterServiceStubMaker stub.
+ */
+ class MasterServiceStubMaker extends StubMaker {
+ private MasterProtos.MasterService.BlockingInterface stub;
+ @Override
+ protected String getServiceName() {
+ return MasterProtos.MasterService.getDescriptor().getName();
+ }
+
+ @Override
+ MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
+ return (MasterProtos.MasterService.BlockingInterface)super.makeStub();
+ }
+
+ @Override
+ protected Object makeStub(BlockingRpcChannel channel) {
+ this.stub = MasterProtos.MasterService.newBlockingStub(channel);
+ return this.stub;
+ }
+
+ @Override
+ protected void isMasterRunning() throws ServiceException {
+ this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
+ }
+ }
+
+ @Override
+ public AdminProtos.AdminService.BlockingInterface getAdmin(final ServerName serverName)
+ throws IOException {
+ return getAdmin(serverName, false);
+ }
+
+ @Override
+ // Nothing is done w/ the 'master' parameter. It is ignored.
+ public AdminProtos.AdminService.BlockingInterface getAdmin(final ServerName serverName,
+ final boolean master)
+ throws IOException {
+ if (isDeadServer(serverName)) {
+ throw new RegionServerStoppedException(serverName + " is dead.");
+ }
+ String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(),
+ serverName.getHostname(), serverName.getPort());
+ this.connectionLock.putIfAbsent(key, key);
+ AdminProtos.AdminService.BlockingInterface stub = null;
+ synchronized (this.connectionLock.get(key)) {
+ stub = (AdminProtos.AdminService.BlockingInterface)this.stubs.get(key);
+ if (stub == null) {
+ BlockingRpcChannel channel =
+ this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
+ stub = AdminProtos.AdminService.newBlockingStub(channel);
+ this.stubs.put(key, stub);
+ }
+ }
+ return stub;
+ }
+
+ @Override
+ public ClientProtos.ClientService.BlockingInterface getClient(final ServerName sn)
+ throws IOException {
+ if (isDeadServer(sn)) {
+ throw new RegionServerStoppedException(sn + " is dead.");
+ }
+ String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn.getHostname(),
+ sn.getPort());
+ this.connectionLock.putIfAbsent(key, key);
+ ClientProtos.ClientService.BlockingInterface stub = null;
+ synchronized (this.connectionLock.get(key)) {
+ stub = (ClientProtos.ClientService.BlockingInterface)this.stubs.get(key);
+ if (stub == null) {
+ BlockingRpcChannel channel =
+ this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
+ stub = ClientProtos.ClientService.newBlockingStub(channel);
+ // In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
+ // Just fail on first actual call rather than in here on setup.
+ this.stubs.put(key, stub);
+ }
+ }
+ return stub;
+ }
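The putIfAbsent/synchronized pairing used by getAdmin and getClient above is a per-key lock stripe; the pattern in isolation (a hedged sketch assuming a ConcurrentHashMap<String, String> like connectionLock):

    ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<String, String>();
    String key = "ClientService@10.0.0.5-rs1.example.com:16020";  // hypothetical stub key
    locks.putIfAbsent(key, key);       // first caller installs the canonical lock object
    synchronized (locks.get(key)) {    // all callers for this key serialize here
      // look up the cached stub; create and cache it at most once per key
    }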
+
+ static String getStubKey(final String serviceName, final String rsHostname, int port) {
+ // Sometimes, servers go down and they come back up with the same hostname but a different
+ // IP address. Force a resolution of the rsHostname by trying to instantiate an
+ // InetSocketAddress, and this way we will rightfully get a new stubKey.
+ // Also, include the hostname in the key so as to take care of those cases where the
+ // DNS name is different but IP address remains the same.
+ InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
+ String address = rsHostname;
+ if (i != null) {
+ address = i.getHostAddress() + "-" + rsHostname;
+ }
+ return serviceName + "@" + address + ":" + port;
+ }
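For concreteness, the key shape getStubKey produces (host and IP are hypothetical):

    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(),
        "rs1.example.com", 16020);
    // if rs1.example.com resolves to 10.0.0.5, key is
    //   "...AdminProtos$AdminService$BlockingInterface@10.0.0.5-rs1.example.com:16020"
    // if resolution fails, the IP prefix is dropped:
    //   "...AdminProtos$AdminService$BlockingInterface@rs1.example.com:16020"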
+
+ private ZooKeeperKeepAliveConnection keepAliveZookeeper;
+ private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
+ private boolean canCloseZKW = true;
+
+ // keepAlive time, in ms. No reason to make it configurable.
+ private static final long keepAlive = 5 * 60 * 1000;
+
+ /**
+   * Retrieve a shared ZooKeeperWatcher. You must close it once you have finished with it.
+ * @return The shared instance. Never returns null.
+ */
+ ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
+ throws IOException {
+ synchronized (masterAndZKLock) {
+ if (keepAliveZookeeper == null) {
+ if (this.closed) {
+ throw new IOException(toString() + " closed");
+ }
+ // We don't check that our link to ZooKeeper is still valid
+ // But there is a retry mechanism in the ZooKeeperWatcher itself
+ keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
+ }
+ keepAliveZookeeperUserCount.addAndGet(1);
+ keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
+ return keepAliveZookeeper;
+ }
+ }
+
+ void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
+ if (zkw == null){
+ return;
+ }
+ if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0) {
+ keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
+ }
+ }
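The counter above implies an acquire/release discipline; a hedged sketch of correct usage from code inside the client package (the delayed close only arms once the count drops to zero):

    ZooKeeperKeepAliveConnection zkw = getKeepAliveZooKeeperWatcher();  // count++
    try {
      // ... read cluster state through zkw ...
    } finally {
      releaseZooKeeperWatcher(zkw);  // count--; at zero, arms the keepAlive close timer
    }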
+
+ private void closeZooKeeperWatcher() {
+ synchronized (masterAndZKLock) {
+ if (keepAliveZookeeper != null) {
+ LOG.info("Closing zookeeper sessionid=0x" +
+ Long.toHexString(
+ keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
+ keepAliveZookeeper.internalClose();
+ keepAliveZookeeper = null;
+ }
+ keepAliveZookeeperUserCount.set(0);
+ }
+ }
+
+ final MasterServiceState masterServiceState = new MasterServiceState(this);
+
+ @Override
+ public MasterProtos.MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
+ return getKeepAliveMasterService();
+ }
+
+ private void resetMasterServiceState(final MasterServiceState mss) {
+ mss.userCount++;
+ }
+
+ @Override
+ public MasterKeepAliveConnection getKeepAliveMasterService()
+ throws MasterNotRunningException {
+ synchronized (masterAndZKLock) {
+ if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
+ MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
+ try {
+ this.masterServiceState.stub = stubMaker.makeStub();
+ } catch (MasterNotRunningException ex) {
+ throw ex;
+ } catch (IOException e) {
+ // rethrow as MasterNotRunningException so that we can keep the method sig
+ throw new MasterNotRunningException(e);
+ }
+ }
+ resetMasterServiceState(this.masterServiceState);
+ }
+ // Ugly delegation just so we can add in a Close method.
+ final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
+ return new MasterKeepAliveConnection() {
+ MasterServiceState mss = masterServiceState;
+ @Override
+ public MasterProtos.AddColumnResponse addColumn(RpcController controller, MasterProtos.AddColumnRequest request)
+ throws ServiceException {
+ return stub.addColumn(controller, request);
+ }
+
+ @Override
+ public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
+ MasterProtos.DeleteColumnRequest request)
+ throws ServiceException {
+ return stub.deleteColumn(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
+ MasterProtos.ModifyColumnRequest request)
+ throws ServiceException {
+ return stub.modifyColumn(controller, request);
+ }
+
+ @Override
+ public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
+ MasterProtos.MoveRegionRequest request) throws ServiceException {
+ return stub.moveRegion(controller, request);
+ }
+
+ @Override
+ public MasterProtos.DispatchMergingRegionsResponse dispatchMergingRegions(
+ RpcController controller, MasterProtos.DispatchMergingRegionsRequest request)
+ throws ServiceException {
+ return stub.dispatchMergingRegions(controller, request);
+ }
+
+ @Override
+ public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
+ MasterProtos.AssignRegionRequest request) throws ServiceException {
+ return stub.assignRegion(controller, request);
+ }
+
+ @Override
+ public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
+ MasterProtos.UnassignRegionRequest request) throws ServiceException {
+ return stub.unassignRegion(controller, request);
+ }
+
+ @Override
+ public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
+ MasterProtos.OfflineRegionRequest request) throws ServiceException {
+ return stub.offlineRegion(controller, request);
+ }
+
+ @Override
+ public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
+ MasterProtos.DeleteTableRequest request) throws ServiceException {
+ return stub.deleteTable(controller, request);
+ }
+
+ @Override
+ public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
+ MasterProtos.TruncateTableRequest request) throws ServiceException {
+ return stub.truncateTable(controller, request);
+ }
+
+ @Override
+ public MasterProtos.EnableTableResponse enableTable(RpcController controller,
+ MasterProtos.EnableTableRequest request) throws ServiceException {
+ return stub.enableTable(controller, request);
+ }
+
+ @Override
+ public MasterProtos.DisableTableResponse disableTable(RpcController controller,
+ MasterProtos.DisableTableRequest request) throws ServiceException {
+ return stub.disableTable(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
+ MasterProtos.ModifyTableRequest request) throws ServiceException {
+ return stub.modifyTable(controller, request);
+ }
+
+ @Override
+ public MasterProtos.CreateTableResponse createTable(RpcController controller,
+ MasterProtos.CreateTableRequest request) throws ServiceException {
+ return stub.createTable(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ShutdownResponse shutdown(RpcController controller,
+ MasterProtos.ShutdownRequest request) throws ServiceException {
+ return stub.shutdown(controller, request);
+ }
+
+ @Override
+ public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
+ MasterProtos.StopMasterRequest request) throws ServiceException {
+ return stub.stopMaster(controller, request);
+ }
+
+ @Override
+ public MasterProtos.BalanceResponse balance(RpcController controller,
+ MasterProtos.BalanceRequest request) throws ServiceException {
+ return stub.balance(controller, request);
+ }
+
+ @Override
+ public MasterProtos.SetBalancerRunningResponse setBalancerRunning(
+ RpcController controller, MasterProtos.SetBalancerRunningRequest request)
+ throws ServiceException {
+ return stub.setBalancerRunning(controller, request);
+ }
+
+ @Override
+ public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
+ MasterProtos.RunCatalogScanRequest request) throws ServiceException {
+ return stub.runCatalogScan(controller, request);
+ }
+
+ @Override
+ public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
+ RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
+ throws ServiceException {
+ return stub.enableCatalogJanitor(controller, request);
+ }
+
+ @Override
+ public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
+ RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
+ throws ServiceException {
+ return stub.isCatalogJanitorEnabled(controller, request);
+ }
+
+ @Override
+ public ClientProtos.CoprocessorServiceResponse execMasterService(
+ RpcController controller, ClientProtos.CoprocessorServiceRequest request)
+ throws ServiceException {
+ return stub.execMasterService(controller, request);
+ }
+
+ @Override
+ public MasterProtos.SnapshotResponse snapshot(RpcController controller,
+ MasterProtos.SnapshotRequest request) throws ServiceException {
+ return stub.snapshot(controller, request);
+ }
+
+ @Override
+ public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
+ RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
+ throws ServiceException {
+ return stub.getCompletedSnapshots(controller, request);
+ }
+
+ @Override
+ public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
+ MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
+ return stub.deleteSnapshot(controller, request);
+ }
+
+ @Override
+ public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
+ MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
+ return stub.isSnapshotDone(controller, request);
+ }
+
+ @Override
+ public MasterProtos.RestoreSnapshotResponse restoreSnapshot(
+ RpcController controller, MasterProtos.RestoreSnapshotRequest request)
+ throws ServiceException {
+ return stub.restoreSnapshot(controller, request);
+ }
+
+ @Override
+ public MasterProtos.IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
+ RpcController controller, MasterProtos.IsRestoreSnapshotDoneRequest request)
+ throws ServiceException {
+ return stub.isRestoreSnapshotDone(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ExecProcedureResponse execProcedure(
+ RpcController controller, MasterProtos.ExecProcedureRequest request)
+ throws ServiceException {
+ return stub.execProcedure(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ExecProcedureResponse execProcedureWithRet(
+ RpcController controller, MasterProtos.ExecProcedureRequest request)
+ throws ServiceException {
+ return stub.execProcedureWithRet(controller, request);
+ }
+
+ @Override
+ public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
+ MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
+ return stub.isProcedureDone(controller, request);
+ }
+
+ @Override
+ public MasterProtos.IsMasterRunningResponse isMasterRunning(
+ RpcController controller, MasterProtos.IsMasterRunningRequest request)
+ throws ServiceException {
+ return stub.isMasterRunning(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller,
+ MasterProtos.ModifyNamespaceRequest request)
+ throws ServiceException {
+ return stub.modifyNamespace(controller, request);
+ }
+
+ @Override
+ public MasterProtos.CreateNamespaceResponse createNamespace(
+ RpcController controller, MasterProtos.CreateNamespaceRequest request) throws ServiceException {
+ return stub.createNamespace(controller, request);
+ }
+
+ @Override
+ public MasterProtos.DeleteNamespaceResponse deleteNamespace(
+ RpcController controller, MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
+ return stub.deleteNamespace(controller, request);
+ }
+
+ @Override
+ public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
+ MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
+ return stub.getNamespaceDescriptor(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
+ MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
+ return stub.listNamespaceDescriptors(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
+ RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
+ throws ServiceException {
+ return stub.listTableDescriptorsByNamespace(controller, request);
+ }
+
+ @Override
+ public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
+ RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
+ throws ServiceException {
+ return stub.listTableNamesByNamespace(controller, request);
+ }
+
+ @Override
+ public MasterProtos.GetTableStateResponse getTableState(
+ RpcController controller, MasterProtos.GetTableStateRequest request)
+ throws ServiceException {
+ return stub.getTableState(controller, request);
+ }
+
+ @Override
+ public void close() {
+ release(this.mss);
+ }
+
+ @Override
+ public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
+ RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
+ throws ServiceException {
+ return stub.getSchemaAlterStatus(controller, request);
+ }
+
+ @Override
+ public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(
+ RpcController controller, MasterProtos.GetTableDescriptorsRequest request)
+ throws ServiceException {
+ return stub.getTableDescriptors(controller, request);
+ }
+
+ @Override
+ public MasterProtos.GetTableNamesResponse getTableNames(
+ RpcController controller, MasterProtos.GetTableNamesRequest request)
+ throws ServiceException {
+ return stub.getTableNames(controller, request);
+ }
+
+ @Override
+ public MasterProtos.GetClusterStatusResponse getClusterStatus(
+ RpcController controller, MasterProtos.GetClusterStatusRequest request)
+ throws ServiceException {
+ return stub.getClusterStatus(controller, request);
+ }
+
+ @Override
+ public MasterProtos.SetQuotaResponse setQuota(
+ RpcController controller, MasterProtos.SetQuotaRequest request)
+ throws ServiceException {
+ return stub.setQuota(controller, request);
+ }
+
+ @Override
+ public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
+ RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
+ throws ServiceException {
+ return stub.getLastMajorCompactionTimestamp(controller, request);
+ }
+
+ @Override
+ public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
+ RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
+ throws ServiceException {
+ return stub.getLastMajorCompactionTimestampForRegion(controller, request);
+ }
+ };
+ }
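+ // Usage sketch for the keep-alive master stub above (request built with the
+ // standard protobuf getDefaultInstance(); adapt to the call you need):
+ //   MasterKeepAliveConnection master = connection.getKeepAliveMasterService();
+ //   try {
+ //     master.isMasterRunning(null,
+ //         MasterProtos.IsMasterRunningRequest.getDefaultInstance());
+ //   } finally {
+ //     master.close();  // decrements userCount; does not tear down the stub
+ //   }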
+
+
+ private static void release(MasterServiceState mss) {
+ if (mss != null && mss.connection != null) {
+ ((HConnectionImplementation)mss.connection).releaseMaster(mss);
+ }
+ }
+
+ private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
+ if (mss.getStub() == null) {
+ return false;
+ }
+ try {
+ return mss.isMasterRunning();
+ } catch (UndeclaredThrowableException e) {
+ // It's somewhat messy, but we can receive undeclared exceptions such as
+ // java.net.ConnectException, so we catch them here...
+ LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
+ return false;
+ } catch (ServiceException se) {
+ LOG.warn("Checking master connection", se);
+ return false;
+ }
+ }
+
+ void releaseMaster(MasterServiceState mss) {
+ if (mss.getStub() == null) return;
+ synchronized (masterAndZKLock) {
+ --mss.userCount;
+ }
+ }
+
+ private void closeMasterService(MasterServiceState mss) {
+ if (mss.getStub() != null) {
+ LOG.info("Closing master protocol: " + mss);
+ mss.clearStub();
+ }
+ mss.userCount = 0;
+ }
+
+ /**
+ * Immediately closes the shared master connection. Invoked either by the
+ * delayed close or when the connection itself is being closed.
+ */
+ private void closeMaster() {
+ synchronized (masterAndZKLock) {
+ closeMasterService(masterServiceState);
+ }
+ }
+
+ void updateCachedLocation(HRegionInfo hri, ServerName source,
+ ServerName serverName, long seqNum) {
+ HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
+ cacheLocation(hri.getTable(), source, newHrl);
+ }
+
+ @Override
+ public void deleteCachedRegionLocation(final HRegionLocation location) {
+ metaCache.clearCache(location);
+ }
+
+ @Override
+ public void updateCachedLocations(final TableName tableName, byte[] rowkey,
+ final Object exception, final HRegionLocation source) {
+ assert source != null;
+ updateCachedLocations(tableName, source.getRegionInfo().getRegionName(),
+ rowkey, exception, source.getServerName());
+ }
+
+ /**
+ * Update the location with the new value (if the exception is a RegionMovedException)
+ * or delete it from the cache. Does nothing if we can be sure from the exception that
+ * the location is still accurate, or if the cache has already been updated.
+ * @param exception an object (to simplify user code) in which we will look for a nested
+ * or wrapped RegionMovedException
+ * @param source server that is the source of the location update.
+ */
+ @Override
+ public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
+ final Object exception, final ServerName source) {
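+ // Decision policy, summarized: a RegionTooBusyException/RegionOpeningException
+ // cause means the cached location is still valid; a RegionMovedException
+ // carries the replacement location; anything else evicts the entry so the
+ // next lookup goes back to meta.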
+ if (rowkey == null || tableName == null) {
+ LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
+ ", tableName=" + (tableName == null ? "null" : tableName));
+ return;
+ }
+
+ if (source == null) {
+ // This should not happen, but guard against it anyway.
+ return;
+ }
+
+ if (regionName == null) {
+ // we do not know which region, so just remove the cache entry for the row and server
+ metaCache.clearCache(tableName, rowkey, source);
+ return;
+ }
+
+ // Is it something we have already updated?
+ final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
+ HRegionLocation oldLocation = null;
+ if (oldLocations != null) {
+ oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
+ }
+ if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
+ // There is no such location in the cache (it's been removed already) or
+ // the cache has already been refreshed with a different location. => nothing to do
+ return;
+ }
+
+ HRegionInfo regionInfo = oldLocation.getRegionInfo();
+ Throwable cause = ConnectionManager.findException(exception);
+ if (cause != null) {
+ if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) {
+ // We know that the region is still on this region server
+ return;
+ }
+
+ if (cause instanceof RegionMovedException) {
+ RegionMovedException rme = (RegionMovedException) cause;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
+ rme.getHostname() + ":" + rme.getPort() +
+ " according to " + source.getHostAndPort());
+ }
+ // We know that the region is not anymore on this region server, but we know
+ // the new location.
+ updateCachedLocation(
+ regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
+ return;
+ }
+ }
+
+ // If we're here, it means that we cannot be sure about the location, so we remove it from
+ // the cache. Do not pass the source, because it can be a new server on the same host:port.
+ metaCache.clearCache(regionInfo);
+ }
+
+ @Override
+ public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
+ final Object exception, final HRegionLocation source) {
+ updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
+ }
+
+ /**
+ * @deprecated since 0.96 - Use {@link org.apache.hadoop.hbase.client.HTableInterface#batch} instead
+ */
+ @Override
+ @Deprecated
+ public void processBatch(List<? extends Row> list,
+ final TableName tableName,
+ ExecutorService pool,
+ Object[] results) throws IOException, InterruptedException {
+ // This belongs in HTable!!! Not in here. St.Ack
+
+ // results must be the same size as list
+ if (results.length != list.size()) {
+ throw new IllegalArgumentException(
+ "argument results must be the same size as argument list");
+ }
+ processBatchCallback(list, tableName, pool, results, null);
+ }
+
+ /**
+ * @deprecated Unsupported API
+ */
+ @Override
+ @Deprecated
+ public void processBatch(List<? extends Row> list,
+ final byte[] tableName,
+ ExecutorService pool,
+ Object[] results) throws IOException, InterruptedException {
+ processBatch(list, TableName.valueOf(tableName), pool, results);
+ }
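+ // Usage sketch for the deprecated batch API (gets/pool are hypothetical;
+ // prefer HTableInterface#batch in new code, per the javadoc above):
+ //   List<Get> gets = ...;
+ //   Object[] results = new Object[gets.size()];
+ //   connection.processBatch(gets, TableName.valueOf("mytable"), pool, results);
+ //   // on success results[i] holds a Result; failures surface as exceptions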
+
+ /**
+ * Sends the queries in parallel to the different region servers, retrying on failure.
+ * If the method returns, there was no error and the 'results' array contains no
+ * exceptions. If an error occurs, an exception is thrown and the 'results' array
+ * holds a mix of results and exceptions.
+ * @deprecated since 0.96 - Use {@link org.apache.hadoop.hbase.client.HTable#processBatchCallback} instead
+ */
+ @Override
+ @Deprecated
+ public
- * HConnection connection = HConnectionManager.createConnection(config);
- * HTableInterface table = connection.getTable(TableName.valueOf("table1"));
- * try {
- * // Use the table as needed, for a single operation and a single thread
- * } finally {
- * table.close();
- * connection.close();
- * }
- *
- *
- * {@code
- * conf.set("hbase.client.instance.id", "12345");
- * HConnection connection = HConnectionManager.getConnection(conf);
- * // Use the connection to your hearts' delight and then when done...
- * conf.set("hbase.client.instance.id", "12345");
- * HConnectionManager.deleteConnection(conf, true);
- * }
- *
- * conf configuration instance.
- * If no current connection exists, method creates a new connection and keys it using
- * connection-specific properties from the passed {@link Configuration}; see
- * {@link HConnectionKey}.
- * @param conf configuration
- * @return HConnection object for conf
- * @deprecated connection caching is going away
- */
- @Deprecated
- public static HConnection getConnection(final Configuration conf) throws IOException {
- return ConnectionManager.getConnectionInternal(conf);
- }
-
- /**
- * Create a new HConnection instance using the passed conf instance.
- *
- * HConnection connection = HConnectionManager.createConnection(conf);
- * HTableInterface table = connection.getTable("mytable");
- * try {
- * table.get(...);
- * ...
- * } finally {
- * table.close();
- * connection.close();
- * }
- *
- *
- * @param conf configuration
- * @return HConnection object for conf
- * @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
- */
- @Deprecated
- public static HConnection createConnection(Configuration conf) throws IOException {
- return ConnectionManager.createConnectionInternal(conf);
- }
-
-
- /**
- * Create a new HConnection instance using the passed conf instance.
- *
- * ExecutorService pool = ...;
- * HConnection connection = HConnectionManager.createConnection(conf, pool);
- * HTableInterface table = connection.getTable("mytable");
- * table.get(...);
- * ...
- * table.close();
- * connection.close();
- *
- * @param conf configuration
- * @param pool the thread pool to use for batch operation in HTables used via this HConnection
- * @return HConnection object for conf
- * @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
- */
- @Deprecated
- public static HConnection createConnection(Configuration conf, ExecutorService pool)
- throws IOException {
- return ConnectionManager.createConnection(conf, pool);
- }
-
- /**
- * Create a new HConnection instance using the passed conf instance.
- *
- * ExecutorService pool = ...;
- * HConnection connection = HConnectionManager.createConnection(conf, pool);
- * HTableInterface table = connection.getTable("mytable");
- * table.get(...);
- * ...
- * table.close();
- * connection.close();
- *
- * @param conf configuration
- * @param user the user the connection is for
- * @return HConnection object for conf
- * @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
- */
- @Deprecated
- public static HConnection createConnection(Configuration conf, User user)
- throws IOException {
- return ConnectionManager.createConnection(conf, user);
- }
-
- /**
- * Create a new HConnection instance using the passed conf instance.
- *
- * ExecutorService pool = ...;
- * HConnection connection = HConnectionManager.createConnection(conf, pool);
- * HTableInterface table = connection.getTable("mytable");
- * table.get(...);
- * ...
- * table.close();
- * connection.close();
- *
- * @param conf configuration
- * @param pool the thread pool to use for batch operation in HTables used via this HConnection
- * @param user the user the connection is for
- * @return HConnection object for conf
- * @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
- */
- @Deprecated
- public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
- throws IOException {
- return ConnectionManager.createConnection(conf, pool, user);
- }
-
- /**
- * @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
- */
- @Deprecated
- static HConnection createConnection(final Configuration conf, final boolean managed)
- throws IOException {
- return ConnectionManager.createConnection(conf, managed);
- }
-
- /**
- * @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
- */
- @Deprecated
- static ClusterConnection createConnection(final Configuration conf, final boolean managed,
- final ExecutorService pool, final User user) throws IOException {
- return ConnectionManager.createConnection(conf, managed, pool, user);
- }
-
- /**
- * Delete connection information for the instance specified by passed configuration.
- * If there are no more references to the designated connection connection, this method will
- * then close connection to the zookeeper ensemble and let go of all associated resources.
- *
- * @param conf configuration whose identity is used to find {@link HConnection} instance.
- * @deprecated connection caching is going away.
- */
- @Deprecated
- public static void deleteConnection(Configuration conf) {
- ConnectionManager.deleteConnection(conf);
- }
-
- /**
- * Cleanup a known stale connection.
- * This will then close connection to the zookeeper ensemble and let go of all resources.
- *
- * @param connection
- * @deprecated connection caching is going away.
- */
- @Deprecated
- public static void deleteStaleConnection(HConnection connection) {
- ConnectionManager.deleteStaleConnection(connection);
- }
-
- /**
- * Delete information for all connections. Close or not the connection, depending on the
- * staleConnection boolean and the ref count. By default, you should use it with
- * staleConnection to true.
- * @deprecated connection caching is going away.
- */
- @Deprecated
- public static void deleteAllConnections(boolean staleConnection) {
- ConnectionManager.deleteAllConnections(staleConnection);
- }
-
- /**
- * Delete information for all connections..
- * @deprecated kept for backward compatibility, but the behavior is broken. HBASE-8983
- */
- @Deprecated
- public static void deleteAllConnections() {
- ConnectionManager.deleteAllConnections();
- }
-
- /**
- * This convenience method invokes the given {@link HConnectable#connect}
- * implementation using a {@link HConnection} instance that lasts just for the
- * duration of the invocation.
- *
- * @param conf
* configuration instance. Minimally the mock will return
* conf when {@link ClusterConnection#getConfiguration()} is invoked.
* Be sure to shutdown the connection when done by calling
- * {@link HConnectionManager#deleteConnection(Configuration)} else it
- * will stick around; this is probably not what you want.
+ * {@link Connection#close()} else it will stick around; this is probably not what you want.
* @param conf configuration
* @return HConnection object for conf
* @throws ZooKeeperConnectionException
@@ -71,9 +69,8 @@ public class HConnectionTestingUtility {
* Calls {@link #getMockedConnection(Configuration)} and then mocks a few
* more of the popular {@link ClusterConnection} methods so they do 'normal'
* operation (see return doc below for list). Be sure to shutdown the
- * connection when done by calling
- * {@link HConnectionManager#deleteConnection(Configuration)} else it
- * will stick around; this is probably not what you want.
+ * connection when done by calling {@link Connection#close()} else it will stick around;
+ * this is probably not what you want.
*
* @param conf Configuration to use
* @param admin An AdminProtocol; can be null but is usually
@@ -92,8 +89,7 @@ public class HConnectionTestingUtility {
* {@link ClusterConnection#getAdmin(ServerName)} is called, returns the passed
* {@link ClientProtos.ClientService.BlockingInterface} instance when
* {@link ClusterConnection#getClient(ServerName)} is called (Be sure to call
- * {@link HConnectionManager#deleteConnection(Configuration)}
- * when done with this mocked Connection.
+ * {@link Connection#close()} when done with this mocked Connection.
* @throws IOException
*/
public static ClusterConnection getMockedConnectionAndDecorate(final Configuration conf,
@@ -146,8 +142,7 @@ public class HConnectionTestingUtility {
* Get a Mockito spied-upon {@link ClusterConnection} that goes with the passed
* conf configuration instance.
* Be sure to shutdown the connection when done by calling
- * {@link HConnectionManager#deleteConnection(Configuration)} else it
- * will stick around; this is probably not what you want.
+ * {@link Connection#close()} else it will stick around; this is probably not what you want.
* @param conf configuration
* @return HConnection object for conf
* @throws ZooKeeperConnectionException
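
A minimal sketch of the updated cleanup contract described above, assuming the
HConnectionTestingUtility API shown in this hunk and JUnit's assertSame
(exception handling elided):

  Configuration conf = HBaseConfiguration.create();
  HConnection conn = HConnectionTestingUtility.getMockedConnection(conf);
  try {
    assertSame(conf, conn.getConfiguration());  // the mock echoes conf back
  } finally {
    conn.close();  // replaces the old HConnectionManager#deleteConnection(conf)
  }
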
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index b3c631a..a4ceaa8 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -285,96 +285,6 @@ public class TestFromClientSide {
table.close();
}
- /**
- * @deprecated Tests deprecated functionality. Remove when we are past 1.0.
- * @throws Exception
- */
- @Deprecated
- @Test
- public void testSharedZooKeeper() throws Exception {
- Configuration newConfig = new Configuration(TEST_UTIL.getConfiguration());
- newConfig.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "12345");
-
- // First with a simple ZKW
- ZooKeeperWatcher z0 = new ZooKeeperWatcher(
- newConfig, "hconnection", new Abortable() {
- @Override public void abort(String why, Throwable e) {}
- @Override public boolean isAborted() {return false;}
- });
- z0.getRecoverableZooKeeper().getZooKeeper().exists("/oldZooKeeperWatcher", false);
- z0.close();
-
- // Then a ZooKeeperKeepAliveConnection
- ConnectionManager.HConnectionImplementation connection1 =
- (ConnectionManager.HConnectionImplementation)
- HConnectionManager.getConnection(newConfig);
-
- ZooKeeperKeepAliveConnection z1 = connection1.getKeepAliveZooKeeperWatcher();
- z1.getRecoverableZooKeeper().getZooKeeper().exists("/z1", false);
-
- z1.close();
-
- // will still work, because the real connection is not closed yet
- // Not do be done in real code
- z1.getRecoverableZooKeeper().getZooKeeper().exists("/z1afterclose", false);
-
-
- ZooKeeperKeepAliveConnection z2 = connection1.getKeepAliveZooKeeperWatcher();
- assertTrue(
- "ZooKeeperKeepAliveConnection equals on same connection", z1 == z2);
-
-
-
- Configuration newConfig2 = new Configuration(TEST_UTIL.getConfiguration());
- newConfig2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "6789");
- ConnectionManager.HConnectionImplementation connection2 =
- (ConnectionManager.HConnectionImplementation)
- HConnectionManager.getConnection(newConfig2);
-
- assertTrue("connections should be different ", connection1 != connection2);
-
- ZooKeeperKeepAliveConnection z3 = connection2.getKeepAliveZooKeeperWatcher();
- assertTrue(
- "ZooKeeperKeepAliveConnection should be different" +
- " on different connections", z1 != z3);
-
- // Bypass the private access
- Method m = ConnectionManager.HConnectionImplementation.class.
- getDeclaredMethod("closeZooKeeperWatcher");
- m.setAccessible(true);
- m.invoke(connection2);
-
- ZooKeeperKeepAliveConnection z4 = connection2.getKeepAliveZooKeeperWatcher();
- assertTrue(
- "ZooKeeperKeepAliveConnection should be recreated" +
- " when previous connections was closed"
- , z3 != z4);
-
-
- z2.getRecoverableZooKeeper().getZooKeeper().exists("/z2", false);
- z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
-
-
- HConnectionManager.deleteConnection(newConfig);
- try {
- z2.getRecoverableZooKeeper().getZooKeeper().exists("/z2", false);
- assertTrue("We should not have a valid connection for z2", false);
- } catch (Exception e){
- }
-
- z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
- // We expect success here.
-
-
- HConnectionManager.deleteConnection(newConfig2);
- try {
- z4.getRecoverableZooKeeper().getZooKeeper().exists("/z4", false);
- assertTrue("We should not have a valid connection for z4", false);
- } catch (Exception e){
- }
- }
-
-
/**
* Verifies that getConfiguration returns the same Configuration object used
* to create the HTable instance.
@@ -4127,7 +4037,7 @@ public class TestFromClientSide {
*/
HTable createUnmangedHConnectionHTable(final TableName tableName) throws IOException {
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
- HConnection conn = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
+ HConnection conn = ConnectionManager.createConnection(TEST_UTIL.getConfiguration());
return (HTable)conn.getTable(tableName);
}
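
The helper above relies on the unmanaged-connection pattern; a minimal sketch
(tableName and row are hypothetical, lifetimes made explicit, exceptions
elided):

  HConnection conn = ConnectionManager.createConnection(TEST_UTIL.getConfiguration());
  try {
    HTableInterface table = conn.getTable(tableName);
    try {
      table.get(new Get(row));
    } finally {
      table.close();
    }
  } finally {
    conn.close();  // unmanaged: the caller owns the connection lifecycle
  }
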
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 5d284a2..1fba87d 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -29,7 +29,6 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.SocketTimeoutException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,7 +55,6 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -155,8 +153,8 @@ public class TestHCM {
new SynchronousQueue