commit d13b3ccf8c94edf2e445e68f19b58dfa672edb75 Author: karthick Date: Tue Oct 19 23:23:26 2010 -0400 HBASE-2939 Allow Client-Side Connection Pooling In 0.20.6 diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index b8e6f25..e6841f2 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -141,7 +141,13 @@ public interface HConstants { /** Maximum value length, enforced on KeyValue construction */ static final int MAXIMUM_VALUE_LENGTH = Integer.MAX_VALUE; - + + /** Parameter name for HBase client IPC pool type */ + public static final String HBASE_CLIENT_IPC_POOL_TYPE = "hbase.client.ipc.pool.type"; + + /** Parameter name for HBase client IPC pool size */ + public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size"; + // Always store the location of the root table's HRegion. // This HRegion is never split. diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index cbe0e1a..b4847d3 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.util.Hashtable; import java.util.Iterator; +import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -53,6 +54,10 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.PoolMap; +import org.apache.hadoop.hbase.util.PoolMap.PoolType; + /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on * a port and is defined by a parameter class and a value class. @@ -66,8 +71,7 @@ public class HBaseClient { public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient"); - protected Hashtable connections = - new Hashtable(); + protected final Map connections; protected Class valueClass; // class of call values protected int counter; // counter for call ids @@ -650,6 +654,8 @@ public class HBaseClient { } this.conf = conf; this.socketFactory = factory; + this.connections = new PoolMap(getPoolType(conf), + + getPoolSize(conf)); } /** @@ -669,6 +675,33 @@ public class HBaseClient { return socketFactory; } + + /** + * Return the pool type specified in the configuration, if it roughly equals either + * the name of {@link PoolType#Reusable} or {@link PoolType#ThreadLocal}, otherwise + * default to the former type. + * + * @param config configuration + * @return either a {@link PoolType#Reusable} or {@link PoolType#ThreadLocal} + */ + private static PoolType getPoolType(Configuration config) { + return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), + PoolType.RoundRobin, PoolType.ThreadLocal); + } + + /** + * Return the pool size specified in the configuration, otherwise the maximum allowable + * size (which for all intents and purposes represents an unbounded pool). + * + * @param config + * @return the maximum pool size + */ + private static int getPoolSize(Configuration config) { + return config + .getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, PoolType.RoundRobin + .equals(getPoolType(config)) ? 1 : Integer.MAX_VALUE); + } + /** Stop all threads related to this client. No further calls may be made * using this client. */ public void stop() { diff --git a/src/java/org/apache/hadoop/hbase/util/PoolMap.java b/src/java/org/apache/hadoop/hbase/util/PoolMap.java new file mode 100644 index 0000000..09d3dea --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/util/PoolMap.java @@ -0,0 +1,257 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * + * The PoolMap maps a key to a collection of values, the elements + * of which are managed by a pool. In effect, that collection acts as a shared + * pool of resources, access to which is closely controlled as per the semantics + * of the pool. + * + *

+ * In case the size of the pool is set to a non-zero positive number, that is + * used to cap the number of resources that a pool may contain for any given + * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool. + *

