diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 463350f..b788c67 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -33,8 +33,10 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.hbase.util.ReusableSharedMap; +import org.apache.hadoop.hbase.util.SharedMap; +import org.apache.hadoop.hbase.util.ThreadLocalSharedMapDecorator; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; @@ -67,9 +69,7 @@ import com.google.protobuf.ServiceException; @InterfaceAudience.Private @Deprecated public class HTablePool implements Closeable { - private final PoolMap tables; - private final int maxSize; - private final PoolType poolType; + private final SharedMap tables; private final Configuration config; private final HTableInterfaceFactory tableFactory; @@ -147,24 +147,14 @@ public class HTablePool implements Closeable { // Make a new configuration instance so I can safely cleanup when // done with the pool. this.config = config == null ? HBaseConfiguration.create() : config; - this.maxSize = maxSize; this.tableFactory = tableFactory == null ? 
new HTableFactory() : tableFactory; - if (poolType == null) { - this.poolType = PoolType.Reusable; + if (poolType != PoolType.ThreadLocal) { + tables = new ReusableSharedMap(maxSize); } else { - switch (poolType) { - case Reusable: - case ThreadLocal: - this.poolType = poolType; - break; - default: - this.poolType = PoolType.Reusable; - break; - } + tables = new ThreadLocalSharedMapDecorator( + new ReusableSharedMap(maxSize)); } - this.tables = new PoolMap(this.poolType, - this.maxSize); } /** @@ -199,9 +189,10 @@ public class HTablePool implements Closeable { * if there is a problem instantiating the HTable */ private HTableInterface findOrCreateTable(String tableName) { - HTableInterface table = tables.get(tableName); + HTableInterface table = tables.borrowObject(tableName); if (table == null) { table = createHTable(tableName); + tables.registerObject(tableName, table); } return table; } @@ -261,13 +252,10 @@ public class HTablePool implements Closeable { private void returnTable(HTableInterface table) throws IOException { // this is the old putTable method renamed and made private String tableName = Bytes.toString(table.getTableName()); - if (tables.size(tableName) >= maxSize) { + if (! 
tables.returnObject(tableName, table)) { // release table instance since we're not reusing it - this.tables.removeValue(tableName, table); this.tableFactory.releaseHTableInterface(table); - return; } - tables.put(tableName, table); } protected HTableInterface createHTable(String tableName) { @@ -286,13 +274,10 @@ public class HTablePool implements Closeable { * @param tableName */ public void closeTablePool(final String tableName) throws IOException { - Collection tables = this.tables.values(tableName); - if (tables != null) { - for (HTableInterface table : tables) { - this.tableFactory.releaseHTableInterface(table); - } + Collection tables = this.tables.clear(tableName); + for (HTableInterface table : tables) { + this.tableFactory.releaseHTableInterface(table); } - this.tables.remove(tableName); } /** @@ -311,10 +296,10 @@ public class HTablePool implements Closeable { * Note: this is a 'shutdown' of all the table pools. */ public void close() throws IOException { - for (String tableName : tables.keySet()) { - closeTablePool(tableName); + Collection tables = this.tables.clear(); + for (HTableInterface table : tables) { + this.tableFactory.releaseHTableInterface(table); } - this.tables.clear(); } public int getCurrentPoolSize(String tableName) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 3d81800..e1f27bc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -80,8 +80,11 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import 
org.apache.hadoop.hbase.util.RoundRobinSharedMap; +import org.apache.hadoop.hbase.util.SharedMap; +import org.apache.hadoop.hbase.util.SegmentedSharedMap; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; @@ -114,7 +117,8 @@ public class RpcClient { // The LOG key is intentionally not from this package to avoid ipc logging at DEBUG (all under // o.a.h.hbase is set to DEBUG as default). public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient"); - protected final PoolMap connections; + /** The methods of each segment are guarded by the segment itself. */ + protected final SegmentedSharedMap connections; protected int counter; // counter for call ids protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs @@ -838,7 +842,7 @@ public class RpcClient { } protected synchronized void setupIOstreams() - throws IOException, InterruptedException { + throws IOException { if (socket != null || shouldCloseConnection.get()) { return; } @@ -976,9 +980,7 @@ public class RpcClient { // release the resources // first thing to do;take the connection out of the connection list - synchronized (connections) { - connections.removeValue(remoteId, this); - } + connections.invalidateObject(remoteId, this); // close the streams and therefore the socket if (this.out != null) { @@ -1265,7 +1267,7 @@ public class RpcClient { this.compressor = getCompressor(conf); this.socketFactory = factory; this.clusterId = clusterId != null ? 
clusterId : HConstants.CLUSTER_ID_DEFAULT; - this.connections = new PoolMap(getPoolType(conf), getPoolSize(conf)); + this.connections = newConnectionMap(conf); this.failedServers = new FailedServers(conf); this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); @@ -1284,6 +1286,18 @@ public class RpcClient { } } + /** For each segment of the return map, its methods are guarded by the segment itself. */ + private static SegmentedSharedMap newConnectionMap(Configuration conf) { + final int size = getPoolSize(conf); + return new SegmentedSharedMap( + new SegmentFactory() { + @Override + public SharedMap create() { + return new RoundRobinSharedMap(size); + } + }); + } + /** * Construct an IPC client for the cluster clusterId with the default SocketFactory * @param conf configuration @@ -1356,7 +1370,10 @@ public class RpcClient { * @param config configuration * @return either a {@link PoolType#RoundRobin} or * {@link PoolType#ThreadLocal} + * @deprecated This method is not used because + * using a thread-local pool doesn't work well. 
*/ + @Deprecated protected static PoolType getPoolType(Configuration config) { return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolType.RoundRobin, PoolType.ThreadLocal); @@ -1388,10 +1405,8 @@ public class RpcClient { if (!running.compareAndSet(true, false)) return; // wake up all connections - synchronized (connections) { - for (Connection conn : connections.values()) { - conn.interrupt(); - } + for (Connection conn : connections.getRegisteredObjects()) { + conn.interrupt(); } // wait until all connections are closed @@ -1432,8 +1447,21 @@ public class RpcClient { int rpcTimeout, int priority) throws InterruptedException, IOException { Call call = new Call(md, param, cells, returnType); - Connection connection = - getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor); + ConnectionId remoteId = getRemoteId(ticket, call, addr, rpcTimeout); + Connection connection = getConnection(remoteId, call); + try { + return call(call, connection, + md, param, cells, returnType, ticket, addr, rpcTimeout, priority); + } finally { + returnConnection(remoteId, connection); + } + } + + private Pair call(Call call, Connection connection, + MethodDescriptor md, Message param, CellScanner cells, + Message returnType, User ticket, InetSocketAddress addr, + int rpcTimeout, int priority) + throws InterruptedException, IOException { connection.writeRequest(call, priority); // send the parameter //noinspection SynchronizationOnLocalVariableOrMethodParameter @@ -1493,50 +1521,66 @@ public class RpcClient { * safe exception. 
*/ public void cancelConnections(String hostname, int port, IOException ioe) { - synchronized (connections) { - for (Connection connection : connections.values()) { - if (connection.isAlive() && - connection.getRemoteAddress().getPort() == port && - connection.getRemoteAddress().getHostName().equals(hostname)) { - LOG.info("The server on " + hostname + ":" + port + - " is dead - stopping the connection " + connection.remoteId); - connection.closeConnection(); - // We could do a connection.interrupt(), but it's safer not to do it, as the - // interrupted exception behavior is not defined nor enforced enough. - } + for (Connection connection : connections.getRegisteredObjects()) { + if (connection.isAlive() && + connection.getRemoteAddress().getPort() == port && + connection.getRemoteAddress().getHostName().equals(hostname)) { + LOG.info("The server on " + hostname + ":" + port + + " is dead - stopping the connection " + connection.remoteId); + connection.closeConnection(); + // We could do a connection.interrupt(), but it's safer not to do it, as the + // interrupted exception behavior is not defined nor enforced enough. } } } - /* Get a connection from the pool, or create a new one and add it to the - * pool. Connections to a given host/port are reused. */ - protected Connection getConnection(User ticket, Call call, InetSocketAddress addr, - int rpcTimeout, final Codec codec, final CompressionCodec compressor) - throws IOException, InterruptedException { + /** + * Returns an ID which is used to retrieve a connection from the pool. + */ + protected ConnectionId getRemoteId( + User ticket, Call call, InetSocketAddress addr, int rpcTimeout) { + return new ConnectionId(ticket, call.md.getService().getName(), addr, rpcTimeout); + } + + /** + * Get a connection from the pool, or create a new one and add it to the + * pool. Connections to a given host/port are reused. + * You should call {@link #returnConnection(ConnectionId, Connection)} after use. 
+ */ + protected Connection getConnection(ConnectionId remoteId, Call call) throws IOException { if (!running.get()) throw new StoppedRpcClientException(); + + SharedMap segment = connections.segmentFor(remoteId); Connection connection; - ConnectionId remoteId = - new ConnectionId(ticket, call.md.getService().getName(), addr, rpcTimeout); - synchronized (connections) { - connection = connections.get(remoteId); + synchronized (segment) { + // This synchronization guards against creating excessive instances of Connection. + // Note that the methods of the segment are guarded by the segment itself. + // This synchronization blocks threads which use the same segment, + // and you should not execute long processing for specified remoteId + // in the synchronization. + + connection = segment.borrowObject(remoteId); if (connection == null) { connection = createConnection(remoteId, this.codec, this.compressor); - connections.put(remoteId, connection); + segment.registerObject(remoteId, connection); } } connection.addCall(call); - //we don't invoke the method below inside "synchronized (connections)" - //block above. The reason for that is if the server happens to be slow, - //it will take longer to establish a connection and that will slow the - //entire system down. - //Moreover, if the connection is currently created, there will be many threads - // waiting here; as setupIOstreams is synchronized. If the connection fails with a - // timeout, they will all fail simultaneously. This is checked in setupIOstreams. connection.setupIOstreams(); + // Connection.setupIOstreams() establishes an actual network connection, + // which is relatively long processing for the specified remoteId. + // We execute the processing here separately from creating the instance of Connection, + // because the creation is executed under the synchronization of the segment, + // which blocks threads which use the same segment. 
+ return connection; } + protected void returnConnection(ConnectionId remoteId, Connection connection) { + connections.returnObject(remoteId, connection); + } + /** * This class holds the address and the user ticket, etc. The client connections * to servers are uniquely identified by diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java new file mode 100644 index 0000000..00f5e6d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Skeleton implementation of {@code SharedMap}. + *

+ * All of the public methods are synchronized with this, except for argument checks.
+ * Objects to be shared are identified with their equality, instead of sameness.
+ *
+ * @param the type of keys that you associate objects to be shared with
+ * @param the type of objects to be shared
+ */
+abstract class AbstractSharedMap implements SharedMap {
+  /** Guarded by this */
+  private final Map> poolMap = new HashMap>();
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public V borrowObject(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool pool = poolMap.get(key);
+      if (pool == null) {
+        return null;
+      }
+      return pool.borrowObject();
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean registerObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool pool = poolMap.get(key);
+      if (pool == null) {
+        pool = newPool();
+        poolMap.put(key, pool);
+      }
+      return pool.registerObject(value);
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean returnObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool pool = poolMap.get(key);
+      if (pool == null) {
+        return false;
+      }
+      return pool.returnObject(value);
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public void invalidateObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool pool = poolMap.get(key);
+      if (pool == null) {
+        return;
+      }
+
+      pool.invalidateObject(value);
+
+      if (pool.isEmpty()) {
+        // Remove by key: poolMap is keyed by K, so removing by the Pool value
+        // never matches and would leak empty pools in the map.
+        poolMap.remove(key);
+      }
+    }
+  }
+
+  @Override
+  public 
synchronized Collection clear() {
+    Collection idleObjects = new ArrayList();
+
+    for (Pool pool : poolMap.values()) {
+      idleObjects.addAll(pool.clear());
+    }
+
+    poolMap.clear();
+
+    return idleObjects;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public Collection clear(Object key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool pool = poolMap.get(key);
+      // Guard against an unknown key: without this, pool.clear() throws
+      // NullPointerException when no pool has been created for the key.
+      return pool == null ? Collections.emptyList() : pool.clear();
+    }
+  }
+
+  @Override
+  public synchronized int size() {
+    int size = 0;
+    for (Pool pool : poolMap.values()) {
+      size += pool.size();
+    }
+    return size;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public int size(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool pool = poolMap.get(key);
+      return pool == null ? 0 : pool.size();
+    }
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    for (Pool pool : poolMap.values()) {
+      if (! pool.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public synchronized Collection getRegisteredObjects() {
+    List result = new ArrayList();
+    for(Pool pool : poolMap.values()) {
+      result.addAll(pool.getRegisteredObjects());
+    }
+    return result;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public Collection getRegisteredObjects(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool pool = poolMap.get(key);
+      return pool == null ?
+          Collections.emptyList() : pool.getRegisteredObjects();
+    }
+  }
+
+  abstract Pool newPool();
+
+  interface Pool {
+    /**
+     * Returns one of the shared objects,
+     * or returns null if there is no appropriate object to borrow.
+     */
+    V borrowObject();
+
+    /**
+     * Registers the given {@code value} to this instance.
+     * At this point the registered object is regarded as borrowed.
+     *

+ * This method does nothing and returns false if and only if + * the given {@code value} is already registered. + * In practice the {@code value} is expected to be a newly created object, + * and this method is expected to return true. + */ + boolean registerObject(V value); + + /** + * Makes the given {@code value} to be returned into this instance. + * It is expected that the {@code value} has been borrowed from this instance. + *

+ * If this method returns false, + * then the the registration of the {@code value} is invalidated. + * This happens because the pool is full when this method is called, + * or you have explicitly called {@link #invalidateObject()} or {@link #clear()}. + */ + boolean returnObject(V value); + + /** + * Invalidates the registration of the given {@code value}. + * You may call this method regardless of whether the {@code value} is borrowed or not. + */ + void invalidateObject(V value); + + /** + * Invalidates all of the registrations to this instance, + * and returns the cleared idle objects in the pool. + */ + Collection clear(); + + /** + * Returns the count of the registered objects. + */ + int size(); + + /** + * Returns true if there is no registration. + */ + boolean isEmpty(); + + /** + * Returns the registered objects. + */ + Collection getRegisteredObjects(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java index 7283831..1f01df1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java @@ -59,10 +59,18 @@ public class PoolMap implements Map { private Map> pools = new ConcurrentHashMap>(); + /** + * @deprecated Use {@link SharedMap} and its implementation classes. + */ + @Deprecated public PoolMap(PoolType poolType) { this.poolType = poolType; } + /** + * @deprecated Use {@link SharedMap} and its implementation classes. 
+ */ + @Deprecated public PoolMap(PoolType poolType, int poolMaxSize) { this.poolType = poolType; this.poolMaxSize = poolMaxSize; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java new file mode 100644 index 0000000..c3f1b38 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Implementation of {@code SharedMap}, + * ensuring that it lends a shared object to a single caller at the same time. + *

+ * All of the public methods are synchronized with this, except for argument checks. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Private +public class ReusableSharedMap extends AbstractSharedMap { + private final int maxObjectCountPerKey; + + /** + * Constructs with the given {@code maxObjectCountPerKey}, + * which is a max count of idle objects to hold for each key. + */ + public ReusableSharedMap(int maxObjectCountPerKey) { + this.maxObjectCountPerKey = maxObjectCountPerKey; + } + + @Override + Pool newPool() { + return new ReusablePool(); + } + + private class ReusablePool implements Pool { + // The boolean values represent busy (borrowing) or not. + // This is for performance instead of sequential search in idleQueue. + final Map registeredObjects = new HashMap(); + final Queue idleQueue = new ArrayDeque(); + + @Override + public V borrowObject() { + V value = idleQueue.poll(); + if (value == null) { + return null; + } + + registeredObjects.put(value, Boolean.TRUE); + return value; + } + + @Override + public boolean registerObject(V value) { + if (registeredObjects.containsKey(value)) { + return false; + } + registeredObjects.put(value, Boolean.TRUE); + return true; + } + + @Override + public boolean returnObject(V value) { + Boolean busy = registeredObjects.get(value); + if (busy == null) { + return false; + } + + if (! 
busy) { + return true; + } + + if (idleQueue.size() >= maxObjectCountPerKey) { + registeredObjects.remove(value); + return false; + } + + registeredObjects.put(value, Boolean.FALSE); + idleQueue.offer(value); + return true; + } + + @Override + public void invalidateObject(V value) { + registeredObjects.remove(value); + idleQueue.remove(value); + } + + @Override + public Collection clear() { + Collection idleObjects = new ArrayList(idleQueue); + + registeredObjects.clear(); + idleQueue.clear(); + + return idleObjects; + } + + @Override + public int size() { + return registeredObjects.size(); + } + + @Override + public boolean isEmpty() { + return registeredObjects.isEmpty(); + } + + @Override + public Collection getRegisteredObjects() { + return new ArrayList(registeredObjects.keySet()); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java new file mode 100644 index 0000000..c04f77f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Implementation of {@code SharedMap} based on the round robin logic, + * except that it lends an idle object whenever there is. + * Objects to be shared must be thread safe, + * because the round robin logic will lend the same shared object + * to different callers at the same time. + *

+ * The round robin logic restricts the count of shared objects + * and prevents unbounded resource consumption. + * For each key it starts to re-lend shared objects cyclically + * when the count of borrowed objects exceeds the user-specified threshold. + *

+ * All of the public methods are synchronized with this, except for argument checks. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Private +public class RoundRobinSharedMap extends AbstractSharedMap { + /** the max count of shared objects for each key, which count is positive */ + private final int maxObjectCountPerKey; + + /** + * Constructs with the given {@code maxObjectCountPerKey}, + * which is a threshold count of borrowed objects for each key + * before starting to lend the same shared object to different callers, + * and which is also a max count of idle objects to hold for each key + * though the count of registered objects may exceed the max count. + *

+ * The given {@code maxObjectCountPerKey} is expected to be positive, + * otherwise the max count is set to be 1 instead of the given value. + */ + public RoundRobinSharedMap(int maxObjectCountPerKey) { + this.maxObjectCountPerKey = Math.max(1, maxObjectCountPerKey); + } + + @Override + Pool newPool() { + return new RoundRobinPool(); + } + + private class RoundRobinPool implements Pool { + // The integer values represent borrowing counts. + final Map registeredObjects = new HashMap(); + final Queue idleQueue = new ArrayDeque(); + final Queue busyQueue = new ArrayDeque(); + + @Override + public V borrowObject() { + V value = idleQueue.poll(); + if (value != null) { + registeredObjects.put(value, 1); + busyQueue.offer(value); + return value; + } + + if (busyQueue.size() < maxObjectCountPerKey) { + return null; + } + + // Now we are ready to pick up and lend a busy object in busyQueue, + // which is not empty because busyQueue.size() >= maxObjectCountPerKey > 0. + + value = busyQueue.poll(); + assert value != null; + registeredObjects.put(value, registeredObjects.get(value) + 1); + busyQueue.offer(value); + return value; + } + + @Override + public boolean registerObject(V value) { + if (registeredObjects.containsKey(value)) { + return false; + } + registeredObjects.put(value, 1); + busyQueue.offer(value); + return true; + } + + @Override + public boolean returnObject(V value) { + Integer busyCountWrapper = registeredObjects.get(value); + if (busyCountWrapper == null) { + return false; + } + + int busyCount = busyCountWrapper; + if (busyCount == 0) { + return true; + } + + int nextBusyCount = busyCount - 1; + if (nextBusyCount != 0) { + registeredObjects.put(value, nextBusyCount); + return true; + } + + busyQueue.remove(value); + + if (idleQueue.size() >= maxObjectCountPerKey) { + registeredObjects.remove(value); + return false; + + } else { + registeredObjects.put(value, 0); + idleQueue.offer(value); + return true; + } + } + + @Override + public void 
invalidateObject(V value) { + registeredObjects.remove(value); + idleQueue.remove(value); + busyQueue.remove(value); + } + + @Override + public Collection clear() { + Collection idleObjects = new ArrayList(idleQueue); + + registeredObjects.clear(); + idleQueue.clear(); + busyQueue.clear(); + + return idleObjects; + } + + @Override + public int size() { + return registeredObjects.size(); + } + + @Override + public boolean isEmpty() { + return registeredObjects.isEmpty(); + } + + @Override + public Collection getRegisteredObjects() { + return new ArrayList(registeredObjects.keySet()); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentationHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentationHelper.java new file mode 100644 index 0000000..ba50b95 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentationHelper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
*/
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+class SegmentationHelper {
+  private static final int MAX_SEGMENTS = 1 << 16;
+
+  // Fixed spelling: was "reuqiredSize".
+  private final int requiredSize;
+  private final int mask;
+  private final int shift;
+
+  /**
+   * @param concurrencyLevel an estimated number of concurrently accessing threads
+   */
+  SegmentationHelper(int concurrencyLevel) {
+    int requestedSize = Math.min(concurrencyLevel, MAX_SEGMENTS);
+
+    // Round up to the next power of two so segment selection can use bit masking.
+    int shift = 32;
+    int size = 1;
+    while (size < requestedSize) {
+      shift--;
+      size <<= 1;
+    }
+
+    this.requiredSize = size;
+    this.shift = shift;
+    this.mask = size - 1;
+  }
+
+  int requiredSize() {
+    return requiredSize;
+  }
+
+  /**
+   * The length of the given {@code segments} must be
+   * equal to or greater than {@link #requiredSize()}.
+   */
+  T segmentFor(T[] segments, Object key) {
+    assert segments.length >= requiredSize;
+
+    int hash = hash(key.hashCode());
+    return segments[(hash >>> shift) & mask];
+  }
+
+  /**
+   * Applies a supplemental hash function to a given hashCode,
+   * which defends against poor quality hash functions.
+   *
+   * @see ConcurrentHashMap#hash(int)
+   */
+  private static int hash(int h) {
+    h += (h << 15) ^ 0xffffcd7d;
+    h ^= (h >>> 10);
+    h += (h << 3);
+    h ^= (h >>> 6);
+    h += (h << 2) + (h << 14);
+    return h ^ (h >>> 16);
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java
new file mode 100644
index 0000000..b0ffb90
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Implementation of {@code SharedMap}, + * which delegates operations to segmented maps according to keys, + * in order to reduce contention between threads. + *

+ * Thread safe. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Private +public class SegmentedSharedMap implements SharedMap { + /** + * Factory to create a map which will be used as one of segmented maps. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ + public interface SegmentFactory { + SharedMap create(); + } + + private final SegmentationHelper helper; + private final SharedMap[] segments; + + /** + * Constructs with the default concurrency level (16). + * + * @throws NullPointerException if {@code segmentFactory} is null + */ + public SegmentedSharedMap(SegmentFactory segmentFactory) { + this(segmentFactory, 16); + } + + /** + * Constructs with the given {@code concurrencyLevel}, + * which is the estimated number of concurrently accessing threads. + * + * @throws NullPointerException if {@code segmentFactory} is null + */ + public SegmentedSharedMap(SegmentFactory segmentFactory, int concurrencyLevel) { + if (segmentFactory == null) { + throw new NullPointerException(); + } + + helper = new SegmentationHelper(concurrencyLevel); + + @SuppressWarnings("unchecked") + SharedMap[] segments = new SharedMap[helper.requiredSize()]; + for (int i = 0; i < segments.length; i++) { + segments[i] = segmentFactory.create(); + } + this.segments = segments; + } + + /** + * Returns the corresponding internal segment for the given {@code key}. + * + * @throws NullPointerException if {@code key} is null + */ + public SharedMap segmentFor(K key) { + if(key == null) { + throw new NullPointerException(); + } + return helper.segmentFor(segments, key); + } + + /** + * Returns the count of the segments used internally. 
+ */ + int countOfSegments() { + return segments.length; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public V borrowObject(K key) { + return segmentFor(key).borrowObject(key); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean registerObject(K key, V value) { + return segmentFor(key).registerObject(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean returnObject(K key, V value) { + return segmentFor(key).returnObject(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public void invalidateObject(K key, V value) { + segmentFor(key).invalidateObject(key, value); + } + + @Override + public Collection clear() { + Collection result = new ArrayList(); + for (SharedMap segment : segments) { + result.addAll(segment.clear()); + } + return result; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection clear(K key) { + return segmentFor(key).clear(key); + } + + @Override + public int size() { + int result = 0; + for (SharedMap segment : segments) { + result += segment.size(); + } + return result; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public int size(K key) { + return segmentFor(key).size(key); + } + + @Override + public boolean isEmpty() { + for (SharedMap segment : segments) { + if (! 
segment.isEmpty()) { + return false; + } + } + return true; + } + + @Override + public Collection getRegisteredObjects() { + List result = new ArrayList(); + for (SharedMap segment : segments) { + result.addAll(segment.getRegisteredObjects()); + } + return result; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection getRegisteredObjects(K key) { + return segmentFor(key).getRegisteredObjects(key); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java new file mode 100644 index 0000000..1746f37 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * This provides functions to share objects. + *

+ * In order to share objects, first of all call {@link #borrowObject()}. + * If the method returns a non-null object, use it. + * If the method returns null, create a new object and call {@link #registerObject()}, + * and use it. Just after the registration the object is regarded as borrowed. + *

+ * After using the borrowed object, + * call {@link #returnObject()} to return the object to the pool, + * or call {@link #invalidateObject()} to invalidate the object's registration + * instead of returning the object. + * If the pool is already full, the method {@code returnObject()} returns false + * with invalidating the registration of the object. + * Also the method returns false if the object's registration is already invalidated + * because {@link #invalidateObject()} or {@link #clear()} has been called. + * Anyway if the method {@code returnObject()} returns false + * the object is rejected to return the pool and is already invalidated, + * and you might tear down the rejected object. + * You can safely tear down the rejected object even in multi-thread contexts, + * because the object is never passed to the other threads via the pool. + *

+ * Incidentally, you can call {@link #invalidateObject} and {@link #clear()} + * in order to invalidate the registrations at any time, by any thread, + * regardless of whether the objects are borrowed or not. + * The method {@code clear()} returns a collection of the idle objects, + * which have been registered but not borrowed and idle in the pool, + * and you can safely tear them down in the sense that + * you get them exclusively in multi-thread contexts. + * The rest of the objects that have been registered are borrowed and in use + * by some other threads at the moment, and you can't tear them down right now. + * But the borrower eventually calls {@code returnObject()} (which returns false) + * or {@code invalidateObject()} after using the borrowed object, and in either method + * the borrower knows that the registration of the object is already invalidated + * and the object is ready to tear down. + *

+ * The implementations of this interface are expected to be thread safe, + * but it depends on the implementations whether it borrows the same shared object + * to different callers at the same time. + *

+ * Objects to be shared are identified with their equality, instead of sameness. + * Concretely, when we have two objects {@code o1} and {@code o2} to be shared, + * we won't distinguish between {@code o1} and {@code o2} + * if {@code o1.equals(o2)} returns true rather than if {@code o1 == o2} is true. + * That enables us to use ordinary collections for the objects + * and simplify the implementation logic of this interface. + * Also that is enough for practical uses. + * + * @param <K> the type of keys that you associate objects to be shared with + * @param <V> the type of objects to be shared + */ +@InterfaceAudience.Private +public interface SharedMap<K, V> { + /** + * Returns one of the shared objects associated with the given {@code key}, + * or returns null if there is no appropriate object to borrow. + * In the latter case, it is expected that a new instance is created + * and registered to this instance. + * + * @throws NullPointerException if {@code key} is null + */ + V borrowObject(K key); + + /** + * Registers the given {@code value} to this instance, + * associating it with the given {@code key}. + * At this point the registered object is regarded as borrowed + * and you can continue to use the object, + * and after finishing using it you have to return or invalidate + * the registered object into this instance. + *

+ * This method does nothing and returns false if and only if + * the given {@code value} is already registered + * associated with the given {@code key}. + * In practice the {@code value} is expected to be a newly created object, + * and this method is expected to return true. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + boolean registerObject(K key, V value); + + /** + * Makes the given {@code value} associated with the given {@code key} + * to be returned into this instance. + * It is expected that the {@code value} has been borrowed from this instance. + *

+ * If this method returns false, then the registration of the {@code value} is invalidated. + * This happens because the pool is full when this method is called, + * or you have explicitly called {@link #invalidateObject()} or {@link #clear()}. + * You might have to tear down the rejected object. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + boolean returnObject(K key, V value); + + /** + * Invalidates the registration of the given {@code value} associated with the {@code key}. + * You may call this method regardless of whether the {@code value} is borrowed or not. + * You might have to tear down the removed object. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + void invalidateObject(K key, V value); + + /** + * Invalidates all of the registrations to this instance, + * and returns the objects that have been registered but idle in the pool. + * You might have to tear down the returned objects. + *

+ * The rest of the objects that have been registered are still in use at this moment, + * and they are not contained in the collection returned by this method. + * In fact, you should not tear down them until the borrowers complete the usage. + * Instead, because the borrowers are supposed to call {@link #returnObject()} + * (which rejects invalidated objects and returns false) + * or {@link #invalidateObject()} after their usage, + * you can make the borrowers themselves tear down the invalidated objects + * just after calling these methods. + */ + Collection clear(); + + /** + * Invalidates all of the registrations to this instance associated with the given {@code key}, + * and returns the objects that have been registered but idle in the pool. + * You might have to tear down the returned objects. + *

+ * The rest of the objects that have been registered are still in use at this moment, + * and they are not contained in the collection returned by this method. + * In fact, you should not tear down them until the borrowers complete the usage. + * Instead, because the borrowers are supposed to call {@link #returnObject()} + * (which rejects invalidated objects and returns false) + * or {@link #invalidateObject()} after their usage, + * you can make the borrowers themselves tear down the invalidated objects + * just after calling these methods. + * + * @throws NullPointerException if {@code key} is null + */ + Collection clear(K key); + + /** + * Returns the count of the registered objects. + */ + int size(); + + /** + * Returns the count of the registered objects associated with the given {@code key}. + * + * @throws NullPointerException if {@code key} is null + */ + int size(K key); + + /** + * Returns true if there is no registration. + */ + boolean isEmpty(); + + /** + * Returns the registered objects. + */ + Collection getRegisteredObjects(); + + /** + * Returns the registered objects associated with the given {@code key}. + * + * @throws NullPointerException if {@code key} is null + */ + Collection getRegisteredObjects(K key); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java new file mode 100644 index 0000000..991ed3b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Decorator of {@code SharedMap} + * which gives the same shared object for each key at the same time for each thread. + * This lends the same object even after the other threads invalidate the object, + * until the borrowing thread completely returns the object or invalidates it. + *

+ * Note that it depends on the base instance of {@code SharedMap} + * whether the each shared object is accessed by at most one thread at the same time or not. + *

+ * Thread safe. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Private +public class ThreadLocalSharedMapDecorator implements SharedMap { + private static class BorrowingCounter { + final V value; + + int count = 1; + + BorrowingCounter(V value) { + assert value != null; + this.value = value; + } + } + + private final ThreadLocal>> borrowingCounterMapRef + = new ThreadLocal>>() { + @Override + protected Map> initialValue() { + return new HashMap>(); + } + }; + + private final SharedMap base; + + /** + * @throws NullPointerException if {@code base} is null + */ + public ThreadLocalSharedMapDecorator(SharedMap base) { + if (base == null) { throw new NullPointerException(); } + this.base = base; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public V borrowObject(K key) { + if (key == null) { throw new NullPointerException(); } + + Map> counterMap = borrowingCounterMapRef.get(); + + BorrowingCounter counter = counterMap.get(key); + if (counter == null) { + V value = base.borrowObject(key); + if (value != null) { + counterMap.put(key, new BorrowingCounter(value)); + } + return value; + } + + counter.count++; + return counter.value; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean registerObject(K key, V value) { + if (! 
base.registerObject(key, value)) { + return false; + } + + Map> counterMap = borrowingCounterMapRef.get(); + BorrowingCounter counter = counterMap.get(key); + if (counter == null) { + counterMap.put(key, new BorrowingCounter(value)); + } + return true; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean returnObject(K key, V value) { + if (key == null) { throw new NullPointerException("key"); } + if (value == null) { throw new NullPointerException("value"); } + + Map> counterMap = borrowingCounterMapRef.get(); + + BorrowingCounter counter = counterMap.get(key); + if (counter == null || ! counter.value.equals(value)) { + return base.returnObject(key, value); + } + + if (--counter.count > 0) { + return true; + } + + counterMap.remove(key); + return base.returnObject(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public void invalidateObject(K key, V value) { + base.invalidateObject(key, value); + + Map> counterMap = borrowingCounterMapRef.get(); + BorrowingCounter counter = counterMap.get(key); + if (counter == null || ! 
counter.value.equals(value)) { + return; + } + counterMap.remove(key); + } + + @Override + public Collection clear() { + return base.clear(); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection clear(K key) { + return base.clear(key); + } + + @Override + public int size() { + return base.size(); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public int size(K key) { + return base.size(key); + } + + @Override + public boolean isEmpty() { + return base.isEmpty(); + } + + @Override + public Collection getRegisteredObjects() { + return base.getRegisteredObjects(); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection getRegisteredObjects(K key) { + return base.getRegisteredObjects(key); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java new file mode 100644 index 0000000..ebbecc9 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public abstract class AbstractTestSharedMap { + abstract SharedMap newSharedMapWithInfinitePool(); + + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = newSharedMapWithInfinitePool(); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testBorrowRegisterReturn() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertNull(map.borrowObject("b")); + Assert.assertFalse(map.returnObject("a", o2)); + } + + @Test + public void testMultipleRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testRejectDuplicateRegistration() { + Assert.assertTrue(map.registerObject("a", o1)); + Assert.assertFalse(map.registerObject("a", o1)); + } + + @Test + public void testInvalidateBeforeReturn() { + map.registerObject("a", o1); + + map.invalidateObject("a", o1); + + Assert.assertFalse(map.returnObject("a", o1)); + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testInvalidateAfterReturn() { + map.registerObject("a", o1); + 
Assert.assertTrue(map.returnObject("a", o1)); + + map.invalidateObject("a", o1); + + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testClear() { + map.registerObject("a", o1); + map.registerObject("b", o2); + map.registerObject("c", o3); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("b", o2)); + + Collection targets = map.clear(); + + Assert.assertEquals( + new HashSet(Arrays.asList(o1, o2)), + new HashSet(targets)); + + Assert.assertNull(map.borrowObject("a")); + Assert.assertNull(map.borrowObject("b")); + Assert.assertFalse(map.returnObject("c", o3)); + } + + @Test + public void testClearWithKey() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("b", o3); + map.registerObject("b", o4); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("b", o3)); + + Collection targets = map.clear("a"); + + Assert.assertEquals( + new HashSet(Collections.singleton(o1)), + new HashSet(targets)); + + Assert.assertNull(map.borrowObject("a")); + Assert.assertFalse(map.returnObject("a", o2)); + + Assert.assertSame(o3, map.borrowObject("b")); + Assert.assertTrue(map.returnObject("b", o4)); + } + + @Test + public void testSizeAndEmptyAndGetRegisteredObjects() { + Assert.assertEquals(0, map.size()); + Assert.assertEquals(0, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertTrue(map.isEmpty()); + verifyRegisteredObjects(); + verifyRegisteredObjectsForKey("a"); + verifyRegisteredObjectsForKey("b"); + + map.registerObject("a", o1); + + Assert.assertEquals(1, map.size()); + Assert.assertEquals(1, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertFalse(map.isEmpty()); + verifyRegisteredObjects(o1); + verifyRegisteredObjectsForKey("a", o1); + verifyRegisteredObjectsForKey("b"); + + map.registerObject("a", o2); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(2, map.size("a")); + Assert.assertEquals(0, 
map.size("b")); + Assert.assertFalse(map.isEmpty()); + verifyRegisteredObjects(o1, o2); + verifyRegisteredObjectsForKey("a", o1, o2); + verifyRegisteredObjectsForKey("b"); + + map.registerObject("b", o3); + + Assert.assertEquals(3, map.size()); + Assert.assertEquals(2, map.size("a")); + Assert.assertEquals(1, map.size("b")); + Assert.assertFalse(map.isEmpty()); + verifyRegisteredObjects(o1, o2, o3); + verifyRegisteredObjectsForKey("a", o1, o2); + verifyRegisteredObjectsForKey("b", o3); + + map.clear(); + + Assert.assertEquals(0, map.size()); + Assert.assertEquals(0, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertTrue(map.isEmpty()); + verifyRegisteredObjects(); + verifyRegisteredObjectsForKey("a"); + verifyRegisteredObjectsForKey("b"); + } + + private void verifyRegisteredObjects(Object... expecteds) { + Assert.assertEquals( + new HashSet(Arrays.asList(expecteds)), + new HashSet(map.getRegisteredObjects())); + } + + private void verifyRegisteredObjectsForKey(String key, Object... expecteds) { + Assert.assertEquals( + new HashSet(Arrays.asList(expecteds)), + new HashSet(map.getRegisteredObjects(key))); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java new file mode 100644 index 0000000..3f60678 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestReusableSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java new file mode 100644 index 0000000..99452fe --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestReusableSharedMapWithFinitePool { + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = new ReusableSharedMap(3); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testOverRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + map.registerObject("a", o4); + + Assert.assertNull(map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertFalse(map.returnObject("a", o4)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java new file mode 100644 index 0000000..49140c9 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRoundRobinSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new RoundRobinSharedMap(Integer.MAX_VALUE); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java new file mode 100644 index 0000000..d21b364 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRoundRobinSharedMapWithFinitePool { + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = new RoundRobinSharedMap(3); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testFullRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o3); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } + + @Test + public void testOverRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + map.registerObject("a", o4); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + Assert.assertSame(o4, map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertTrue(map.returnObject("a", o4)); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertFalse(map.returnObject("a", o4)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, 
map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } + + @Test + public void testIdleObjectPriority() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + + map.returnObject("a", o2); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java new file mode 100644 index 0000000..15734b7 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSegmentedSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new SegmentedSharedMap(new SegmentFactory() { + @Override + public SharedMap create() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } + }); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java new file mode 100644 index 0000000..0477c9c --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSegmentedSharedMapForSegments { + @Test + public void testAllSegmentsUsed() { + SegmentedSharedMap map = new SegmentedSharedMap( + new SegmentFactory() { + @Override + public SharedMap create() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } + }, 1000); + + Set> segmentsUsed = new HashSet>(); + + for(int i=0; i<1000000; i++) { + segmentsUsed.add(map.segmentFor(new Object())); + if(segmentsUsed.size() == map.countOfSegments()) { + return; + } + } + + Assert.fail("bug or bad luck"); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java new file mode 100644 index 0000000..cc2b9c5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestThreadLocalSharedMapDecorator { + SharedMap map; + Object o1; + Object o2; + Object o3; + + @Before + public void setUp() { + map = new ThreadLocalSharedMapDecorator( + new RoundRobinSharedMap(Integer.MAX_VALUE)); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + } + + @Test + public void testRegisterAndBorrow() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + } + + @Test + public void testReturnAndBorrow() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + } + + @Test + public void testRejectDuplicateRegistration() { + Assert.assertTrue(map.registerObject("a", o1)); + Assert.assertFalse(map.registerObject("a", o1)); + } + + @Test + public void testInvalidate() 
{ + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + + map.invalidateObject("a", o1); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertFalse(map.returnObject("a", o1)); + } + + @Test + public void testClear() { + map.registerObject("a", o1); + + map.clear(); + + // We can still borrow until we completely return + // or invalidate from inside the thread. + Assert.assertSame(o1, map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertFalse(map.returnObject("a", o1)); // completely returned and rejected + } + + @Test + public void testThreadLocal() throws Exception { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + + final AtomicReference borrowedRef = new AtomicReference(); + + Thread thread = new Thread() { + @Override + public void run() { + borrowedRef.set(map.borrowObject("a")); + } + }; + + thread.start(); + thread.join(); + + Assert.assertSame(o1, borrowedRef.get()); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java index 322ebaa..cade6f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java @@ -19,7 +19,9 @@ 
package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.PoolMap.PoolType; @@ -219,20 +221,26 @@ public class TestHTablePool { @Test public void testTableWithMaxSize() throws Exception { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2, - getPoolType()); + tableFactory, getPoolType()); // Request tables from an empty pool HTableInterface table1 = pool.getTable(TABLENAME); HTableInterface table2 = pool.getTable(TABLENAME); HTableInterface table3 = pool.getTable(TABLENAME); + tableFactory.assertTableCount(3); + // Close tables (returns tables to the pool) table1.close(); table2.close(); // The pool should reject this one since it is already full table3.close(); + tableFactory.assertTableCount(2); + // Request tables of the same name HTableInterface sameTable1 = pool.getTable(TABLENAME); HTableInterface sameTable2 = pool.getTable(TABLENAME); @@ -250,8 +258,10 @@ public class TestHTablePool { @Test public void testCloseTablePool() throws IOException { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4, - getPoolType()); + tableFactory, getPoolType()); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); if (admin.tableExists(TABLENAME)) { @@ -269,19 +279,15 @@ public class TestHTablePool { tables[i] = pool.getTable(TABLENAME); } + tableFactory.assertTableCount(4); + pool.closeTablePool(TABLENAME); for (int i = 0; i < 4; ++i) { tables[i].close(); } - Assert.assertEquals(4, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); - - pool.closeTablePool(TABLENAME); - - Assert.assertEquals(0, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); 
+ tableFactory.assertTableCount(0); } } @@ -294,14 +300,18 @@ public class TestHTablePool { @Test public void testTableWithMaxSize() throws Exception { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2, - getPoolType()); + tableFactory, getPoolType()); // Request tables from an empty pool HTableInterface table1 = pool.getTable(TABLENAME); HTableInterface table2 = pool.getTable(TABLENAME); HTableInterface table3 = pool.getTable(TABLENAME); + tableFactory.assertTableCount(1); + // Close tables (returns tables to the pool) table1.close(); table2.close(); @@ -309,6 +319,8 @@ public class TestHTablePool { // <= 2 table3.close(); + tableFactory.assertTableCount(1); + // Request tables of the same name HTableInterface sameTable1 = pool.getTable(TABLENAME); HTableInterface sameTable2 = pool.getTable(TABLENAME); @@ -326,8 +338,10 @@ public class TestHTablePool { @Test public void testCloseTablePool() throws IOException { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4, - getPoolType()); + tableFactory, getPoolType()); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); if (admin.tableExists(TABLENAME)) { @@ -345,20 +359,39 @@ public class TestHTablePool { tables[i] = pool.getTable(TABLENAME); } + tableFactory.assertTableCount(1); + pool.closeTablePool(TABLENAME); + tableFactory.assertTableCount(1); + for (int i = 0; i < 4; ++i) { tables[i].close(); } - Assert.assertEquals(1, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); - - pool.closeTablePool(TABLENAME); - - Assert.assertEquals(0, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + tableFactory.assertTableCount(0); } } + private static class HTableFactoryWithTableCounter implements HTableInterfaceFactory { + final HTableInterfaceFactory baseTableFactory = new HTableFactory(); + final 
AtomicInteger tableCounter = new AtomicInteger(); + + @Override + public HTableInterface createHTableInterface(Configuration config, byte[] tableName) { + HTableInterface table = baseTableFactory.createHTableInterface(config, tableName); + tableCounter.incrementAndGet(); + return table; + } + + @Override + public void releaseHTableInterface(HTableInterface table) throws IOException { + tableCounter.decrementAndGet(); + baseTableFactory.releaseHTableInterface(table); + } + + void assertTableCount(int count) { + Assert.assertEquals(count, tableCounter.get()); + } + } }