diff --git src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
index 235c078..0d167b7 100644
--- src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
+++ src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
@@ -1131,12 +1131,19 @@ public class HBaseClient {
      * refs for keys in HashMap properly. For now its ok.
      */
     ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
-    synchronized (connections) {
-      connection = connections.get(remoteId);
-      if (connection == null) {
-        connection = createConnection(remoteId);
-        connections.put(remoteId, connection);
-      }
+    connection = connections.get(remoteId);
+    if (connection == null) {
+      // It is possible that a connection was created at this time which could
+      // have been used - however we always cap connections to a limit in the pool
+      // so we won't create too many of them. Connections are also thread safe
+      // and are reused across threads.
+      connection = createConnection(remoteId);
+      // Note that connections.put() is not atomic. It first checks the size and
+      // then adds the connection, so it's possible that two threads check the same
+      // size and add the connection incrementing the connection pool size by 2.
+      // This is a bug in RoundRobinPool. However unused connections self
+      // destruct after their idle time is over.
+      connections.put(remoteId, connection);
     }
     connection.addCall(call);
 
diff --git src/main/java/org/apache/hadoop/hbase/util/PoolMap.java src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
index 1956e6b..2f588df 100644
--- src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
+++ src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -55,7 +56,7 @@ public class PoolMap<K, V> implements Map<K, V> {
 
   private int poolMaxSize;
 
-  private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
+  private ConcurrentMap<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
 
   public PoolMap(PoolType poolType) {
     this.poolType = poolType;
@@ -74,11 +75,23 @@ public class PoolMap<K, V> implements Map<K, V> {
 
+  /**
+   * Adds {@code value} to the pool registered under {@code key}, creating and
+   * registering that pool first if none exists yet.
+   *
+   * @param key the key identifying the pool
+   * @param value the resource to add to the pool
+   * @return whatever the pool's own {@code put(value)} returns
+   */
   @Override
   public V put(K key, V value) {
     Pool<V> pool = pools.get(key);
     if (pool == null) {
-      pools.put(key, pool = createPool());
+      // Only allocate a pool on a miss. putIfAbsent() keeps the registration
+      // atomic: if another thread registered a pool first, use that one.
+      Pool<V> created = createPool();
+      Pool<V> existing = pools.putIfAbsent(key, created);
+      pool = (existing != null) ? existing : created;
     }
-    return pool != null ? pool.put(value) : null;
+    return pool.put(value);
   }
 
   @SuppressWarnings("unchecked")