diff --git src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
index 235c078..363eb4f 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
@@ -1131,12 +1131,19 @@ public class HBaseClient {
      * refs for keys in HashMap properly. For now its ok.
      */
     ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
-    synchronized (connections) {
-      connection = connections.get(remoteId);
-      if (connection == null) {
-        connection = createConnection(remoteId);
-        connections.put(remoteId, connection);
-      }
+    connection = connections.get(remoteId);
+    if (connection == null) {
+      // It is possible that another connection was created in the meantime and
+      // could have been used; however, we always cap connections to a limit in
+      // the pool, so we won't create too many of them. Connections are also
+      // thread-safe and are reused across threads.
+      connection = createConnection(remoteId);
+      // Note that connections.put() is not atomic. It first checks whether there
+      // is a pool for this server endpoint and does a putIfAbsent to create one
+      // if needed. Once the pool exists, the connection is added to it, so this
+      // connection may not end up in the pool. But an unused connection should
+      // self-destruct after some time.
+      connections.put(remoteId, connection);
     }
     connection.addCall(call);

diff --git src/main/java/org/apache/hadoop/hbase/util/PoolMap.java src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
index 1956e6b..dc1368c 100644
--- src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
+++ src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,15 +56,18 @@ public class PoolMap<K, V> implements Map<K, V> {

   private int poolMaxSize;

-  private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
+  private ConcurrentMap<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
+  private Pool<V> singletonPool;

   public PoolMap(PoolType poolType) {
     this.poolType = poolType;
+    this.singletonPool = createPool();
   }

   public PoolMap(PoolType poolType, int poolMaxSize) {
     this.poolType = poolType;
     this.poolMaxSize = poolMaxSize;
+    this.singletonPool = createPool();
   }

   @Override
@@ -74,11 +78,14 @@ public class PoolMap<K, V> implements Map<K, V> {

   @Override
   public V put(K key, V value) {
-    Pool<V> pool = pools.get(key);
-    if (pool == null) {
-      pools.put(key, pool = createPool());
+    Pool<V> returnPool = pools.putIfAbsent(key, singletonPool);
+    if (returnPool != null) {
+      return returnPool.put(value);
+    } else {
+      V returnValue = singletonPool.put(value);
+      singletonPool = createPool();
+      return returnValue;
     }
-    return pool != null ? pool.put(value) : null;
   }

   @SuppressWarnings("unchecked")
@@ -349,7 +356,7 @@ public class PoolMap<K, V> implements Map<K, V> {
     }

     @Override
-    public R put(R resource) {
+    public synchronized R put(R resource) {
       if (size() < maxSize) {
         add(resource);
       }
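
Note on the concurrency reasoning in the comments above: the change relies on ConcurrentMap.putIfAbsent() so that per-endpoint pools can be created without a map-wide lock. The following is a minimal, self-contained sketch of that idiom, not the patch code; the names here (PutIfAbsentDemo, demoPools, poolFor, the endpoint and connection strings) are hypothetical and only for illustration. The patch's PoolMap.put() uses a pre-created singletonPool variant of the same idea, and the per-pool put(R) is synchronized so the size cap holds under concurrent callers.

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentDemo {
  // One queue of pooled resources per key; the map itself is never locked.
  private final ConcurrentMap<String, Queue<String>> demoPools =
      new ConcurrentHashMap<String, Queue<String>>();

  Queue<String> poolFor(String key) {
    Queue<String> pool = demoPools.get(key);
    if (pool == null) {
      // Several threads may race here and each build a candidate pool...
      Queue<String> candidate = new ConcurrentLinkedQueue<String>();
      // ...but putIfAbsent is atomic, so exactly one candidate is installed.
      Queue<String> existing = demoPools.putIfAbsent(key, candidate);
      // Losers discard their candidate and use the winner's pool instead.
      pool = (existing != null) ? existing : candidate;
    }
    return pool;
  }

  public static void main(String[] args) {
    PutIfAbsentDemo demo = new PutIfAbsentDemo();
    demo.poolFor("regionserver-1:60020").add("connection-A");
    // Prints [connection-A]: both calls resolve to the same per-key pool.
    System.out.println(demo.poolFor("regionserver-1:60020"));
  }
}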