commit cdf91c0ab7efee5edb89dcd5830d812403eddaf6 Author: nspiegelberg Date: 84 seconds ago HBASE-3653 : Parallelize Server Requests on HBase Client diff --git a/CHANGES.txt b/CHANGES.txt index 60fc8e6..3d2a9d0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -113,11 +113,12 @@ Release 0.91.0 - Unreleased HBASE-2495 Allow record filtering with selected row key values in HBase Export (Subbu M Iyer via Stack) HBASE-3600 Update our jruby to 1.6.0 - + TASK HBASE-3559 Move report of split to master OFF the heartbeat channel HBASE-3573 Move shutdown messaging OFF hearbeat; prereq for fix of hbase-1502 + HBASE-3653 Parallelize Server Requests on HBase Client NEW FEATURES diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 3d2c5ea..644de1f 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -240,6 +240,7 @@ public class HConnectionManager { private final Map servers = new ConcurrentHashMap(); + private final ConcurrentHashMap connectionLock = new ConcurrentHashMap(); /** * Map of table to table {@link HRegionLocation}s. The table key is made @@ -941,21 +942,30 @@ public class HConnectionManager { getMaster(); } HRegionInterface server; - synchronized (this.servers) { - // See if we already have a connection - server = this.servers.get(regionServer.toString()); - if (server == null) { // Get a connection - try { - server = (HRegionInterface)HBaseRPC.waitForProxy( - serverInterfaceClass, HRegionInterface.VERSION, - regionServer.getInetSocketAddress(), this.conf, - this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); - } catch (RemoteException e) { - LOG.warn("RemoteException connecting to RS", e); - // Throw what the RemoteException was carrying. - throw RemoteExceptionHandler.decodeRemoteException(e); + String rsName = regionServer.toString(); + // See if we already have a connection (common case) + server = this.servers.get(rsName); + if (server == null) { + // create a unique lock for this RS (if necessary) + this.connectionLock.putIfAbsent(rsName, rsName); + // get the RS lock + synchronized (this.connectionLock.get(rsName)) { + // do one more lookup in case we were stalled above + server = this.servers.get(rsName); + if (server == null) { + try { + // definitely a cache miss. establish an RPC for this RS + server = (HRegionInterface) HBaseRPC.waitForProxy( + serverInterfaceClass, HRegionInterface.VERSION, + regionServer.getInetSocketAddress(), this.conf, + this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); + this.servers.put(rsName, server); + } catch (RemoteException e) { + LOG.warn("RemoteException connecting to RS", e); + // Throw what the RemoteException was carrying. + throw RemoteExceptionHandler.decodeRemoteException(e); + } } - this.servers.put(regionServer.toString(), server); } } return server;