diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
index 3dc5b49..486293b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
@@ -24,8 +24,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -36,37 +36,46 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+import org.apache.hadoop.hbase.util.ReusableSharedMap;
+import org.apache.hadoop.hbase.util.SegmentedSharedMap;
+import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory;
+import org.apache.hadoop.hbase.util.SharedMap;
+import org.apache.hadoop.hbase.util.ThreadLocalSharedMapDecorator;
+
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
/**
- * A simple pool of HTable instances.
- *
- * Each HTablePool acts as a pool for all tables. To use, instantiate an
- * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
- *
- * This method is not needed anymore, clients should call
- * HTableInterface.close() rather than returning the tables to the pool
- *
- * Once you are done with it, close your instance of {@link HTableInterface}
- * by calling {@link HTableInterface#close()} rather than returning the tables
- * to the pool with (deprecated) {@link #putTable(HTableInterface)}.
- *
+ * A simple pool of {@link HTable} instances.
+ * This enables you to reuse the instances and avoid overhead to establish I/O connections
+ * to a cluster (see {@link HConnectionManager}).
+ * {@code HTable} is not thread safe, and this pool exclusively lends you the instances.
+ * {@code HTablePool} is thread safe.
*
- * A pool can be created with a maxSize which defines the most HTable
- * references that will ever be retained for each table. Otherwise the default
- * is {@link Integer#MAX_VALUE}.
- *
+ * To use, instantiate an {@code HTablePool}
+ * and call {@link #getTable(String)} or {@link #getTable(byte[])}
+ * to get an instance of {@link HTableInterface}, which is a wrapper of {@code HTable}.
+ * Once you are done with it, call {@link HTableInterface#close()}
+ * and the instance is returned to the pool when there is room for it.
*
- * Pool will manage its own connections to the cluster. See
- * {@link HConnectionManager}.
+ * The pool holds multiple idle instances (which are not borrowed) for each table.
+ * You can specify the maximum count of idle instances for each table
+ * by the parameter {@code maxSize} in some of the constructors.
+ * The default value is {@link Integer#MAX_VALUE}.
+ * Note that this pool does not limit the total number of instances in existence
+ * (even though doing so might eventually save resources),
+ * and the {@code getTable()} methods always create a new instance
+ * if there is no corresponding idle instance in the pool.
+ * (Remember that {@code HTable} is not thread safe
+ * and you require an instance which you can use exclusively.)
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HTablePool implements Closeable {
- private final PoolMap<String, HTableInterface> tables;
- private final int maxSize;
- private final PoolType poolType;
+ private static final Log log = LogFactory.getLog(HTablePool.class);
+
+ private final SharedMap<String, HTableInterface> tableMap;
private final Configuration config;
private final HTableInterfaceFactory tableFactory;
@@ -144,24 +153,26 @@ public class HTablePool implements Closeable {
// Make a new configuration instance so I can safely cleanup when
// done with the pool.
this.config = config == null ? HBaseConfiguration.create() : config;
- this.maxSize = maxSize;
- this.tableFactory = tableFactory == null ? new HTableFactory()
- : tableFactory;
- if (poolType == null) {
- this.poolType = PoolType.Reusable;
- } else {
- switch (poolType) {
- case Reusable:
- case ThreadLocal:
- this.poolType = poolType;
- break;
- default:
- this.poolType = PoolType.Reusable;
- break;
- }
+ this.tableFactory = tableFactory == null ? new HTableFactory() : tableFactory;
+ this.tableMap = newTableMap(maxSize, poolType);
+ }
+
+ private static SharedMap<String, HTableInterface> newTableMap(
+ final int maxSize, PoolType poolType) {
+
+ SharedMap<String, HTableInterface> tables = new SegmentedSharedMap<String, HTableInterface>(
+ new SegmentFactory<String, HTableInterface>() {
+ @Override
+ public SharedMap<String, HTableInterface> create() {
+ return new ReusableSharedMap<String, HTableInterface>(maxSize);
+ }
+ });
+
+ if (poolType == PoolType.ThreadLocal) {
+ tables = new ThreadLocalSharedMapDecorator<String, HTableInterface>(tables);
}
- this.tables = new PoolMap<String, HTableInterface>(this.poolType,
- this.maxSize);
+
+ return tables;
}
/**
@@ -196,9 +207,10 @@ public class HTablePool implements Closeable {
* if there is a problem instantiating the HTable
*/
private HTableInterface findOrCreateTable(String tableName) {
- HTableInterface table = tables.get(tableName);
+ HTableInterface table = tableMap.borrowObject(tableName);
if (table == null) {
table = createHTable(tableName);
+ tableMap.registerObject(tableName, table);
}
return table;
}
@@ -258,13 +270,11 @@ public class HTablePool implements Closeable {
private void returnTable(HTableInterface table) throws IOException {
// this is the old putTable method renamed and made private
String tableName = Bytes.toString(table.getTableName());
- if (tables.size(tableName) >= maxSize) {
+
+ if (! tableMap.returnObject(tableName, table)) {
// release table instance since we're not reusing it
- this.tables.remove(tableName, table);
- this.tableFactory.releaseHTableInterface(table);
- return;
+ releaseTableQuietly(table);
}
- tables.put(tableName, table);
}
protected HTableInterface createHTable(String tableName) {
@@ -283,13 +293,7 @@ public class HTablePool implements Closeable {
* @param tableName
*/
public void closeTablePool(final String tableName) throws IOException {
- Collection<HTableInterface> tables = this.tables.values(tableName);
- if (tables != null) {
- for (HTableInterface table : tables) {
- this.tableFactory.releaseHTableInterface(table);
- }
- }
- this.tables.remove(tableName);
+ releaseTablesQuietly(tableMap.clear(tableName));
}
/**
@@ -308,14 +312,23 @@ public class HTablePool implements Closeable {
* Note: this is a 'shutdown' of all the table pools.
*/
public void close() throws IOException {
- for (String tableName : tables.keySet()) {
- closeTablePool(tableName);
+ releaseTablesQuietly(tableMap.clear());
+ }
+
+ private void releaseTableQuietly(HTableInterface table) {
+ try {
+ tableFactory.releaseHTableInterface(table);
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Failed to release a table: " + Bytes.toString(table.getTableName()), e);
+ }
}
- this.tables.clear();
}
- int getCurrentPoolSize(String tableName) {
- return tables.size(tableName);
+ private void releaseTablesQuietly(Collection<? extends HTableInterface> tables) {
+ for (HTableInterface table : tables) {
+ releaseTableQuietly(table);
+ }
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
index 33786b7..a50f7f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
@@ -35,14 +35,21 @@ import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
@@ -54,13 +61,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
@@ -72,8 +79,11 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+import org.apache.hadoop.hbase.util.RoundRobinSharedMap;
+import org.apache.hadoop.hbase.util.SegmentedSharedMap;
+import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory;
+import org.apache.hadoop.hbase.util.SharedMap;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
@@ -105,9 +115,10 @@ public class HBaseClient {
public static final Log LOG = LogFactory
.getLog("org.apache.hadoop.ipc.HBaseClient");
- protected final PoolMap<ConnectionId, Connection> connections;
+ protected final SegmentedSharedMap<ConnectionId, Connection> connectionMap;
+ private final ThreadGroup connectionThreadGroup;
- protected int counter; // counter for call ids
+ protected final AtomicInteger counter = new AtomicInteger(); // counter for call ids
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
final protected Configuration conf;
final protected int maxIdleTime; // connections will be culled if it was idle for
@@ -254,54 +265,107 @@ public class HBaseClient {
return refCount==0;
}
- /** A call waiting for a value. */
- protected class Call {
- final int id; // call id
- final RpcRequestBody param; // rpc request object
- Message value; // value, null if error
- IOException error; // exception, null if value
- boolean done; // true when call is done
- long startTime;
-
- protected Call(RpcRequestBody param) {
- this.param = param;
- this.startTime = System.currentTimeMillis();
- synchronized (HBaseClient.this) {
- this.id = counter++;
- }
- }
+ /**
+ * A call waiting for a value.
+ * It is expected that either {@link #setException(IOException)} or {@link #setValue(Message)}
+ * will be called only once.
+ */
+ private interface Call {
+ RpcRequestBody getParam();
- /** Indicate when the call is complete and the
- * value or error are available. Notifies by default. */
- protected synchronized void callComplete() {
- this.done = true;
- notify(); // notify caller
- }
+ long getStartTime();
/** Set the exception when there is an error.
* Notify the caller the call is done.
*
* @param error exception thrown by the call; either local or remote
*/
- public synchronized void setException(IOException error) {
- this.error = error;
- callComplete();
- }
+ void setException(IOException error);
/** Set the return value when there is no error.
* Notify the caller the call is done.
*
* @param value return value of the call.
*/
- public synchronized void setValue(Message value) {
- this.value = value;
- callComplete();
+ void setValue(Message value);
+ }
+
+ private static abstract class AbstractCallDecorator implements Call {
+ final Call base;
+
+ AbstractCallDecorator(Call base) {
+ this.base = base;
+ }
+
+ @Override
+ public RpcRequestBody getParam() {
+ return base.getParam();
+ }
+
+ @Override
+ public long getStartTime() {
+ return base.getStartTime();
+ }
+
+ @Override
+ public void setException(IOException error) {
+ base.setException(error);
+ done();
+ }
+
+ @Override
+ public void setValue(Message value) {
+ base.setValue(value);
+ done();
+ }
+
+ void done() {}
+ }
+
+ private static class SingleCall implements Call {
+ final RpcRequestBody param; // rpc request object
+ final long startTime;
+
+ Message value; // value, null if error
+ IOException error; // exception, null if value
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+
+ SingleCall(RpcRequestBody param, long startTime) {
+ this.param = param;
+ this.startTime = startTime;
+ }
+
+ @Override
+ public RpcRequestBody getParam() {
+ return param;
}
+ @Override
public long getStartTime() {
- return this.startTime;
+ return startTime;
+ }
+
+ @Override
+ public void setException(IOException error) {
+ this.error = error;
+ doneLatch.countDown();
+ }
+
+ @Override
+ public void setValue(Message value) {
+ this.value = value;
+ doneLatch.countDown();
+ }
+
+ Message getValue() throws IOException, InterruptedException {
+ doneLatch.await();
+ if (error != null) {
+ throw error;
+ }
+ return value;
}
}
+
protected static Map<Text, TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<Text, TokenSelector<? extends TokenIdentifier>>();
static {
@@ -317,10 +381,11 @@ public class HBaseClient {
return new Connection(remoteId);
}
- /** Thread that reads responses and notifies callers. Each connection owns a
+ /** Runnable object that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
- protected class Connection extends Thread {
+ protected class Connection implements Runnable {
+ private final String name;
private ConnectionHeader header; // connection header
protected ConnectionId remoteId;
protected Socket socket = null; // connected socket
@@ -401,11 +466,10 @@ public class HBaseClient {
}
this.header = builder.build();
- this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
+ this.name = "IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
- ((ticket==null)?" from an unknown user": (" from "
- + ticket.getUserName())));
- this.setDaemon(true);
+ ((ticket==null)?" from an unknown user": (" from "
+ + ticket.getUserName()));
}
private UserInformation getUserInfoPB(UserGroupInformation ugi) {
@@ -442,22 +506,19 @@ public class HBaseClient {
* It is up to the user code to check this status.
* @param call to add
*/
- protected synchronized void addCall(Call call) {
+ protected synchronized void addCall(int id, Call call) {
// If the connection is about to close, we manage this as if the call was already added
// to the connection calls list. If not, the connection creations are serialized, as
// mentioned in HBASE-6364
if (this.shouldCloseConnection.get()) {
if (this.closeException == null) {
call.setException(new IOException(
- "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
+ "Call " + id + " not added as the connection " + remoteId + " is closing"));
} else {
call.setException(this.closeException);
}
- synchronized (call) {
- call.notifyAll();
- }
} else {
- calls.put(call.id, call);
+ calls.put(id, call);
notify();
}
}
@@ -647,8 +708,8 @@ public class HBaseClient {
@Override
public void run() {
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": starting, having connections "
- + connections.size());
+ LOG.debug(name + ": starting, having connections "
+ + connectionMap.size());
try {
while (waitForWork()) {//wait here for work - read or close connection
@@ -662,8 +723,8 @@ public class HBaseClient {
close();
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": stopped, remaining connections "
- + connections.size());
+ LOG.debug(name + ": stopped, remaining connections "
+ + connectionMap.size());
}
private synchronized void disposeSasl() {
@@ -719,57 +780,72 @@ public class HBaseClient {
final int currRetries,
final int maxRetries, final Exception ex, final Random rand,
final UserGroupInformation user)
- throws IOException, InterruptedException{
- user.doAs(new PrivilegedExceptionAction<Object>() {
- public Object run() throws IOException, InterruptedException {
- closeConnection();
- if (shouldAuthenticateOverKrb()) {
- if (currRetries < maxRetries) {
- LOG.debug("Exception encountered while connecting to " +
- "the server : " + ex);
- //try re-login
- if (UserGroupInformation.isLoginKeytabBased()) {
- UserGroupInformation.getLoginUser().reloginFromKeytab();
- } else {
- UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ throws IOException {
+ try {
+ user.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws IOException {
+ closeConnection();
+ if (shouldAuthenticateOverKrb()) {
+ try {
+ if (currRetries < maxRetries) {
+ LOG.debug("Exception encountered while connecting to " +
+ "the server : " + ex);
+ //try re-login
+ if (UserGroupInformation.isLoginKeytabBased()) {
+ UserGroupInformation.getLoginUser().reloginFromKeytab();
+ } else {
+ UserGroupInformation.getLoginUser().reloginFromTicketCache();
+ }
+ disposeSasl();
+ //have granularity of milliseconds
+ //we are sleeping with the Connection lock held but since this
+ //connection instance is being used for connecting to the server
+ //in question, it is okay
+ Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
+ return null;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- disposeSasl();
- //have granularity of milliseconds
- //we are sleeping with the Connection lock held but since this
- //connection instance is being used for connecting to the server
- //in question, it is okay
- Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
- return null;
- } else {
String msg = "Couldn't setup connection for " +
- UserGroupInformation.getLoginUser().getUserName() +
- " to " + serverPrincipal;
+ UserGroupInformation.getLoginUser().getUserName() +
+ " to " + serverPrincipal;
LOG.warn(msg);
throw (IOException) new IOException(msg).initCause(ex);
+
+ } else {
+ LOG.warn("Exception encountered while connecting to " +
+ "the server : " + ex);
}
- } else {
- LOG.warn("Exception encountered while connecting to " +
- "the server : " + ex);
- }
- if (ex instanceof RemoteException) {
- throw (RemoteException)ex;
- }
- if (ex instanceof SaslException) {
- String msg = "SASL authentication failed." +
- " The most likely cause is missing or invalid credentials." +
- " Consider 'kinit'.";
- LOG.fatal(msg, ex);
- throw new RuntimeException(msg, ex);
+ if (ex instanceof RemoteException) {
+ throw (RemoteException)ex;
+ }
+ if (ex instanceof SaslException) {
+ String msg = "SASL authentication failed." +
+ " The most likely cause is missing or invalid credentials." +
+ " Consider 'kinit'.";
+ LOG.fatal(msg, ex);
+ throw new RuntimeException(msg, ex);
+ }
+ throw new IOException(ex);
}
- throw new IOException(ex);
- }
- });
+ });
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
}
- protected synchronized void setupIOstreams()
- throws IOException, InterruptedException {
- if (socket != null || shouldCloseConnection.get()) {
- return;
+ /**
+ * @return true if ready; false if already closed
+ */
+ protected synchronized boolean setupIOstreams() throws IOException {
+ if (shouldCloseConnection.get()) {
+ return false;
+ }
+
+ if (socket != null) {
+ return true;
}
if (failedServers.isFailedServer(remoteId.getAddress())) {
@@ -842,8 +918,10 @@ public class HBaseClient {
touch();
// start the receiver thread after the socket connection has been set up
- start();
- return;
+ Thread thread = new Thread(connectionThreadGroup, this, name);
+ thread.setDaemon(true);
+ thread.start();
+ return true;
}
} catch (IOException e) {
failedServers.addToFailedServers(remoteId.address);
@@ -883,11 +961,7 @@ public class HBaseClient {
// release the resources
// first thing to do;take the connection out of the connection list
- synchronized (connections) {
- if (connections.get(remoteId) == this) {
- connections.remove(remoteId);
- }
- }
+ connectionMap.invalidateObject(remoteId, this);
// close the streams and therefore the socket
IOUtils.closeStream(out);
@@ -916,23 +990,34 @@ public class HBaseClient {
cleanupCalls();
}
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": closed");
+ LOG.debug(name + ": closed");
+ }
+
+ /**
+ * Prepares to handle a response for the given {@code call} and
+ * sends parameters of the {@code call}.
+ * The response is notified via methods of {@code call}.
+ */
+ void invokeCall(Call call) {
+ int id = counter.getAndIncrement();
+ addCall(id, call);
+ sendParam(id, call.getParam());
}
/* Initiates a call by sending the parameter to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
*/
- protected void sendParam(Call call) {
+ protected void sendParam(int id, RpcRequestBody param) {
if (shouldCloseConnection.get()) {
return;
}
try {
if (LOG.isDebugEnabled())
- LOG.debug(getName() + " sending #" + call.id);
+ LOG.debug(name + " sending #" + id);
RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder();
- headerBuilder.setCallId(call.id);
+ headerBuilder.setCallId(id);
if (Trace.isTracing()) {
Span s = Trace.currentTrace();
@@ -945,16 +1030,16 @@ public class HBaseClient {
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
RpcRequestHeader header = headerBuilder.build();
int serializedHeaderSize = header.getSerializedSize();
- int requestSerializedSize = call.param.getSerializedSize();
+ int requestSerializedSize = param.getSerializedSize();
this.out.writeInt(serializedHeaderSize +
CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) +
requestSerializedSize +
CodedOutputStream.computeRawVarint32Size(requestSerializedSize));
header.writeDelimitedTo(this.out);
- call.param.writeDelimitedTo(this.out);
+ param.writeDelimitedTo(this.out);
this.out.flush();
}
- } catch(IOException e) {
+ } catch (IOException e) {
markClosed(e);
}
}
@@ -983,7 +1068,7 @@ public class HBaseClient {
int id = response.getCallId();
if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got value #" + id);
+ LOG.debug(name + " got value #" + id);
Call call = calls.get(id);
Status status = response.getStatus();
@@ -992,7 +1077,7 @@ public class HBaseClient {
try {
rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
- call.param.getMethodName()));
+ call.getParam().getMethodName()));
} catch (Exception e) {
throw new RuntimeException(e); //local exception
}
@@ -1054,7 +1139,8 @@ public class HBaseClient {
protected void cleanupCalls(long rpcTimeout) {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
while (itor.hasNext()) {
- Call c = itor.next().getValue();
+ Entry<Integer, Call> entry = itor.next();
+ Call c = entry.getValue();
long waitTime = System.currentTimeMillis() - c.getStartTime();
if (waitTime >= rpcTimeout) {
if (this.closeException == null) {
@@ -1064,13 +1150,10 @@ public class HBaseClient {
// over on the server; e.g. I just asked the regionserver to bulk
// open 3k regions or its a big fat multiput into a heavily-loaded
// server (Perhaps this only happens at the extremes?)
- this.closeException = new CallTimeoutException("Call id=" + c.id +
+ this.closeException = new CallTimeoutException("Call id=" + entry.getKey() +
", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
}
c.setException(this.closeException);
- synchronized (c) {
- c.notifyAll();
- }
itor.remove();
} else {
break;
@@ -1105,44 +1188,46 @@ public class HBaseClient {
}
}
- /** Call implementation used for parallel calls. */
- protected class ParallelCall extends Call {
- private final ParallelResults results;
- protected final int index;
+ /** Result collector for parallel calls. */
+ private static class ParallelResults {
+ final Message[] values;
+ final CountDownLatch doneLatch;
- public ParallelCall(RpcRequestBody param, ParallelResults results, int index) {
- super(param);
- this.results = results;
- this.index = index;
+ ParallelResults(int size) {
+ values = new RpcResponseBody[size];
+ doneLatch = new CountDownLatch(size);
}
- /** Deliver result to result collector. */
- @Override
- protected void callComplete() {
- results.callComplete(this);
+ Message[] getValues() throws InterruptedException {
+ doneLatch.await();
+ return values;
}
- }
- /** Result collector for parallel calls. */
- protected static class ParallelResults {
- protected final Message[] values;
- protected int size;
- protected int count;
+ Call getCall(final int index, final RpcRequestBody param, final long startTime) {
+ return new Call() {
+ @Override
+ public RpcRequestBody getParam() {
+ return param;
+ }
- public ParallelResults(int size) {
- this.values = new RpcResponseBody[size];
- this.size = size;
- }
+ @Override
+ public long getStartTime() {
+ return startTime;
+ }
- /*
- * Collect a result.
- */
- synchronized void callComplete(ParallelCall call) {
- // FindBugs IS2_INCONSISTENT_SYNC
- values[call.index] = call.value; // store the value
- count++; // count it
- if (count == size) // if all values are in
- notify(); // then notify waiting caller
+ @Override
+ public void setException(IOException error) {
+ // log errors
+ LOG.info("Exception in parallel calls: " + error.getMessage(), error);
+ doneLatch.countDown();
+ }
+
+ @Override
+ public void setValue(Message value) {
+ values[index] = value;
+ doneLatch.countDown();
+ }
+ };
}
}
@@ -1166,9 +1251,21 @@ public class HBaseClient {
this.conf = conf;
this.socketFactory = factory;
this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
- this.connections = new PoolMap<ConnectionId, Connection>(
- getPoolType(conf), getPoolSize(conf));
+ this.connectionMap = newConnectionMap(conf);
this.failedServers = new FailedServers(conf);
+ this.connectionThreadGroup = new ThreadGroup(
+ "IPC Client (" + socketFactory.hashCode() +") connection");
+ }
+
+ private static SegmentedSharedMap<ConnectionId, Connection> newConnectionMap(Configuration conf) {
+ final int size = getPoolSize(conf);
+ return new SegmentedSharedMap<ConnectionId, Connection>(
+ new SegmentFactory<ConnectionId, Connection>() {
+ @Override
+ public SharedMap<ConnectionId, Connection> create() {
+ return new RoundRobinSharedMap<ConnectionId, Connection>(size);
+ }
+ });
}
/**
@@ -1194,7 +1291,10 @@ public class HBaseClient {
* @param config configuration
* @return either a {@link PoolType#RoundRobin} or
* {@link PoolType#ThreadLocal}
+ * @deprecated The thread-local logic is not appropriate
+ * and never applied to this class anymore.
*/
+ @Deprecated
protected static PoolType getPoolType(Configuration config) {
return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
PoolType.RoundRobin, PoolType.ThreadLocal);
@@ -1231,14 +1331,10 @@ public class HBaseClient {
}
// wake up all connections
- synchronized (connections) {
- for (Connection conn : connections.values()) {
- conn.interrupt();
- }
- }
+ connectionThreadGroup.interrupt();
// wait until all connections are closed
- while (!connections.isEmpty()) {
+ while (!connectionMap.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
@@ -1274,39 +1370,44 @@ public class HBaseClient {
Class<? extends VersionedProtocol> protocol,
User ticket, int rpcTimeout)
throws InterruptedException, IOException {
- Call call = new Call(param);
- Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
- connection.sendParam(call); // send the parameter
- boolean interrupted = false;
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (call) {
- while (!call.done) {
- try {
- call.wait(); // wait for the result
- } catch (InterruptedException ignored) {
- // save the fact that we were interrupted
- interrupted = true;
- }
- }
- if (interrupted) {
- // set the interrupt flag now that we are done waiting
- Thread.currentThread().interrupt();
- }
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
- if (call.error != null) {
- if (call.error instanceof RemoteException) {
- call.error.fillInStackTrace();
- throw call.error;
- }
- // local exception
- throw wrapException(addr, call.error);
+ SingleCall call = new SingleCall(param, System.currentTimeMillis());
+ ConnectionId remoteId = getConnectionId(addr, protocol, ticket, rpcTimeout);
+ invokeCall(remoteId, call);
+ try {
+ return call.getValue();
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ e.fillInStackTrace();
+ throw e;
}
- return call.value;
+ // local exception
+ throw wrapException(addr, e);
}
}
/**
+ * Prepares to handle a response for the given {@code call} and
+ * sends parameters of the {@code call},
+ * for an appropriate connection specified by the given {@code remoteId}.
+ * The response is notified via methods of {@code call}.
+ */
+ private void invokeCall(ConnectionId remoteId, Call call) throws IOException {
+ final Connection connection = getConnection(remoteId);
+
+ connection.invokeCall(new AbstractCallDecorator(call) {
+ @Override
+ void done() {
+ returnConnection(connection);
+ }
+ });
+ }
+
+ /**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
* The new exception provides the stack trace of the place where
@@ -1364,81 +1465,124 @@ public class HBaseClient {
throws IOException, InterruptedException {
if (addresses.length == 0) return new RpcResponseBody[0];
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
ParallelResults results = new ParallelResults(params.length);
- // TODO this synchronization block doesnt make any sense, we should possibly fix it
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (results) {
- for (int i = 0; i < params.length; i++) {
- ParallelCall call = new ParallelCall(params[i], results, i);
- try {
- Connection connection =
- getConnection(addresses[i], protocol, ticket, 0, call);
- connection.sendParam(call); // send each parameter
- } catch (IOException e) {
- // log errors
- LOG.info("Calling "+addresses[i]+" caught: " +
- e.getMessage(),e);
- results.size--; // wait for one fewer result
- }
- }
- while (results.count != results.size) {
- try {
- results.wait(); // wait for all results
- } catch (InterruptedException ignored) {}
+ long currentTime = System.currentTimeMillis();
+
+ Map<InetSocketAddress, List<Call>> callMap =
+ new LinkedHashMap<InetSocketAddress, List<Call>>();
+ for (int i = 0; i < params.length; i++) {
+ Call call = results.getCall(i, params[i], currentTime);
+
+ List<Call> calls = callMap.get(addresses[i]);
+ if (calls == null) {
+ calls = new ArrayList<Call>();
+ callMap.put(addresses[i], calls);
}
+ calls.add(call);
+ }
- return results.values;
+ for (Entry<InetSocketAddress, List<Call>> entry : callMap.entrySet()) {
+ ConnectionId remoteId = getConnectionId(entry.getKey(), protocol, ticket, 0);
+ invokeCalls(remoteId, entry.getValue());
+ }
+
+ return results.getValues();
+ }
+
+ /**
+ * Prepares to handle a response and sends parameters for each of the given {@code calls},
+ * for an appropriate connection specified by the given {@code remoteId}.
+ * Each response is notified via methods of each of {@code calls}.
+ * This method has better performance than
+ * calling repeatedly {@link #invokeCall(ConnectionId, Call)}.
+ */
+ private void invokeCalls(final ConnectionId remoteId, Collection<? extends Call> calls)
+ throws IOException {
+ final Connection connection = getConnection(remoteId);
+
+ final AtomicInteger countDown = new AtomicInteger(calls.size());
+
+ for (Call call : calls) {
+ connection.invokeCall(new AbstractCallDecorator(call) {
+ @Override
+ void done() {
+ if (countDown.decrementAndGet() == 0) {
+ returnConnection(connection);
+ }
+ }
+ });
}
}
/* Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. */
- protected Connection getConnection(InetSocketAddress addr,
- Class<? extends VersionedProtocol> protocol,
- User ticket,
- int rpcTimeout,
- Call call)
- throws IOException, InterruptedException {
+ private Connection getConnection(ConnectionId remoteId) throws IOException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
}
- Connection connection;
- /* we could avoid this allocation for each RPC by having a
- * connectionsId object and with set() method. We need to manage the
- * refs for keys in HashMap properly. For now its ok.
- */
- ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
- synchronized (connections) {
- connection = connections.get(remoteId);
- if (connection == null) {
- connection = createConnection(remoteId);
- connections.put(remoteId, connection);
+
+ final SharedMap<ConnectionId, Connection> segment = this.connectionMap.segmentFor(remoteId);
+
+ while (true) {
+ Connection connection;
+ synchronized (segment) {
+ // This synchronization guards against creating excessive instances of Connection.
+ // Note that the methods of the segment are guarded by the segment itself,
+ // according to its actual implementation class.
+ // This synchronization blocks threads which use the same segment,
+ // and you should not execute long processing for specified remoteId
+ // in the synchronization.
+
+ connection = segment.borrowObject(remoteId);
+ if (connection == null) {
+ connection = createConnection(remoteId);
+ segment.registerObject(remoteId, connection);
+ }
+ }
+
+ if (connection.setupIOstreams()) {
+ // Connection.setupIOstreams() establishes an actual network connection,
+ // which is relatively long processing for the specified remoteId.
+ // We execute the processing here separately from creating the instance of Connection,
+ // because the creation is executed under the synchronization of the segment,
+ // which blocks threads which use the same segment.
+
+ return connection;
}
+
+ segment.invalidateObject(remoteId, connection);
}
- connection.addCall(call);
-
- //we don't invoke the method below inside "synchronized (connections)"
- //block above. The reason for that is if the server happens to be slow,
- //it will take longer to establish a connection and that will slow the
- //entire system down.
- //Moreover, if the connection is currently created, there will be many threads
- // waiting here; as setupIOstreams is synchronized. If the connection fails with a
- // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
- connection.setupIOstreams();
- return connection;
+ }
+
+ private void returnConnection(Connection connection) {
+ connectionMap.returnObject(connection.remoteId, connection);
+ }
+
+ /* we could avoid this allocation for each RPC by having a
+ * connectionsId object and with set() method. We need to manage the
+ * refs for keys in HashMap properly. For now its ok.
+ */
+ private static ConnectionId getConnectionId(InetSocketAddress address,
+ Class<? extends VersionedProtocol> protocol,
+ User ticket,
+ int rpcTimeout) {
+ return new ConnectionId(address, protocol, ticket, rpcTimeout);
}
/**
* This class holds the address and the user ticket. The client connections
* to servers are uniquely identified by
*/
- protected static class ConnectionId {
+ private final static class ConnectionId {
final InetSocketAddress address;
final User ticket;
final int rpcTimeout;
- Class<? extends VersionedProtocol> protocol;
- private static final int PRIME = 16777619;
+ final Class<? extends VersionedProtocol> protocol;
ConnectionId(InetSocketAddress address,
Class<? extends VersionedProtocol> protocol,
@@ -1464,20 +1608,30 @@ public class HBaseClient {
@Override
public boolean equals(Object obj) {
- if (obj instanceof ConnectionId) {
- ConnectionId id = (ConnectionId) obj;
- return address.equals(id.address) && protocol == id.protocol &&
- ((ticket != null && ticket.equals(id.ticket)) ||
- (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout;
- }
- return false;
+ if(obj == this) { return true; }
+ if(! (obj instanceof ConnectionId)) { return false; }
+ ConnectionId other = (ConnectionId) obj;
+
+ return address.equals(other.address) &&
+ protocol == other.protocol &&
+ (ticket == other.ticket || ticket != null && ticket.equals(other.ticket)) &&
+ rpcTimeout == other.rpcTimeout;
}
- @Override // simply use the default Object#hashcode() ?
+ int hashCode;
+
+ @Override
public int hashCode() {
- return (address.hashCode() + PRIME * (
- PRIME * System.identityHashCode(protocol) ^
- (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
+ int hashCode = this.hashCode;
+ if(hashCode == 0) {
+ this.hashCode = hashCode = Arrays.hashCode(new int[] {
+ address.hashCode(),
+ System.identityHashCode(protocol),
+ (ticket == null ? 0 : ticket.hashCode()),
+ rpcTimeout
+ });
+ }
+ return hashCode;
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java
new file mode 100644
index 0000000..b9e1648
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Skeleton implementation of {@code SharedMap}.
+ *
+ * All of the public methods are synchronized on this instance, except for argument checks.
+ * Objects to be shared are identified with their equality, instead of sameness.
+ *
+ * @param <K> the type of keys that you associate objects to be shared with
+ * @param <V> the type of objects to be shared
+ */
+abstract class AbstractSharedMap<K, V> implements SharedMap<K, V> {
+  /** One pool of shared objects per key. Guarded by this. */
+  private final Map<K, Pool<V>> poolMap = new HashMap<K, Pool<V>>();
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public V borrowObject(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool<V> pool = poolMap.get(key);
+      if (pool == null) {
+        return null;
+      }
+      return pool.borrowObject();
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean registerObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool<V> pool = poolMap.get(key);
+      if (pool == null) {
+        pool = newPool();
+        poolMap.put(key, pool);
+      }
+      return pool.registerObject(value);
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean returnObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool<V> pool = poolMap.get(key);
+      if (pool == null) {
+        return false;
+      }
+      return pool.returnObject(value);
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public void invalidateObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool<V> pool = poolMap.get(key);
+      if (pool == null) {
+        return;
+      }
+
+      pool.invalidateObject(value);
+
+      if (pool.isEmpty()) {
+        // Remove the mapping by its key. Removing by the pool instance
+        // would never match an entry (the map is keyed by K) and would
+        // leak empty pools.
+        poolMap.remove(key);
+      }
+    }
+  }
+
+  @Override
+  public synchronized Collection<V> clear() {
+    Collection<V> idleObjects = new ArrayList<V>();
+
+    for (Pool<V> pool : poolMap.values()) {
+      idleObjects.addAll(pool.clear());
+    }
+
+    poolMap.clear();
+
+    return idleObjects;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public Collection<V> clear(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      // Drop the mapping entirely; the pool's registrations are invalidated.
+      Pool<V> pool = poolMap.remove(key);
+      if (pool == null) {
+        // No registrations for this key; nothing to invalidate.
+        return new ArrayList<V>();
+      }
+      return pool.clear();
+    }
+  }
+
+  @Override
+  public synchronized int size() {
+    int size = 0;
+    for (Pool<V> pool : poolMap.values()) {
+      size += pool.size();
+    }
+    return size;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public int size(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool<V> pool = poolMap.get(key);
+      return pool == null ? 0 : pool.size();
+    }
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    for (Pool<V> pool : poolMap.values()) {
+      if (! pool.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Creates an empty pool to hold the shared objects for a single key. */
+  abstract Pool<V> newPool();
+
+  interface Pool<V> {
+    /**
+     * Returns one of the shared objects,
+     * or returns null if there is no appropriate object to borrow.
+     */
+    V borrowObject();
+
+    /**
+     * Registers the given {@code value} to this instance.
+     * At this point the registered object is regarded as borrowed.
+     *
+     * This method does nothing and returns false if and only if
+     * the given {@code value} is already registered.
+     * In practice the {@code value} is expected to be a newly created object,
+     * and this method is expected to return true.
+     */
+    boolean registerObject(V value);
+
+    /**
+     * Makes the given {@code value} to be returned into this instance.
+     * It is expected that the {@code value} has been borrowed from this instance.
+     *
+     * If this method returns false,
+     * then the registration of the {@code value} is invalidated.
+     * This happens because the pool is full when this method is called,
+     * or you have explicitly called {@link #invalidateObject(Object)} or {@link #clear()}.
+     */
+    boolean returnObject(V value);
+
+    /**
+     * Invalidates the registration of the given {@code value}.
+     * You may call this method regardless of whether the {@code value} is borrowed or not.
+     */
+    void invalidateObject(V value);
+
+    /**
+     * Invalidates all of the registrations to this instance,
+     * and returns the cleared idle objects in the pool.
+     */
+    Collection<V> clear();
+
+    /**
+     * Returns the count of the registered objects.
+     */
+    int size();
+
+    /**
+     * Returns true if there is no registration.
+     */
+    boolean isEmpty();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
index 364be66..54445a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java
@@ -61,10 +61,18 @@ public class PoolMap implements Map {
private Map<K, Pool<V>> pools = new ConcurrentHashMap<K, Pool<V>>();
+ /**
+ * @deprecated Use {@link SharedMap} and its implementation classes.
+ */
+ @Deprecated
public PoolMap(PoolType poolType) {
this.poolType = poolType;
}
+ /**
+ * @deprecated Use {@link SharedMap} and its implementation classes.
+ */
+ @Deprecated
public PoolMap(PoolType poolType, int poolMaxSize) {
this.poolType = poolType;
this.poolMaxSize = poolMaxSize;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java
new file mode 100644
index 0000000..443511f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementation of {@code SharedMap},
+ * ensuring that it lends a shared object to a single caller at the same time.
+ *
+ * All of the public methods are synchronized with this, except for argument checks.
+ * Objects to be shared are identified with their equality, instead of sameness.
+ *
+ * @param the type of keys that you associate objects to be shared with
+ * @param the type of objects to be shared
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ReusableSharedMap extends AbstractSharedMap {
+ private final int maxObjectCountPerKey;
+
+ /**
+ * Constructs with the given {@code maxObjectCountPerKey},
+ * which is a max count of idle objects to hold for each key.
+ */
+ public ReusableSharedMap(int maxObjectCountPerKey) {
+ this.maxObjectCountPerKey = maxObjectCountPerKey;
+ }
+
+ @Override
+ Pool newPool() {
+ return new ReusablePool();
+ }
+
+ private class ReusablePool implements Pool {
+ // The boolean values represent busy (borrowing) or not.
+ // This is for performance instead of sequential search in idleQueue.
+ final Map registeredObjects = new HashMap();
+ final Queue idleQueue = new ArrayDeque();
+
+ @Override
+ public V borrowObject() {
+ V value = idleQueue.poll();
+ if (value == null) {
+ return null;
+ }
+
+ registeredObjects.put(value, Boolean.TRUE);
+ return value;
+ }
+
+ @Override
+ public boolean registerObject(V value) {
+ if (registeredObjects.containsKey(value)) {
+ return false;
+ }
+ registeredObjects.put(value, Boolean.TRUE);
+ return true;
+ }
+
+ @Override
+ public boolean returnObject(V value) {
+ Boolean busy = registeredObjects.get(value);
+ if (busy == null) {
+ return false;
+ }
+
+ if (! busy) {
+ return true;
+ }
+
+ if (idleQueue.size() >= maxObjectCountPerKey) {
+ registeredObjects.remove(value);
+ return false;
+ }
+
+ registeredObjects.put(value, Boolean.FALSE);
+ idleQueue.offer(value);
+ return true;
+ }
+
+ @Override
+ public void invalidateObject(V value) {
+ registeredObjects.remove(value);
+ idleQueue.remove(value);
+ }
+
+ @Override
+ public Collection clear() {
+ Collection idleObjects = new ArrayList(idleQueue);
+
+ registeredObjects.clear();
+ idleQueue.clear();
+
+ return idleObjects;
+ }
+
+ @Override
+ public int size() {
+ return registeredObjects.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return registeredObjects.isEmpty();
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java
new file mode 100644
index 0000000..649e312
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementation of {@code SharedMap} based on the round robin logic,
+ * except that it lends an idle object whenever there is one.
+ * Objects to be shared must be thread safe,
+ * because the round robin logic will lend the same shared object
+ * to different callers at the same time.
+ *
+ * The round robin logic restricts the count of shared objects
+ * and prevents unlimitedly consuming resources.
+ * For each key it starts to re-lend shared objects cyclically
+ * when the count of borrowed objects exceeds the user specified threshold.
+ *
+ * All of the public methods are synchronized with this, except for argument checks.
+ * Objects to be shared are identified with their equality, instead of sameness.
+ *
+ * @param <K> the type of keys that you associate objects to be shared with
+ * @param <V> the type of objects to be shared
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RoundRobinSharedMap<K, V> extends AbstractSharedMap<K, V> {
+  /** the max count of shared objects for each key, which count is positive */
+  private final int maxObjectCountPerKey;
+
+  /**
+   * Constructs with the given {@code maxObjectCountPerKey},
+   * which is a threshold count of borrowed objects for each key
+   * before starting to lend the same shared object to different callers,
+   * and which is also a max count of idle objects to hold for each key
+   * though the count of registered objects may exceed the max count.
+   *
+   * The given {@code maxObjectCountPerKey} is expected to be positive,
+   * otherwise the max count is set to be 1 instead of the given value.
+   */
+  public RoundRobinSharedMap(int maxObjectCountPerKey) {
+    this.maxObjectCountPerKey = Math.max(1, maxObjectCountPerKey);
+  }
+
+  @Override
+  Pool<V> newPool() {
+    return new RoundRobinPool();
+  }
+
+  private class RoundRobinPool implements Pool<V> {
+    // The integer values represent borrowing counts.
+    final Map<V, Integer> registeredObjects = new HashMap<V, Integer>();
+    final Queue<V> idleQueue = new ArrayDeque<V>();
+    final Queue<V> busyQueue = new ArrayDeque<V>();
+
+    @Override
+    public V borrowObject() {
+      V value = idleQueue.poll();
+      if (value != null) {
+        registeredObjects.put(value, 1);
+        busyQueue.offer(value);
+        return value;
+      }
+
+      if (busyQueue.size() < maxObjectCountPerKey) {
+        // Below the threshold: ask the caller to create a new object.
+        return null;
+      }
+
+      // Now we are ready to pick up and lend a busy object in busyQueue,
+      // which is not empty because busyQueue.size() >= maxObjectCountPerKey > 0.
+
+      value = busyQueue.poll();
+      assert value != null;
+      registeredObjects.put(value, registeredObjects.get(value) + 1);
+      busyQueue.offer(value);
+      return value;
+    }
+
+    @Override
+    public boolean registerObject(V value) {
+      if (registeredObjects.containsKey(value)) {
+        return false;
+      }
+      registeredObjects.put(value, 1);
+      busyQueue.offer(value);
+      return true;
+    }
+
+    @Override
+    public boolean returnObject(V value) {
+      Integer busyCountWrapper = registeredObjects.get(value);
+      if (busyCountWrapper == null) {
+        // The registration has already been invalidated.
+        return false;
+      }
+
+      int busyCount = busyCountWrapper;
+      if (busyCount == 0) {
+        // The object is already idle; returning it again is a no-op.
+        return true;
+      }
+
+      int nextBusyCount = busyCount - 1;
+      if (nextBusyCount != 0) {
+        // Still lent to other callers.
+        registeredObjects.put(value, nextBusyCount);
+        return true;
+      }
+
+      busyQueue.remove(value);
+
+      if (idleQueue.size() >= maxObjectCountPerKey) {
+        // The idle pool is full; reject and invalidate the registration.
+        registeredObjects.remove(value);
+        return false;
+
+      } else {
+        registeredObjects.put(value, 0);
+        idleQueue.offer(value);
+        return true;
+      }
+    }
+
+    @Override
+    public void invalidateObject(V value) {
+      registeredObjects.remove(value);
+      idleQueue.remove(value);
+      busyQueue.remove(value);
+    }
+
+    @Override
+    public Collection<V> clear() {
+      Collection<V> idleObjects = new ArrayList<V>(idleQueue);
+
+      registeredObjects.clear();
+      idleQueue.clear();
+      busyQueue.clear();
+
+      return idleObjects;
+    }
+
+    @Override
+    public int size() {
+      return registeredObjects.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return registeredObjects.isEmpty();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java
new file mode 100644
index 0000000..145c833
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementation of {@code SharedMap},
+ * which delegates operations to segmented maps according to keys,
+ * in order to reduce contention between threads.
+ *
+ * Thread safe.
+ * Objects to be shared are identified with their equality, instead of sameness.
+ *
+ * @param <K> the type of keys that you associate objects to be shared with
+ * @param <V> the type of objects to be shared
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class SegmentedSharedMap<K, V> implements SharedMap<K, V> {
+  /**
+   * Factory to create a map which will be used as one of segmented maps.
+   *
+   * @param <K> the type of keys that you associate objects to be shared with
+   * @param <V> the type of objects to be shared
+   */
+  public interface SegmentFactory<K, V> {
+    SharedMap<K, V> create();
+  }
+
+  private static final int MAX_SEGMENTS = 1 << 16;
+
+  private final SharedMap<K, V>[] segments;
+  private final int segmentMask;
+  private final int segmentShift;
+
+  /**
+   * Constructs with the default concurrency level (16).
+   *
+   * @throws NullPointerException if {@code segmentFactory} is null
+   */
+  public SegmentedSharedMap(SegmentFactory<K, V> segmentFactory) {
+    this(segmentFactory, 16);
+  }
+
+  /**
+   * Constructs with the given {@code concurrencyLevel},
+   * which is the estimated number of concurrently accessing threads.
+   *
+   * @throws NullPointerException if {@code segmentFactory} is null
+   */
+  public SegmentedSharedMap(SegmentFactory<K, V> segmentFactory, int concurrencyLevel) {
+    if (segmentFactory == null) {
+      throw new NullPointerException();
+    }
+
+    int requestedSegmentSize = Math.min(concurrencyLevel, MAX_SEGMENTS);
+
+    // Round the segment count up to a power of two so that
+    // (hash >>> segmentShift) & segmentMask selects a segment uniformly.
+    int segmentShift = 32;
+    int segmentSize = 1;
+    while (segmentSize < requestedSegmentSize) {
+      segmentShift--;
+      segmentSize <<= 1;
+    }
+    this.segmentShift = segmentShift;
+    this.segmentMask = segmentSize - 1;
+
+    @SuppressWarnings("unchecked")
+    SharedMap<K, V>[] segments = new SharedMap[segmentSize];
+    for (int i = 0; i < segmentSize; i++) {
+      segments[i] = segmentFactory.create();
+    }
+    this.segments = segments;
+  }
+
+  /**
+   * Applies a supplemental hash function to a given hashCode,
+   * to defend against poor quality hash functions.
+   *
+   * @see ConcurrentHashMap
+   */
+  private static int hash(int h) {
+    h += (h << 15) ^ 0xffffcd7d;
+    h ^= (h >>> 10);
+    h += (h << 3);
+    h ^= (h >>> 6);
+    h += (h << 2) + (h << 14);
+    return h ^ (h >>> 16);
+  }
+
+  /**
+   * Returns the corresponding internal segment for the given {@code key}.
+   *
+   * @throws NullPointerException if {@code key} is null
+   */
+  public SharedMap<K, V> segmentFor(K key) {
+    int hash = hash(key.hashCode());
+    return segments[(hash >>> segmentShift) & segmentMask];
+  }
+
+  /**
+   * Returns the count of the segments used internally.
+   */
+  int countOfSegments() {
+    return segments.length;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public V borrowObject(K key) {
+    return segmentFor(key).borrowObject(key);
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean registerObject(K key, V value) {
+    return segmentFor(key).registerObject(key, value);
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean returnObject(K key, V value) {
+    return segmentFor(key).returnObject(key, value);
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public void invalidateObject(K key, V value) {
+    segmentFor(key).invalidateObject(key, value);
+  }
+
+  @Override
+  public Collection<V> clear() {
+    Collection<V> result = new ArrayList<V>();
+    for (SharedMap<K, V> segment : segments) {
+      result.addAll(segment.clear());
+    }
+    return result;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public Collection<V> clear(K key) {
+    return segmentFor(key).clear(key);
+  }
+
+  @Override
+  public int size() {
+    int result = 0;
+    for (SharedMap<K, V> segment : segments) {
+      result += segment.size();
+    }
+    return result;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public int size(K key) {
+    return segmentFor(key).size(key);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    for (SharedMap<K, V> segment : segments) {
+      if (! segment.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java
new file mode 100644
index 0000000..aa876fa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This provides functions to share objects.
+ *
+ * In order to share objects, first of all call {@link #borrowObject(Object)}.
+ * If the method returns a non-null object, use it.
+ * If the method returns null, create a new object and call
+ * {@link #registerObject(Object, Object)}, and use it.
+ * Just after the registration the object is regarded as borrowed.
+ *
+ * After using the borrowed object,
+ * call {@link #returnObject(Object, Object)} to return the object to the pool,
+ * or call {@link #invalidateObject(Object, Object)} to invalidate the object's registration
+ * instead of returning the object.
+ * If the pool is already full, the method {@code returnObject()} returns false
+ * with invalidating the registration of the object.
+ * Also the method returns false if the object's registration is already invalidated
+ * because {@link #invalidateObject(Object, Object)} or {@link #clear()} has been called.
+ * Anyway if the method {@code returnObject()} returns false,
+ * the object is rejected from returning to the pool and is already invalidated,
+ * and you might tear down the rejected object.
+ * You can safely tear down the rejected object even in multi-thread contexts,
+ * because the object is never passed to the other threads via the pool.
+ *
+ * Incidentally, you can call {@link #invalidateObject(Object, Object)} and {@link #clear()}
+ * in order to invalidate the registrations at any time, by any thread,
+ * regardless of whether the objects are borrowed or not.
+ * The method {@code clear()} returns a collection of the idle objects,
+ * which have been registered but not borrowed and idle in the pool,
+ * and you can safely tear them down in the sense that
+ * you get them exclusively in multi-thread contexts.
+ * The rest of the objects that have been registered are borrowed and in use
+ * by some other threads at the moment, and you can't tear them down right now.
+ * But the borrower eventually calls {@code returnObject()} (which returns false)
+ * or {@code invalidateObject()} after using the borrowed object, and in either method
+ * the borrower knows that the registration of the object is already invalidated
+ * and the object is ready to tear down.
+ *
+ * The implementations of this interface are expected to be thread safe,
+ * but it depends on the implementations whether it lends the same shared object
+ * to different callers at the same time.
+ *
+ * Objects to be shared are identified with their equality, instead of sameness.
+ * Concretely, when we have two objects {@code o1} and {@code o2} to be shared,
+ * we won't distinguish between {@code o1} and {@code o2}
+ * if {@code o1.equals(o2)} returns true rather than if {@code o1 == o2} is true.
+ * That enables us to use ordinary collections for the objects
+ * and simplify the implementation logic of this interface.
+ * Also that is enough for practical uses.
+ *
+ * @param <K> the type of keys that you associate objects to be shared with
+ * @param <V> the type of objects to be shared
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface SharedMap<K, V> {
+  /**
+   * Returns one of the shared objects associated with the given {@code key},
+   * or returns null if there is no appropriate object to borrow.
+   * In the latter case, it is expected that a new instance is created
+   * and registered to this instance.
+   *
+   * @throws NullPointerException if {@code key} is null
+   */
+  V borrowObject(K key);
+
+  /**
+   * Registers the given {@code value} to this instance,
+   * associating it with the given {@code key}.
+   * At this point the registered object is regarded as borrowed
+   * and you can continue to use the object,
+   * and after finishing using it you have to return or invalidate
+   * the registered object into this instance.
+   *
+   * This method does nothing and returns false if and only if
+   * the given {@code value} is already registered
+   * associated with the given {@code key}.
+   * In practice the {@code value} is expected to be a newly created object,
+   * and this method is expected to return true.
+   *
+   * @throws NullPointerException if {@code key} or {@code value} is null
+   */
+  boolean registerObject(K key, V value);
+
+  /**
+   * Makes the given {@code value} associated with the given {@code key}
+   * to be returned into this instance.
+   * It is expected that the {@code value} has been borrowed from this instance.
+   *
+   * If this method returns false, then the registration of the {@code value} is invalidated.
+   * This happens because the pool is full when this method is called,
+   * or you have explicitly called {@link #invalidateObject(Object, Object)}
+   * or {@link #clear()}.
+   * You might have to tear down the rejected object.
+   *
+   * @throws NullPointerException if {@code key} or {@code value} is null
+   */
+  boolean returnObject(K key, V value);
+
+  /**
+   * Invalidates the registration of the given {@code value} associated with the {@code key}.
+   * You may call this method regardless of whether the {@code value} is borrowed or not.
+   * You might have to tear down the removed object.
+   *
+   * @throws NullPointerException if {@code key} or {@code value} is null
+   */
+  void invalidateObject(K key, V value);
+
+  /**
+   * Invalidates all of the registrations to this instance,
+   * and returns the objects that have been registered but idle in the pool.
+   * You might have to tear down the returned objects.
+   *
+   * The rest of the objects that have been registered are still in use at this moment,
+   * and they are not contained in the collection returned by this method.
+   * In fact, you should not tear them down until the borrowers complete the usage.
+   * Instead, because the borrowers are supposed to call {@link #returnObject(Object, Object)}
+   * (which rejects invalidated objects and returns false)
+   * or {@link #invalidateObject(Object, Object)} after their usage,
+   * you can make the borrowers themselves tear down the invalidated objects
+   * just after calling these methods.
+   */
+  Collection<V> clear();
+
+  /**
+   * Invalidates all of the registrations to this instance associated with the given {@code key},
+   * and returns the objects that have been registered but idle in the pool.
+   * You might have to tear down the returned objects.
+   *
+   * The rest of the objects that have been registered are still in use at this moment,
+   * and they are not contained in the collection returned by this method.
+   * In fact, you should not tear them down until the borrowers complete the usage.
+   * Instead, because the borrowers are supposed to call {@link #returnObject(Object, Object)}
+   * (which rejects invalidated objects and returns false)
+   * or {@link #invalidateObject(Object, Object)} after their usage,
+   * you can make the borrowers themselves tear down the invalidated objects
+   * just after calling these methods.
+   *
+   * @throws NullPointerException if {@code key} is null
+   */
+  Collection<V> clear(K key);
+
+  /**
+   * Returns the count of the registered objects.
+   */
+  int size();
+
+  /**
+   * Returns the count of the registered objects associated with the given {@code key}.
+   *
+   * @throws NullPointerException if {@code key} is null
+   */
+  int size(K key);
+
+  /**
+   * Returns true if there is no registration.
+   */
+  boolean isEmpty();
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java
new file mode 100644
index 0000000..4ca9f5c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Decorator of {@code SharedMap}
+ * which gives the same shared object for each key at the same time for each thread.
+ * This lends the same object even after the other threads invalidate the object,
+ * until the borrowing thread completely returns the object or invalidates it.
+ *
+ * Note that it depends on the base instance of {@code SharedMap}
+ * whether each shared object is accessed by at most one thread at the same time or not.
+ *
+ * Thread safe.
+ * Objects to be shared are identified with their equality, instead of sameness.
+ *
+ * @param <K> the type of keys that you associate objects to be shared with
+ * @param <V> the type of objects to be shared
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ThreadLocalSharedMapDecorator<K, V> implements SharedMap<K, V> {
+ private static class BorrowingCounter<V> {
+ final V value;
+
+ int count = 1;
+
+ BorrowingCounter(V value) {
+ assert value != null;
+ this.value = value;
+ }
+ }
+
+ private final ThreadLocal<Map<K, BorrowingCounter<V>>> borrowingCounterMapRef
+ = new ThreadLocal<Map<K, BorrowingCounter<V>>>() {
+ @Override
+ protected Map<K, BorrowingCounter<V>> initialValue() {
+ return new HashMap<K, BorrowingCounter<V>>();
+ }
+ };
+
+ private final SharedMap<K, V> base;
+
+ /**
+ * @throws NullPointerException if {@code base} is null
+ */
+ public ThreadLocalSharedMapDecorator(SharedMap<K, V> base) {
+ if (base == null) { throw new NullPointerException(); }
+ this.base = base;
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ @Override
+ public V borrowObject(K key) {
+ if (key == null) { throw new NullPointerException(); }
+
+ Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+
+ BorrowingCounter<V> counter = counterMap.get(key);
+ if (counter == null) {
+ V value = base.borrowObject(key);
+ if (value != null) {
+ counterMap.put(key, new BorrowingCounter<V>(value));
+ }
+ return value;
+ }
+
+ counter.count++;
+ return counter.value;
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ @Override
+ public boolean registerObject(K key, V value) {
+ if (! base.registerObject(key, value)) {
+ return false;
+ }
+
+ Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+ BorrowingCounter<V> counter = counterMap.get(key);
+ if (counter == null) {
+ counterMap.put(key, new BorrowingCounter<V>(value));
+ }
+ return true;
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ @Override
+ public boolean returnObject(K key, V value) {
+ if (key == null) { throw new NullPointerException("key"); }
+ if (value == null) { throw new NullPointerException("value"); }
+
+ Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+
+ BorrowingCounter<V> counter = counterMap.get(key);
+ if (counter == null || ! counter.value.equals(value)) {
+ return base.returnObject(key, value);
+ }
+
+ if (--counter.count > 0) {
+ return true;
+ }
+
+ counterMap.remove(key);
+ return base.returnObject(key, value);
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ @Override
+ public void invalidateObject(K key, V value) {
+ base.invalidateObject(key, value);
+
+ Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+ BorrowingCounter<V> counter = counterMap.get(key);
+ if (counter == null || ! counter.value.equals(value)) {
+ return;
+ }
+ counterMap.remove(key);
+ }
+
+ @Override
+ public Collection<V> clear() {
+ return base.clear();
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ @Override
+ public Collection<V> clear(K key) {
+ return base.clear(key);
+ }
+
+ @Override
+ public int size() {
+ return base.size();
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ @Override
+ public int size(K key) {
+ return base.size(key);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return base.isEmpty();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 8e48cb5..15cdfcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -38,12 +38,13 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -4461,125 +4462,76 @@ public class TestFromClientSide {
}
@Test
- public void testClientPoolRoundRobin() throws IOException {
- final byte[] tableName = Bytes.toBytes("testClientPoolRoundRobin");
+ public void testClientPool() throws Exception {
+ final byte[] tableName = Bytes.toBytes("testClientPool");
+ final int poolSize = 3;
+ final int concurrentLevel = poolSize * 2;
+ final int numVersions = 10;
- int poolSize = 3;
- int numVersions = poolSize * 2;
- Configuration conf = TEST_UTIL.getConfiguration();
+ final Configuration conf = TEST_UTIL.getConfiguration();
conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "round-robin");
conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize);
- HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY },
- conf, Integer.MAX_VALUE);
- table.setAutoFlush(true);
+ ExecutorService service = Executors.newCachedThreadPool();
+ List<Future<Void>> futures = new ArrayList<Future<Void>>();
- final long ts = EnvironmentEdgeManager.currentTimeMillis();
- Get get = new Get(ROW);
- get.addColumn(FAMILY, QUALIFIER);
- get.setMaxVersions();
+ final CyclicBarrier readyBarrier = new CyclicBarrier(concurrentLevel);
- for (int versions = 1; versions <= numVersions; versions++) {
- Put put = new Put(ROW);
- put.add(FAMILY, QUALIFIER, ts + versions, VALUE);
- table.put(put);
+ for (int id = 0; id < concurrentLevel; id++) {
+ final byte[] qualifier = Bytes.toBytes("testQualifier" + id);
+ futures.add(service.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ readyBarrier.await();
+ verifyClientPool(conf, tableName, qualifier, numVersions);
+ return null;
+ }
+ }));
+ }
- Result result = table.get(get);
- NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY)
- .get(QUALIFIER);
+ service.shutdown();
+ if (! service.awaitTermination(60, TimeUnit.SECONDS)) {
+ service.shutdownNow();
+ fail("60sec elapsed before termination");
+ }
- assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER
- + " did not match " + versions, versions, navigableMap.size());
- for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
- assertTrue("The value at time " + entry.getKey()
- + " did not match what was put",
- Bytes.equals(VALUE, entry.getValue()));
- }
+ for (Future<Void> future : futures) {
+ future.get();
}
}
- @Test
- public void testClientPoolThreadLocal() throws IOException {
- final byte[] tableName = Bytes.toBytes("testClientPoolThreadLocal");
+ private static void verifyClientPool(
+ Configuration conf, byte[] tableName, byte[] qualifier, int numVersions) throws Exception {
- int poolSize = Integer.MAX_VALUE;
- int numVersions = 3;
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "thread-local");
- conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize);
-
- final HTable table = TEST_UTIL.createTable(tableName,
- new byte[][] { FAMILY }, conf);
+ HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, conf,
+ Integer.MAX_VALUE);
table.setAutoFlush(true);
- final long ts = EnvironmentEdgeManager.currentTimeMillis();
- final Get get = new Get(ROW);
- get.addColumn(FAMILY, QUALIFIER);
+ Get get = new Get(ROW);
+ get.addColumn(FAMILY, qualifier);
get.setMaxVersions();
+ final long ts = EnvironmentEdgeManager.currentTimeMillis();
+
for (int versions = 1; versions <= numVersions; versions++) {
Put put = new Put(ROW);
- put.add(FAMILY, QUALIFIER, ts + versions, VALUE);
+ put.add(FAMILY, qualifier, ts + versions, VALUE);
table.put(put);
Result result = table.get(get);
- NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY)
- .get(QUALIFIER);
+ NavigableMap<Long, byte[]> navigableMap = result.getMap().get(FAMILY).get(qualifier);
+
+ assertEquals(
+ "The number of versions of '" + FAMILY + ":" + qualifier + " did not match " + versions,
+ versions,
+ navigableMap.size());
- assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER
- + " did not match " + versions, versions, navigableMap.size());
for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
- assertTrue("The value at time " + entry.getKey()
- + " did not match what was put",
- Bytes.equals(VALUE, entry.getValue()));
+ assertTrue(
+ "The value at time " + entry.getKey() + " did not match what was put",
+ Bytes.equals(VALUE, entry.getValue()));
}
}
-
- final Object waitLock = new Object();
- ExecutorService executorService = Executors.newFixedThreadPool(numVersions);
- final AtomicReference<AssertionError> error = new AtomicReference<AssertionError>(null);
- for (int versions = numVersions; versions < numVersions * 2; versions++) {
- final int versionsCopy = versions;
- executorService.submit(new Callable<Void>() {
- @Override
- public Void call() {
- try {
- Put put = new Put(ROW);
- put.add(FAMILY, QUALIFIER, ts + versionsCopy, VALUE);
- table.put(put);
-
- Result result = table.get(get);
- NavigableMap<Long, byte[]> navigableMap = result.getMap()
- .get(FAMILY).get(QUALIFIER);
-
- assertEquals("The number of versions of '" + FAMILY + ":"
- + QUALIFIER + " did not match " + versionsCopy, versionsCopy,
- navigableMap.size());
- for (Map.Entry<Long, byte[]> entry : navigableMap.entrySet()) {
- assertTrue("The value at time " + entry.getKey()
- + " did not match what was put",
- Bytes.equals(VALUE, entry.getValue()));
- }
- synchronized (waitLock) {
- waitLock.wait();
- }
- } catch (Exception e) {
- } catch (AssertionError e) {
- // the error happens in a thread, it won't fail the test,
- // need to pass it to the caller for proper handling.
- error.set(e);
- LOG.error(e);
- }
-
- return null;
- }
- });
- }
- synchronized (waitLock) {
- waitLock.notifyAll();
- }
- executorService.shutdownNow();
- assertNull(error.get());
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java
index 49f75aa..522321c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java
@@ -19,10 +19,12 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
@@ -193,20 +195,26 @@ public class TestHTablePool {
@Test
public void testTableWithMaxSize() throws Exception {
+ HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter();
+
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2,
- getPoolType());
+ tableFactory, getPoolType());
// Request tables from an empty pool
HTableInterface table1 = pool.getTable(TABLENAME);
HTableInterface table2 = pool.getTable(TABLENAME);
HTableInterface table3 = pool.getTable(TABLENAME);
+ tableFactory.assertTableCount(3);
+
// Close tables (returns tables to the pool)
table1.close();
table2.close();
// The pool should reject this one since it is already full
table3.close();
+ tableFactory.assertTableCount(2);
+
// Request tables of the same name
HTableInterface sameTable1 = pool.getTable(TABLENAME);
HTableInterface sameTable2 = pool.getTable(TABLENAME);
@@ -224,8 +232,10 @@ public class TestHTablePool {
@Test
public void testCloseTablePool() throws IOException {
+ HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter();
+
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4,
- getPoolType());
+ tableFactory, getPoolType());
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
if (admin.tableExists(TABLENAME)) {
@@ -243,19 +253,17 @@ public class TestHTablePool {
tables[i] = pool.getTable(TABLENAME);
}
+ tableFactory.assertTableCount(4);
+
pool.closeTablePool(TABLENAME);
+ tableFactory.assertTableCount(4);
+
for (int i = 0; i < 4; ++i) {
tables[i].close();
}
- Assert.assertEquals(4,
- pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
-
- pool.closeTablePool(TABLENAME);
-
- Assert.assertEquals(0,
- pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
+ tableFactory.assertTableCount(0);
}
}
@@ -268,14 +276,18 @@ public class TestHTablePool {
@Test
public void testTableWithMaxSize() throws Exception {
+ HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter();
+
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2,
- getPoolType());
+ tableFactory, getPoolType());
// Request tables from an empty pool
HTableInterface table1 = pool.getTable(TABLENAME);
HTableInterface table2 = pool.getTable(TABLENAME);
HTableInterface table3 = pool.getTable(TABLENAME);
+ tableFactory.assertTableCount(1);
+
// Close tables (returns tables to the pool)
table1.close();
table2.close();
@@ -283,6 +295,8 @@ public class TestHTablePool {
// <= 2
table3.close();
+ tableFactory.assertTableCount(1);
+
// Request tables of the same name
HTableInterface sameTable1 = pool.getTable(TABLENAME);
HTableInterface sameTable2 = pool.getTable(TABLENAME);
@@ -300,8 +314,10 @@ public class TestHTablePool {
@Test
public void testCloseTablePool() throws IOException {
+ HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter();
+
HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4,
- getPoolType());
+ tableFactory, getPoolType());
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
if (admin.tableExists(TABLENAME)) {
@@ -319,20 +335,41 @@ public class TestHTablePool {
tables[i] = pool.getTable(TABLENAME);
}
+ tableFactory.assertTableCount(1);
+
pool.closeTablePool(TABLENAME);
+ tableFactory.assertTableCount(1);
+
for (int i = 0; i < 4; ++i) {
tables[i].close();
}
- Assert.assertEquals(1,
- pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
-
- pool.closeTablePool(TABLENAME);
-
- Assert.assertEquals(0,
- pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
+ tableFactory.assertTableCount(0);
}
}
+ private static class HTableFactoryWithTableCounter implements HTableInterfaceFactory {
+ final HTableInterfaceFactory baseTableFactory = new HTableFactory();
+ final AtomicInteger tableCounter = new AtomicInteger();
+
+ @Override
+ public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
+ HTableInterface table = baseTableFactory.createHTableInterface(config, tableName);
+ tableCounter.incrementAndGet();
+ return table;
+ }
+
+ @Override
+ public void releaseHTableInterface(HTableInterface table) throws IOException {
+ tableCounter.decrementAndGet();
+ baseTableFactory.releaseHTableInterface(table);
+ }
+
+ void assertTableCount(int count) {
+ Assert.assertEquals(count, tableCounter.get());
+ }
+
+ }
+
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java
new file mode 100644
index 0000000..ea8443e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class AbstractTestSharedMap {
+ abstract SharedMap<String, Object> newSharedMapWithInfinitePool();
+
+ SharedMap<String, Object> map;
+ Object o1;
+ Object o2;
+ Object o3;
+ Object o4;
+
+ @Before
+ public void setUp() {
+ map = newSharedMapWithInfinitePool();
+ o1 = new Object();
+ o2 = new Object();
+ o3 = new Object();
+ o4 = new Object();
+ }
+
+ @Test
+ public void testBorrowRegisterReturn() {
+ Assert.assertNull(map.borrowObject("a"));
+
+ map.registerObject("a", o1);
+ Assert.assertTrue(map.returnObject("a", o1));
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertNull(map.borrowObject("a"));
+ Assert.assertTrue(map.returnObject("a", o1));
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertNull(map.borrowObject("a"));
+ Assert.assertTrue(map.returnObject("a", o1));
+
+ Assert.assertNull(map.borrowObject("b"));
+ Assert.assertFalse(map.returnObject("a", o2));
+ }
+
+ @Test
+ public void testMultipleRegisters() {
+ map.registerObject("a", o1);
+ map.registerObject("a", o2);
+
+ Assert.assertTrue(map.returnObject("a", o1));
+ Assert.assertTrue(map.returnObject("a", o2));
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertSame(o2, map.borrowObject("a"));
+ Assert.assertNull(map.borrowObject("a"));
+ }
+
+ @Test
+ public void testRejectDuplicateRegistration() {
+ Assert.assertTrue(map.registerObject("a", o1));
+ Assert.assertFalse(map.registerObject("a", o1));
+ }
+
+ @Test
+ public void testInvalidateBeforeReturn() {
+ map.registerObject("a", o1);
+
+ map.invalidateObject("a", o1);
+
+ Assert.assertFalse(map.returnObject("a", o1));
+ Assert.assertNull(map.borrowObject("a"));
+ }
+
+ @Test
+ public void testInvalidateAfterReturn() {
+ map.registerObject("a", o1);
+ Assert.assertTrue(map.returnObject("a", o1));
+
+ map.invalidateObject("a", o1);
+
+ Assert.assertNull(map.borrowObject("a"));
+ }
+
+ @Test
+ public void testClear() {
+ map.registerObject("a", o1);
+ map.registerObject("b", o2);
+ map.registerObject("c", o3);
+
+ Assert.assertTrue(map.returnObject("a", o1));
+ Assert.assertTrue(map.returnObject("b", o2));
+
+ Collection<Object> targets = map.clear();
+
+ Assert.assertEquals(
+ new HashSet<Object>(Arrays.asList(o1, o2)),
+ new HashSet<Object>(targets));
+
+ Assert.assertNull(map.borrowObject("a"));
+ Assert.assertNull(map.borrowObject("b"));
+ Assert.assertFalse(map.returnObject("c", o3));
+ }
+
+ @Test
+ public void testClearWithKey() {
+ map.registerObject("a", o1);
+ map.registerObject("a", o2);
+ map.registerObject("b", o3);
+ map.registerObject("b", o4);
+
+ Assert.assertTrue(map.returnObject("a", o1));
+ Assert.assertTrue(map.returnObject("b", o3));
+
+ Collection<Object> targets = map.clear("a");
+
+ Assert.assertEquals(
+ new HashSet<Object>(Collections.singleton(o1)),
+ new HashSet<Object>(targets));
+
+ Assert.assertNull(map.borrowObject("a"));
+ Assert.assertFalse(map.returnObject("a", o2));
+
+ Assert.assertSame(o3, map.borrowObject("b"));
+ Assert.assertTrue(map.returnObject("b", o4));
+ }
+
+ @Test
+ public void testSizeAndEmpty() {
+ Assert.assertEquals(0, map.size());
+ Assert.assertEquals(0, map.size("a"));
+ Assert.assertEquals(0, map.size("b"));
+ Assert.assertTrue(map.isEmpty());
+
+ map.registerObject("a", o1);
+
+ Assert.assertEquals(1, map.size());
+ Assert.assertEquals(1, map.size("a"));
+ Assert.assertEquals(0, map.size("b"));
+ Assert.assertFalse(map.isEmpty());
+
+ map.registerObject("a", o2);
+
+ Assert.assertEquals(2, map.size());
+ Assert.assertEquals(2, map.size("a"));
+ Assert.assertEquals(0, map.size("b"));
+ Assert.assertFalse(map.isEmpty());
+
+ map.registerObject("b", o3);
+
+ Assert.assertEquals(3, map.size());
+ Assert.assertEquals(2, map.size("a"));
+ Assert.assertEquals(1, map.size("b"));
+ Assert.assertFalse(map.isEmpty());
+
+ map.clear();
+
+ Assert.assertEquals(0, map.size());
+ Assert.assertEquals(0, map.size("a"));
+ Assert.assertEquals(0, map.size("b"));
+ Assert.assertTrue(map.isEmpty());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java
new file mode 100644
index 0000000..3f60678
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestReusableSharedMap extends AbstractTestSharedMap {
+ @Override
+ SharedMap<String, Object> newSharedMapWithInfinitePool() {
+ return new ReusableSharedMap<String, Object>(Integer.MAX_VALUE);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java
new file mode 100644
index 0000000..99452fe
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestReusableSharedMapWithFinitePool {
+ SharedMap<String, Object> map;
+ Object o1;
+ Object o2;
+ Object o3;
+ Object o4;
+
+ @Before
+ public void setUp() {
+ map = new ReusableSharedMap<String, Object>(3);
+ o1 = new Object();
+ o2 = new Object();
+ o3 = new Object();
+ o4 = new Object();
+ }
+
+ @Test
+ public void testOverRegisters() {
+ map.registerObject("a", o1);
+ map.registerObject("a", o2);
+ map.registerObject("a", o3);
+ map.registerObject("a", o4);
+
+ Assert.assertNull(map.borrowObject("a"));
+
+ Assert.assertTrue(map.returnObject("a", o1));
+ Assert.assertTrue(map.returnObject("a", o2));
+ Assert.assertTrue(map.returnObject("a", o3));
+ Assert.assertFalse(map.returnObject("a", o4));
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertSame(o2, map.borrowObject("a"));
+ Assert.assertSame(o3, map.borrowObject("a"));
+ Assert.assertNull(map.borrowObject("a"));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java
new file mode 100644
index 0000000..49140c9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestRoundRobinSharedMap extends AbstractTestSharedMap {
+ @Override
+ SharedMap<String, Object> newSharedMapWithInfinitePool() {
+ return new RoundRobinSharedMap<String, Object>(Integer.MAX_VALUE);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java
new file mode 100644
index 0000000..d21b364
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestRoundRobinSharedMapWithFinitePool {
+ SharedMap<String, Object> map;
+ Object o1;
+ Object o2;
+ Object o3;
+ Object o4;
+
+ @Before
+ public void setUp() {
+ map = new RoundRobinSharedMap<String, Object>(3);
+ o1 = new Object();
+ o2 = new Object();
+ o3 = new Object();
+ o4 = new Object();
+ }
+
+ @Test
+ public void testFullRegisters() {
+ map.registerObject("a", o1);
+ map.registerObject("a", o2);
+
+ Assert.assertNull(map.borrowObject("a"));
+
+ map.registerObject("a", o3);
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertSame(o2, map.borrowObject("a"));
+ Assert.assertSame(o3, map.borrowObject("a"));
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertSame(o2, map.borrowObject("a"));
+ Assert.assertSame(o3, map.borrowObject("a"));
+ }
+
+ @Test
+ public void testOverRegisters() {
+ map.registerObject("a", o1);
+ map.registerObject("a", o2);
+ map.registerObject("a", o3);
+ map.registerObject("a", o4);
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertSame(o2, map.borrowObject("a"));
+ Assert.assertSame(o3, map.borrowObject("a"));
+ Assert.assertSame(o4, map.borrowObject("a"));
+
+ Assert.assertTrue(map.returnObject("a", o1));
+ Assert.assertTrue(map.returnObject("a", o2));
+ Assert.assertTrue(map.returnObject("a", o3));
+ Assert.assertTrue(map.returnObject("a", o4));
+
+ Assert.assertTrue(map.returnObject("a", o1));
+ Assert.assertTrue(map.returnObject("a", o2));
+ Assert.assertTrue(map.returnObject("a", o3));
+ Assert.assertFalse(map.returnObject("a", o4));
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertSame(o2, map.borrowObject("a"));
+ Assert.assertSame(o3, map.borrowObject("a"));
+
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertSame(o2, map.borrowObject("a"));
+ Assert.assertSame(o3, map.borrowObject("a"));
+ }
+
+ @Test
+ public void testIdleObjectPriority() {
+ map.registerObject("a", o1);
+ map.registerObject("a", o2);
+ map.registerObject("a", o3);
+
+ map.returnObject("a", o2);
+
+ Assert.assertSame(o2, map.borrowObject("a"));
+ Assert.assertSame(o1, map.borrowObject("a"));
+ Assert.assertSame(o3, map.borrowObject("a"));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java
new file mode 100644
index 0000000..15734b7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestSegmentedSharedMap extends AbstractTestSharedMap {
+ @Override
+ SharedMap<String, Object> newSharedMapWithInfinitePool() {
+ return new SegmentedSharedMap<String, Object>(new SegmentFactory<String, Object>() {
+ @Override
+ public SharedMap<String, Object> create() {
+ return new ReusableSharedMap<String, Object>(Integer.MAX_VALUE);
+ }
+ });
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java
new file mode 100644
index 0000000..0477c9c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Checks that {@link SegmentedSharedMap} spreads keys across all of its
+ * segments: drawing the segment for random keys should hit every segment well
+ * within 1,000,000 attempts.
+ */
+@Category(SmallTests.class)
+public class TestSegmentedSharedMapForSegments {
+  @Test
+  public void testAllSegmentsUsed() {
+    // Ask for 1000 segments, each backed by an effectively unbounded pool.
+    SegmentedSharedMap map = new SegmentedSharedMap(
+        new SegmentFactory() {
+          @Override
+          public SharedMap create() {
+            return new ReusableSharedMap(Integer.MAX_VALUE);
+          }
+        }, 1000);
+
+    // Distinct segments seen so far. Declared as Set<Object> so the test does
+    // not depend on the exact return type of segmentFor.
+    Set<Object> segmentsUsed = new HashSet<Object>();
+
+    for (int i = 0; i < 1000000; i++) {
+      segmentsUsed.add(map.segmentFor(new Object()));
+      if (segmentsUsed.size() == map.countOfSegments()) {
+        // Every segment has been used at least once.
+        return;
+      }
+    }
+
+    Assert.fail("bug or bad luck");
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java
new file mode 100644
index 0000000..cc2b9c5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests {@link ThreadLocalSharedMapDecorator} wrapping a
+ * {@link RoundRobinSharedMap}: once a thread has borrowed an object for a key,
+ * later borrows on the same thread appear to return that same object until it
+ * is fully returned or invalidated.
+ */
+@Category(SmallTests.class)
+public class TestThreadLocalSharedMapDecorator {
+  // Decorator under test, plus three distinct pooled objects.
+  SharedMap map;
+  Object o1;
+  Object o2;
+  Object o3;
+
+  @Before
+  public void setUp() {
+    map = new ThreadLocalSharedMapDecorator(
+        new RoundRobinSharedMap(Integer.MAX_VALUE));
+    o1 = new Object();
+    o2 = new Object();
+    o3 = new Object();
+  }
+
+  @Test
+  public void testRegisterAndBorrow() {
+    // Nothing registered yet for "a".
+    Assert.assertNull(map.borrowObject("a"));
+
+    map.registerObject("a", o1);
+    map.registerObject("a", o2);
+    map.registerObject("a", o3);
+
+    // Repeated borrows on the same thread stick to the first borrowed object.
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertSame(o1, map.borrowObject("a"));
+  }
+
+  @Test
+  public void testReturnAndBorrow() {
+    Assert.assertNull(map.borrowObject("a"));
+
+    map.registerObject("a", o1);
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    map.registerObject("a", o2);
+    Assert.assertTrue(map.returnObject("a", o2));
+
+    // o1 stays this thread's object while any borrow of it is outstanding.
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o1));
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o1));
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    // Fully returned: the next borrow moves on to o2.
+    Assert.assertSame(o2, map.borrowObject("a"));
+    Assert.assertSame(o2, map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o2));
+    Assert.assertTrue(map.returnObject("a", o2));
+
+    Assert.assertSame(o1, map.borrowObject("a"));
+  }
+
+  @Test
+  public void testRejectDuplicateRegistration() {
+    Assert.assertTrue(map.registerObject("a", o1));
+    // Registering the same object twice under one key is rejected.
+    Assert.assertFalse(map.registerObject("a", o1));
+  }
+
+  @Test
+  public void testInvalidate() {
+    map.registerObject("a", o1);
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    map.registerObject("a", o2);
+    Assert.assertTrue(map.returnObject("a", o2));
+
+    Assert.assertSame(o1, map.borrowObject("a"));
+
+    map.invalidateObject("a", o1);
+
+    // o1 is gone: borrows fall through to o2 and returning o1 is rejected.
+    Assert.assertSame(o2, map.borrowObject("a"));
+    Assert.assertFalse(map.returnObject("a", o1));
+  }
+
+  @Test
+  public void testClear() {
+    map.registerObject("a", o1);
+
+    map.clear();
+
+    // Clearing does not revoke this thread's object: borrowing still works
+    // until it is completely returned (or invalidated) from inside the thread.
+    Assert.assertSame(o1, map.borrowObject("a"));
+
+    Assert.assertTrue(map.returnObject("a", o1));
+    Assert.assertFalse(map.returnObject("a", o1)); // completely returned and rejected
+  }
+
+  @Test
+  public void testThreadLocal() throws Exception {
+    map.registerObject("a", o1);
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    map.registerObject("a", o2);
+    Assert.assertTrue(map.returnObject("a", o2));
+
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    Assert.assertSame(o2, map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o2));
+
+    final AtomicReference<Object> borrowedRef = new AtomicReference<Object>();
+
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        borrowedRef.set(map.borrowObject("a"));
+      }
+    };
+
+    thread.start();
+    thread.join();
+
+    // The other thread gets its own object (o1), independent of this thread.
+    Assert.assertSame(o1, borrowedRef.get());
+
+    // This thread keeps o2 as its thread-local choice.
+    Assert.assertSame(o2, map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o2));
+
+    Assert.assertSame(o2, map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o2));
+  }
+}