+ * + * @param + * the type of the key to the resource + * @param + * the type of the resource being pooled + * + * @author Karthick Sankarachary + */ +public class PoolMap implements Map { + private PoolType poolType; + + private int poolMaxSize; + + private Map> pools = Collections + .synchronizedMap(new HashMap>()); + + public PoolMap(PoolType poolType, int poolMaxSize) { + this.poolType = poolType; + this.poolMaxSize = poolMaxSize; + } + + @Override + public V get(Object key) { + Pool pool = pools.get(key); + return pool != null ? pool.get() : null; + } + + @Override + public V put(K key, V value) { + Pool pool = pools.get(key); + if (pool == null) { + synchronized (this) { + pool = pools.get(key); + if (pool == null) { + pools.put(key, pool = createPool()); + } + } + } + return pool != null ? pool.put(value) : null; + } + + @Override + public V remove(Object key) { + Pool pool = pools.remove(key); + if (pool != null) { + pool.clear(); + } + return null; + } + + public boolean remove(K key, V value) { + Pool pool = pools.get(key); + return pool != null ? pool.remove(value) : false; + } + + @Override + public Collection values() { + Collection values = new ArrayList(); + for (Pool pool : pools.values()) { + Collection poolValues = pool.values(); + if (poolValues != null) { + values.addAll(poolValues); + } + } + return values; + } + + @Override + public boolean isEmpty() { + return pools.isEmpty(); + } + + @Override + public int size() { + return pools.size(); + } + + public int size(K key) { + Pool pool = pools.get(key); + return pool != null ? pool.size() : 0; + } + + @Override + public boolean containsKey(Object key) { + return pools.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + if (value == null) { + return false; + } + for (Pool pool : pools.values()) { + if (value.equals(pool.get())) { + return true; + } + } + return false; + } + + @Override + public void putAll(Map map) { + for (Map.Entry entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void clear() { + for (Pool pool : pools.values()) { + pool.clear(); + } + pools.clear(); + } + + @Override + public Set keySet() { + return pools.keySet(); + } + + @Override + public Set> entrySet() { + Set> entries = new HashSet>(); + for (Map.Entry> poolEntry : pools.entrySet()) { + final K poolKey = poolEntry.getKey(); + final Pool pool = poolEntry.getValue(); + for (final V poolValue : pool.values()) { + if (pool != null) { + entries.add(new Map.Entry() { + @Override + public K getKey() { + return poolKey; + } + + @Override + public V getValue() { + return poolValue; + } + + @Override + public V setValue(V value) { + return pool.put(value); + } + }); + } + } + } + return entries; + } + + protected interface Pool { + public R get(); + + public R put(R resource); + + public boolean remove(R resource); + + public void clear(); + + public Collection values(); + + public int size(); + } + + public enum PoolType { + Reusable("reusable"), ThreadLocal("threadlocal"), RoundRobin("roundrobin"); + + private String configName; + + PoolType(String configName) { + this.configName = configName; + } + + public String getConfigName() { + return configName; + } + + public static PoolType valueOf(String poolTypeName, + PoolType defaultPoolType, PoolType... otherPoolTypes) { + PoolType poolType = null; + if (poolTypeName != null) + poolType = PoolType.valueOf(poolTypeName); + if (poolType != null) { + boolean allowedType = false; + if (poolType.equals(defaultPoolType)) { + allowedType = true; + } else { + if (otherPoolTypes != null) { + for (PoolType allowedPoolType : otherPoolTypes) { + if (poolType.equals(allowedPoolType)) { + allowedType = true; + break; + } + } + } + } + if (!allowedType) { + poolType = null; + } + } + return (poolType != null) ? poolType : defaultPoolType; + } + } + + protected Pool createPool() { + switch (poolType) { + case Reusable: + return new ReusablePool(poolMaxSize); + case RoundRobin: + return new RoundRobinPool(poolMaxSize); + case ThreadLocal: + return new ThreadLocalPool(poolMaxSize); + } + return null; + } +} diff --git a/src/java/org/apache/hadoop/hbase/util/ReusablePool.java b/src/java/org/apache/hadoop/hbase/util/ReusablePool.java new file mode 100644 index 0000000..d249335 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/util/ReusablePool.java @@ -0,0 +1,71 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.hadoop.hbase.util.PoolMap.Pool; + +/** + * The ReusablePool represents a {@link SharedMap.Pool} that builds + * on the {@link LinkedList} class. It essentially allows resources to be + * checked out, at which point it is removed from this pool. When the resource + * is no longer required, it should be returned to the pool in order to be + * reused. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is unbounded. Otherwise, it caps the number of consumers that can check + * out a resource from this pool to the (non-zero positive) value specified in + * {@link #maxSize}. + *

+ * + * @author Karthick Sankarachary + * + * @param + * the type of the resource + */ +@SuppressWarnings("serial") +public class ReusablePool extends LinkedList implements Pool { + private int maxSize; + + public ReusablePool(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public R get() { + return poll(); + } + + @Override + public R put(R resource) { + if (size() < maxSize) { + add(resource); + } + return null; + } + + @Override + public Collection values() { + return this; + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/util/RoundRobinPool.java b/src/java/org/apache/hadoop/hbase/util/RoundRobinPool.java new file mode 100644 index 0000000..b5d1233 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/util/RoundRobinPool.java @@ -0,0 +1,79 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.hbase.util.PoolMap.Pool; + +/** + * The RoundRobinPool represents a {@link SharedMap.Pool}, which + * stores its resources in an {@link ArrayList}. It load-balances access to its + * resources by returning a different resource every time a given key is looked + * up. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is unbounded. Otherwise, it caps the number of resources in this pool to + * the (non-zero positive) value specified in {@link #maxSize}. + *

+ * + * @author Karthick Sankarachary + * + * @param + * the type of the resource + * + */ +@SuppressWarnings("serial") +class RoundRobinPool extends ArrayList implements Pool { + private int maxSize; + private int nextResource = 0; + + public RoundRobinPool(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public R put(R resource) { + if (size() >= maxSize) { + return resource; + } + R previousResource = get(); + this.add(resource); + return previousResource; + } + + @Override + public R get() { + if (size() < maxSize) { + return null; + } + nextResource %= size(); + R resource = get(nextResource++); + return resource; + } + + @Override + public Collection values() { + return this; + } + +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java b/src/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java new file mode 100644 index 0000000..b628487 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java @@ -0,0 +1,127 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.util.PoolMap.Pool; + +/** + * The ThreadLocalPool represents a {@link PoolMap.Pool} that + * builds on the {@link ThreadLocal} class. It essentially binds the resource to + * the thread from which it is accessed. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is bounded only by the number of threads that add resources to this + * pool. Otherwise, it caps the number of threads that can set a value on this + * {@link ThreadLocal} instance to the (non-zero positive) value specified in + * {@link #maxSize}. + *

+ * + * + * @author Karthick Sankarachary + * + * @param + * the type of the resource + */ +class ThreadLocalPool extends ThreadLocal implements Pool { + public static Map, List> poolResources = new HashMap, List>(); + + private int maxSize; + + public ThreadLocalPool(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public synchronized R put(R resource) { + R previousResource = get(); + List resources = getResources(this); + if (previousResource != null) { + resources.remove(previousResource); + } + if (resources.size() >= maxSize) { + return resource; + } + this.set(resource); + if (resource != null && resource != previousResource) { + resources.add(resource); + } + return previousResource; + } + + @Override + public void remove() { + R resource = super.get(); + super.remove(); + if (resource != null) { + getResources(this).remove(resource); + } + } + + @Override + public int size() { + return getResources(this).size(); + } + + @Override + public boolean remove(R resource) { + if (resource == null) { + return false; + } + if (resource.equals(get())) { + remove(); + return true; + } + return false; + } + + @Override + public void clear() { + poolResources.clear(); + } + + @SuppressWarnings("unchecked") + @Override + public Collection values() { + List resources = Collections.synchronizedList(new LinkedList()); + for (Object resource : getResources(this)) { + if (resource != null) { + resources.add((R) resource); + } + } + return resources; + } + + private synchronized static List getResources(ThreadLocalPool pool) { + List resources = poolResources.get(pool); + if (resources == null) { + resources = new LinkedList(); + poolResources.put(pool, resources); + } + return resources; + } +} \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/util/TestPoolMap.java b/src/test/org/apache/hadoop/hbase/util/TestPoolMap.java new file mode 100644 index 0000000..2133a09 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/util/TestPoolMap.java @@ -0,0 +1,246 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; + +import junit.framework.TestCase; + +import org.apache.hadoop.hbase.util.PoolMap.PoolType; + +public class TestPoolMap extends TestCase { + protected PoolMap poolMap; + protected ExecutorService executorService; + + protected final static int SAMPLE_SIZE = 10; + protected List samplePool; + + @Override + protected void setUp() throws Exception { + samplePool = new ArrayList(); + for (int index = 0; index < SAMPLE_SIZE; index++) { + samplePool.add(new Resource()); + } + } + + /** + * Demonstrates that, for an unbounded thread local pool, the number of + * resources in it is actually bounded by the size of the thread pool. + * + * @throws Exception + */ + public void testUnboundedThreadLocalPool() throws Exception { + poolMap = new PoolMap(PoolType.ThreadLocal, + Integer.MAX_VALUE); + assertPoolBehaves(SAMPLE_SIZE - 1, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE + 1, SAMPLE_SIZE); + } + + /** + * Demonstrates that, for an bounded thread local pool, the number of + * resources in it is bounded by the size of the thread pool, the max pool + * size, or the sample size, whichever is lower. + * + * @throws Exception + */ + public void testBoundedThreadLocalPool() throws Exception { + poolMap = new PoolMap(PoolType.ThreadLocal, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE - 1, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE + 1, SAMPLE_SIZE); + + poolMap = new PoolMap(PoolType.ThreadLocal, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE - 1, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE + 1, SAMPLE_SIZE - 1); + + poolMap = new PoolMap(PoolType.ThreadLocal, SAMPLE_SIZE + 1); + assertPoolBehaves(SAMPLE_SIZE - 1, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE + 1, SAMPLE_SIZE); + } + + /** + * Demonstrates that, for an unbounded round robin pool, the number of + * resources in it is actually bounded only by how many resources are added to + * it. + * + * @throws Exception + */ + public void testUnboundedRoundRobinPool() throws Exception { + poolMap = new PoolMap(PoolType.RoundRobin, Integer.MAX_VALUE); + assertPoolBehaves(SAMPLE_SIZE - 1, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE + 1, SAMPLE_SIZE); + } + + /** + * Demonstrates that, for an bounded round robin pool, the number of resources + * in it is bounded by the max size of the pool, or the sample size, whichever + * is lower. + * + * @throws Exception + */ + public void testBoundedRoundRobinPool() throws Exception { + poolMap = new PoolMap(PoolType.RoundRobin, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE - 1, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE + 1, SAMPLE_SIZE); + + poolMap = new PoolMap(PoolType.RoundRobin, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE - 1, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE, SAMPLE_SIZE - 1); + assertPoolBehaves(SAMPLE_SIZE + 1, SAMPLE_SIZE - 1); + + poolMap = new PoolMap(PoolType.RoundRobin, SAMPLE_SIZE + 1); + assertPoolBehaves(SAMPLE_SIZE - 1, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE, SAMPLE_SIZE); + assertPoolBehaves(SAMPLE_SIZE + 1, SAMPLE_SIZE); + } + + /** + * Verifies that a pool map behaves as expected. Specifically, we try to put + * all the resources in the sample pool, using the given number of clients, + * and then assert that the resultant value set is of the same size and + * contains what it is supposed to contain, and doesn't contain what it is not + * supposed to contain. + * + * @param clientPoolSize + * @param expectedPoolSize + * @throws InterruptedException + * @throws ExecutionException + */ + private void assertPoolBehaves(int clientPoolSize, final int expectedPoolSize) + throws InterruptedException, ExecutionException { + executorService = Executors.newFixedThreadPool(clientPoolSize); + + Set keys = new HashSet(poolMap.keySet()); + final int expectedPoolMapSize = poolMap.values().size() + expectedPoolSize; + final Key key = new Key(); + keys.add(key); + + final Semaphore semaphore = new Semaphore(samplePool.size()); + semaphore.acquire(samplePool.size()); + + final ArrayBlockingQueue queue = new ArrayBlockingQueue( + expectedPoolMapSize); + queue.addAll(poolMap.values()); + + List> futures = new ArrayList>(); + + for (final Resource resource : samplePool) { + futures.add(executorService.submit(new Runnable() { + @Override + public void run() { + synchronized (queue) { + Resource previousResource = poolMap.put(key, resource); + if (previousResource != resource) { + if (queue.size() >= expectedPoolSize) { + queue.remove(previousResource); + } + queue.offer(resource); + } + semaphore.release(); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + })); + } + + Thread.sleep(2000); + semaphore.acquire(samplePool.size()); + for (Future future : futures) { + future.get(); + } + Collection values = poolMap.values(); + + assertEquals(keys.size(), poolMap.size()); + assertEquals(expectedPoolMapSize, values.size()); + + for (Resource resource : samplePool) { + assertTrue("The resource " + resource + + " was supposed to exist in the pool", !values.contains(resource) + || queue.contains(resource)); + assertTrue("The resource " + resource + + " was not supposed to exist in the pool", values.contains(resource) + || !queue.contains(resource)); + } + } + + @Override + protected void tearDown() throws Exception { + if (poolMap != null) { + poolMap.clear(); + assertEquals(0, poolMap.size()); + assertEquals(0, poolMap.values().size()); + } + if (executorService != null) { + executorService.shutdownNow(); + } + if (samplePool != null) { + samplePool.clear(); + } + } + + /** + * A class that represents a {@link PoolMap} generic key type. + * + * @author Karthick Sankarachary + * + */ + public class Key { + } + + /** + * A class that represents a {@link PoolMap} generic resource type. + * + * @author Karthick + * + */ + public static class Resource { + private static int nextId = 0; + private int id = nextId++; + + @Override + public String toString() { + return String.valueOf(id); + } + + @Override + public boolean equals(Object that) { + return that != null && ((Resource) that).id == id; + } + } +}