diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 3dc5b49..486293b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -24,8 +24,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -36,37 +36,46 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.hbase.util.ReusableSharedMap; +import org.apache.hadoop.hbase.util.SegmentedSharedMap; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.apache.hadoop.hbase.util.SharedMap; +import org.apache.hadoop.hbase.util.ThreadLocalSharedMapDecorator; + +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; /** - * A simple pool of HTable instances. - * - * Each HTablePool acts as a pool for all tables. To use, instantiate an - * HTablePool and use {@link #getTable(String)} to get an HTable from the pool. - * - * This method is not needed anymore, clients should call - * HTableInterface.close() rather than returning the tables to the pool - * - * Once you are done with it, close your instance of {@link HTableInterface} - * by calling {@link HTableInterface#close()} rather than returning the tables - * to the pool with (deprecated) {@link #putTable(HTableInterface)}. - * + * A simple pool of {@link HTable} instances. + * This enables you to reuse the instances and avoid overhead to establish I/O connections + * to a cluster (see {@link HConnectionManager}). + * {@code HTable} is not thread safe, and this pool exclusively lends you the instances. + * {@code HTablePool} is thread safe. *

- * A pool can be created with a maxSize which defines the most HTable - * references that will ever be retained for each table. Otherwise the default - * is {@link Integer#MAX_VALUE}. - * + * To use, instantiate an {@code HTablePool} + * and call {@link #getTable(String)} or {@link #getTable(byte[])} + * to get an instance of {@link HTableInterface}, which is a wrapper of {@code HTable}. + * Once you are done with it, call {@link HTableInterface#close()} + * and the instance returns to the pool if any. *

- * Pool will manage its own connections to the cluster. See - * {@link HConnectionManager}. + * The pool holds multiple idle instances (which are not borrowed) for each table. + * You can specify the maximum count of idle instances for each table + * by the parameter {@code maxSize} in some of the constructors. + * The default value is {@link Integer#MAX_VALUE}. + * Note that this pool is not prepared to control total numbers of the instances existing + * (even if it might eventually save resources), + * and the methods {@code getTable()} always create a new instance + * if there is no corresponding idle instance in the pool. + * (Remember that {@code HTable} is not thread safe + * and you require an instance which you can use exclusively.) */ @InterfaceAudience.Public @InterfaceStability.Stable public class HTablePool implements Closeable { - private final PoolMap tables; - private final int maxSize; - private final PoolType poolType; + private static final Log log = LogFactory.getLog(HTablePool.class); + + private final SharedMap tableMap; private final Configuration config; private final HTableInterfaceFactory tableFactory; @@ -144,24 +153,26 @@ public class HTablePool implements Closeable { // Make a new configuration instance so I can safely cleanup when // done with the pool. this.config = config == null ? HBaseConfiguration.create() : config; - this.maxSize = maxSize; - this.tableFactory = tableFactory == null ? new HTableFactory() - : tableFactory; - if (poolType == null) { - this.poolType = PoolType.Reusable; - } else { - switch (poolType) { - case Reusable: - case ThreadLocal: - this.poolType = poolType; - break; - default: - this.poolType = PoolType.Reusable; - break; - } + this.tableFactory = tableFactory == null ? new HTableFactory() : tableFactory; + this.tableMap = newTableMap(maxSize, poolType); + } + + private static SharedMap newTableMap( + final int maxSize, PoolType poolType) { + + SharedMap tables = new SegmentedSharedMap( + new SegmentFactory() { + @Override + public SharedMap create() { + return new ReusableSharedMap(maxSize); + } + }); + + if (poolType == PoolType.ThreadLocal) { + tables = new ThreadLocalSharedMapDecorator(tables); } - this.tables = new PoolMap(this.poolType, - this.maxSize); + + return tables; } /** @@ -196,9 +207,10 @@ public class HTablePool implements Closeable { * if there is a problem instantiating the HTable */ private HTableInterface findOrCreateTable(String tableName) { - HTableInterface table = tables.get(tableName); + HTableInterface table = tableMap.borrowObject(tableName); if (table == null) { table = createHTable(tableName); + tableMap.registerObject(tableName, table); } return table; } @@ -258,13 +270,11 @@ public class HTablePool implements Closeable { private void returnTable(HTableInterface table) throws IOException { // this is the old putTable method renamed and made private String tableName = Bytes.toString(table.getTableName()); - if (tables.size(tableName) >= maxSize) { + + if (! tableMap.returnObject(tableName, table)) { // release table instance since we're not reusing it - this.tables.remove(tableName, table); - this.tableFactory.releaseHTableInterface(table); - return; + releaseTableQuietly(table); } - tables.put(tableName, table); } protected HTableInterface createHTable(String tableName) { @@ -283,13 +293,7 @@ public class HTablePool implements Closeable { * @param tableName */ public void closeTablePool(final String tableName) throws IOException { - Collection tables = this.tables.values(tableName); - if (tables != null) { - for (HTableInterface table : tables) { - this.tableFactory.releaseHTableInterface(table); - } - } - this.tables.remove(tableName); + releaseTablesQuietly(tableMap.clear(tableName)); } /** @@ -308,14 +312,23 @@ public class HTablePool implements Closeable { * Note: this is a 'shutdown' of all the table pools. */ public void close() throws IOException { - for (String tableName : tables.keySet()) { - closeTablePool(tableName); + releaseTablesQuietly(tableMap.clear()); + } + + private void releaseTableQuietly(HTableInterface table) { + try { + tableFactory.releaseHTableInterface(table); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Failed to release a table: " + Bytes.toString(table.getTableName()), e); + } } - this.tables.clear(); } - int getCurrentPoolSize(String tableName) { - return tables.size(tableName); + private void releaseTablesQuietly(Collection tables) { + for (HTableInterface table : tables) { + releaseTableQuietly(table); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 33786b7..a50f7f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -35,14 +35,21 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; @@ -54,13 +61,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; @@ -72,8 +79,11 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.hbase.util.RoundRobinSharedMap; +import org.apache.hadoop.hbase.util.SegmentedSharedMap; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.apache.hadoop.hbase.util.SharedMap; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; @@ -105,9 +115,10 @@ public class HBaseClient { public static final Log LOG = LogFactory .getLog("org.apache.hadoop.ipc.HBaseClient"); - protected final PoolMap connections; + protected final SegmentedSharedMap connectionMap; + private final ThreadGroup connectionThreadGroup; - protected int counter; // counter for call ids + protected final AtomicInteger counter = new AtomicInteger(); // counter for call ids protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs final protected Configuration conf; final protected int maxIdleTime; // connections will be culled if it was idle for @@ -254,54 +265,107 @@ public class HBaseClient { return refCount==0; } - /** A call waiting for a value. */ - protected class Call { - final int id; // call id - final RpcRequestBody param; // rpc request object - Message value; // value, null if error - IOException error; // exception, null if value - boolean done; // true when call is done - long startTime; - - protected Call(RpcRequestBody param) { - this.param = param; - this.startTime = System.currentTimeMillis(); - synchronized (HBaseClient.this) { - this.id = counter++; - } - } + /** + * A call waiting for a value. + * It is expected that either {@link #setException(IOException)} or {@link #setValue(Message)} + * will be called only once. + */ + private interface Call { + RpcRequestBody getParam(); - /** Indicate when the call is complete and the - * value or error are available. Notifies by default. */ - protected synchronized void callComplete() { - this.done = true; - notify(); // notify caller - } + long getStartTime(); /** Set the exception when there is an error. * Notify the caller the call is done. * * @param error exception thrown by the call; either local or remote */ - public synchronized void setException(IOException error) { - this.error = error; - callComplete(); - } + void setException(IOException error); /** Set the return value when there is no error. * Notify the caller the call is done. * * @param value return value of the call. */ - public synchronized void setValue(Message value) { - this.value = value; - callComplete(); + void setValue(Message value); + } + + private static abstract class AbstractCallDecorator implements Call { + final Call base; + + AbstractCallDecorator(Call base) { + this.base = base; + } + + @Override + public RpcRequestBody getParam() { + return base.getParam(); + } + + @Override + public long getStartTime() { + return base.getStartTime(); + } + + @Override + public void setException(IOException error) { + base.setException(error); + done(); + } + + @Override + public void setValue(Message value) { + base.setValue(value); + done(); + } + + void done() {} + } + + private static class SingleCall implements Call { + final RpcRequestBody param; // rpc request object + final long startTime; + + Message value; // value, null if error + IOException error; // exception, null if value + final CountDownLatch doneLatch = new CountDownLatch(1); + + SingleCall(RpcRequestBody param, long startTime) { + this.param = param; + this.startTime = startTime; + } + + @Override + public RpcRequestBody getParam() { + return param; } + @Override public long getStartTime() { - return this.startTime; + return startTime; + } + + @Override + public void setException(IOException error) { + this.error = error; + doneLatch.countDown(); + } + + @Override + public void setValue(Message value) { + this.value = value; + doneLatch.countDown(); + } + + Message getValue() throws IOException, InterruptedException { + doneLatch.await(); + if (error != null) { + throw error; + } + return value; } } + protected static Map> tokenHandlers = new HashMap>(); static { @@ -317,10 +381,11 @@ public class HBaseClient { return new Connection(remoteId); } - /** Thread that reads responses and notifies callers. Each connection owns a + /** Runnable object that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ - protected class Connection extends Thread { + protected class Connection implements Runnable { + private final String name; private ConnectionHeader header; // connection header protected ConnectionId remoteId; protected Socket socket = null; // connected socket @@ -401,11 +466,10 @@ public class HBaseClient { } this.header = builder.build(); - this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + + this.name = "IPC Client (" + socketFactory.hashCode() +") connection to " + remoteId.getAddress().toString() + - ((ticket==null)?" from an unknown user": (" from " - + ticket.getUserName()))); - this.setDaemon(true); + ((ticket==null)?" from an unknown user": (" from " + + ticket.getUserName())); } private UserInformation getUserInfoPB(UserGroupInformation ugi) { @@ -442,22 +506,19 @@ public class HBaseClient { * It is up to the user code to check this status. * @param call to add */ - protected synchronized void addCall(Call call) { + protected synchronized void addCall(int id, Call call) { // If the connection is about to close, we manage this as if the call was already added // to the connection calls list. If not, the connection creations are serialized, as // mentioned in HBASE-6364 if (this.shouldCloseConnection.get()) { if (this.closeException == null) { call.setException(new IOException( - "Call " + call.id + " not added as the connection " + remoteId + " is closing")); + "Call " + id + " not added as the connection " + remoteId + " is closing")); } else { call.setException(this.closeException); } - synchronized (call) { - call.notifyAll(); - } } else { - calls.put(call.id, call); + calls.put(id, call); notify(); } } @@ -647,8 +708,8 @@ public class HBaseClient { @Override public void run() { if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": starting, having connections " - + connections.size()); + LOG.debug(name + ": starting, having connections " + + connectionMap.size()); try { while (waitForWork()) {//wait here for work - read or close connection @@ -662,8 +723,8 @@ public class HBaseClient { close(); if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": stopped, remaining connections " - + connections.size()); + LOG.debug(name + ": stopped, remaining connections " + + connectionMap.size()); } private synchronized void disposeSasl() { @@ -719,57 +780,72 @@ public class HBaseClient { final int currRetries, final int maxRetries, final Exception ex, final Random rand, final UserGroupInformation user) - throws IOException, InterruptedException{ - user.doAs(new PrivilegedExceptionAction() { - public Object run() throws IOException, InterruptedException { - closeConnection(); - if (shouldAuthenticateOverKrb()) { - if (currRetries < maxRetries) { - LOG.debug("Exception encountered while connecting to " + - "the server : " + ex); - //try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); + throws IOException { + try { + user.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws IOException { + closeConnection(); + if (shouldAuthenticateOverKrb()) { + try { + if (currRetries < maxRetries) { + LOG.debug("Exception encountered while connecting to " + + "the server : " + ex); + //try re-login + if (UserGroupInformation.isLoginKeytabBased()) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + } else { + UserGroupInformation.getLoginUser().reloginFromTicketCache(); + } + disposeSasl(); + //have granularity of milliseconds + //we are sleeping with the Connection lock held but since this + //connection instance is being used for connecting to the server + //in question, it is okay + Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1)); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - disposeSasl(); - //have granularity of milliseconds - //we are sleeping with the Connection lock held but since this - //connection instance is being used for connecting to the server - //in question, it is okay - Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1)); - return null; - } else { String msg = "Couldn't setup connection for " + - UserGroupInformation.getLoginUser().getUserName() + - " to " + serverPrincipal; + UserGroupInformation.getLoginUser().getUserName() + + " to " + serverPrincipal; LOG.warn(msg); throw (IOException) new IOException(msg).initCause(ex); + + } else { + LOG.warn("Exception encountered while connecting to " + + "the server : " + ex); } - } else { - LOG.warn("Exception encountered while connecting to " + - "the server : " + ex); - } - if (ex instanceof RemoteException) { - throw (RemoteException)ex; - } - if (ex instanceof SaslException) { - String msg = "SASL authentication failed." + - " The most likely cause is missing or invalid credentials." + - " Consider 'kinit'."; - LOG.fatal(msg, ex); - throw new RuntimeException(msg, ex); + if (ex instanceof RemoteException) { + throw (RemoteException)ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." + + " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); } - throw new IOException(ex); - } - }); + }); + } catch (InterruptedException e) { + throw new AssertionError(e); + } } - protected synchronized void setupIOstreams() - throws IOException, InterruptedException { - if (socket != null || shouldCloseConnection.get()) { - return; + /** + * @return true if ready; false if already closed + */ + protected synchronized boolean setupIOstreams() throws IOException { + if (shouldCloseConnection.get()) { + return false; + } + + if (socket != null) { + return true; } if (failedServers.isFailedServer(remoteId.getAddress())) { @@ -842,8 +918,10 @@ public class HBaseClient { touch(); // start the receiver thread after the socket connection has been set up - start(); - return; + Thread thread = new Thread(connectionThreadGroup, this, name); + thread.setDaemon(true); + thread.start(); + return true; } } catch (IOException e) { failedServers.addToFailedServers(remoteId.address); @@ -883,11 +961,7 @@ public class HBaseClient { // release the resources // first thing to do;take the connection out of the connection list - synchronized (connections) { - if (connections.get(remoteId) == this) { - connections.remove(remoteId); - } - } + connectionMap.invalidateObject(remoteId, this); // close the streams and therefore the socket IOUtils.closeStream(out); @@ -916,23 +990,34 @@ public class HBaseClient { cleanupCalls(); } if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": closed"); + LOG.debug(name + ": closed"); + } + + /** + * Prepares to handle a response for the given {@code call} and + * sends parameters of the {@code call}. + * The response is notified via methods of {@code call}. + */ + void invokeCall(Call call) { + int id = counter.getAndIncrement(); + addCall(id, call); + sendParam(id, call.getParam()); } /* Initiates a call by sending the parameter to the remote server. * Note: this is not called from the Connection thread, but by other * threads. */ - protected void sendParam(Call call) { + protected void sendParam(int id, RpcRequestBody param) { if (shouldCloseConnection.get()) { return; } try { if (LOG.isDebugEnabled()) - LOG.debug(getName() + " sending #" + call.id); + LOG.debug(name + " sending #" + id); RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder(); - headerBuilder.setCallId(call.id); + headerBuilder.setCallId(id); if (Trace.isTracing()) { Span s = Trace.currentTrace(); @@ -945,16 +1030,16 @@ public class HBaseClient { synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC RpcRequestHeader header = headerBuilder.build(); int serializedHeaderSize = header.getSerializedSize(); - int requestSerializedSize = call.param.getSerializedSize(); + int requestSerializedSize = param.getSerializedSize(); this.out.writeInt(serializedHeaderSize + CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) + requestSerializedSize + CodedOutputStream.computeRawVarint32Size(requestSerializedSize)); header.writeDelimitedTo(this.out); - call.param.writeDelimitedTo(this.out); + param.writeDelimitedTo(this.out); this.out.flush(); } - } catch(IOException e) { + } catch (IOException e) { markClosed(e); } } @@ -983,7 +1068,7 @@ public class HBaseClient { int id = response.getCallId(); if (LOG.isDebugEnabled()) - LOG.debug(getName() + " got value #" + id); + LOG.debug(name + " got value #" + id); Call call = calls.get(id); Status status = response.getStatus(); @@ -992,7 +1077,7 @@ public class HBaseClient { try { rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType( ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(), - call.param.getMethodName())); + call.getParam().getMethodName())); } catch (Exception e) { throw new RuntimeException(e); //local exception } @@ -1054,7 +1139,8 @@ public class HBaseClient { protected void cleanupCalls(long rpcTimeout) { Iterator> itor = calls.entrySet().iterator(); while (itor.hasNext()) { - Call c = itor.next().getValue(); + Entry entry = itor.next(); + Call c = entry.getValue(); long waitTime = System.currentTimeMillis() - c.getStartTime(); if (waitTime >= rpcTimeout) { if (this.closeException == null) { @@ -1064,13 +1150,10 @@ public class HBaseClient { // over on the server; e.g. I just asked the regionserver to bulk // open 3k regions or its a big fat multiput into a heavily-loaded // server (Perhaps this only happens at the extremes?) - this.closeException = new CallTimeoutException("Call id=" + c.id + + this.closeException = new CallTimeoutException("Call id=" + entry.getKey() + ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout); } c.setException(this.closeException); - synchronized (c) { - c.notifyAll(); - } itor.remove(); } else { break; @@ -1105,44 +1188,46 @@ public class HBaseClient { } } - /** Call implementation used for parallel calls. */ - protected class ParallelCall extends Call { - private final ParallelResults results; - protected final int index; + /** Result collector for parallel calls. */ + private static class ParallelResults { + final Message[] values; + final CountDownLatch doneLatch; - public ParallelCall(RpcRequestBody param, ParallelResults results, int index) { - super(param); - this.results = results; - this.index = index; + ParallelResults(int size) { + values = new RpcResponseBody[size]; + doneLatch = new CountDownLatch(size); } - /** Deliver result to result collector. */ - @Override - protected void callComplete() { - results.callComplete(this); + Message[] getValues() throws InterruptedException { + doneLatch.await(); + return values; } - } - /** Result collector for parallel calls. */ - protected static class ParallelResults { - protected final Message[] values; - protected int size; - protected int count; + Call getCall(final int index, final RpcRequestBody param, final long startTime) { + return new Call() { + @Override + public RpcRequestBody getParam() { + return param; + } - public ParallelResults(int size) { - this.values = new RpcResponseBody[size]; - this.size = size; - } + @Override + public long getStartTime() { + return startTime; + } - /* - * Collect a result. - */ - synchronized void callComplete(ParallelCall call) { - // FindBugs IS2_INCONSISTENT_SYNC - values[call.index] = call.value; // store the value - count++; // count it - if (count == size) // if all values are in - notify(); // then notify waiting caller + @Override + public void setException(IOException error) { + // log errors + LOG.info("Exception in parallel calls: " + error.getMessage(), error); + doneLatch.countDown(); + } + + @Override + public void setValue(Message value) { + values[index] = value; + doneLatch.countDown(); + } + }; } } @@ -1166,9 +1251,21 @@ public class HBaseClient { this.conf = conf; this.socketFactory = factory; this.clusterId = conf.get(HConstants.CLUSTER_ID, "default"); - this.connections = new PoolMap( - getPoolType(conf), getPoolSize(conf)); + this.connectionMap = newConnectionMap(conf); this.failedServers = new FailedServers(conf); + this.connectionThreadGroup = new ThreadGroup( + "IPC Client (" + socketFactory.hashCode() +") connection"); + } + + private static SegmentedSharedMap newConnectionMap(Configuration conf) { + final int size = getPoolSize(conf); + return new SegmentedSharedMap( + new SegmentFactory() { + @Override + public SharedMap create() { + return new RoundRobinSharedMap(size); + } + }); } /** @@ -1194,7 +1291,10 @@ public class HBaseClient { * @param config configuration * @return either a {@link PoolType#RoundRobin} or * {@link PoolType#ThreadLocal} + * @deprecated The thread-local logic is not appropriate + * and never applied to this class anymore. */ + @Deprecated protected static PoolType getPoolType(Configuration config) { return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolType.RoundRobin, PoolType.ThreadLocal); @@ -1231,14 +1331,10 @@ public class HBaseClient { } // wake up all connections - synchronized (connections) { - for (Connection conn : connections.values()) { - conn.interrupt(); - } - } + connectionThreadGroup.interrupt(); // wait until all connections are closed - while (!connections.isEmpty()) { + while (!connectionMap.isEmpty()) { try { Thread.sleep(100); } catch (InterruptedException ignored) { @@ -1274,39 +1370,44 @@ public class HBaseClient { Class protocol, User ticket, int rpcTimeout) throws InterruptedException, IOException { - Call call = new Call(param); - Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call); - connection.sendParam(call); // send the parameter - boolean interrupted = false; - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (call) { - while (!call.done) { - try { - call.wait(); // wait for the result - } catch (InterruptedException ignored) { - // save the fact that we were interrupted - interrupted = true; - } - } - if (interrupted) { - // set the interrupt flag now that we are done waiting - Thread.currentThread().interrupt(); - } + if (Thread.interrupted()) { + throw new InterruptedException(); + } - if (call.error != null) { - if (call.error instanceof RemoteException) { - call.error.fillInStackTrace(); - throw call.error; - } - // local exception - throw wrapException(addr, call.error); + SingleCall call = new SingleCall(param, System.currentTimeMillis()); + ConnectionId remoteId = getConnectionId(addr, protocol, ticket, rpcTimeout); + invokeCall(remoteId, call); + try { + return call.getValue(); + } catch (IOException e) { + if (e instanceof RemoteException) { + e.fillInStackTrace(); + throw e; } - return call.value; + // local exception + throw wrapException(addr, e); } } /** + * Prepares to handle a response for the given {@code call} and + * sends parameters of the {@code call}, + * for an appropriate connection specified by the given {@code remoteId}. + * The response is notified via methods of {@code call}. + */ + private void invokeCall(ConnectionId remoteId, Call call) throws IOException { + final Connection connection = getConnection(remoteId); + + connection.invokeCall(new AbstractCallDecorator(call) { + @Override + void done() { + returnConnection(connection); + } + }); + } + + /** * Take an IOException and the address we were trying to connect to * and return an IOException with the input exception as the cause. * The new exception provides the stack trace of the place where @@ -1364,81 +1465,124 @@ public class HBaseClient { throws IOException, InterruptedException { if (addresses.length == 0) return new RpcResponseBody[0]; + if (Thread.interrupted()) { + throw new InterruptedException(); + } + ParallelResults results = new ParallelResults(params.length); - // TODO this synchronization block doesnt make any sense, we should possibly fix it - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (results) { - for (int i = 0; i < params.length; i++) { - ParallelCall call = new ParallelCall(params[i], results, i); - try { - Connection connection = - getConnection(addresses[i], protocol, ticket, 0, call); - connection.sendParam(call); // send each parameter - } catch (IOException e) { - // log errors - LOG.info("Calling "+addresses[i]+" caught: " + - e.getMessage(),e); - results.size--; // wait for one fewer result - } - } - while (results.count != results.size) { - try { - results.wait(); // wait for all results - } catch (InterruptedException ignored) {} + long currentTime = System.currentTimeMillis(); + + Map> callMap = + new LinkedHashMap>(); + for (int i = 0; i < params.length; i++) { + Call call = results.getCall(i, params[i], currentTime); + + List calls = callMap.get(addresses[i]); + if (calls == null) { + calls = new ArrayList(); + callMap.put(addresses[i], calls); } + calls.add(call); + } - return results.values; + for (Entry> entry : callMap.entrySet()) { + ConnectionId remoteId = getConnectionId(entry.getKey(), protocol, ticket, 0); + invokeCalls(remoteId, entry.getValue()); + } + + return results.getValues(); + } + + /** + * Prepares to handle a response and sends parameters for each of the given {@code calls}, + * for an appropriate connection specified by the given {@code remoteId}. + * Each response is notified via methods of each of {@code calls}. + * This method has better performance than + * calling repeatedly {@link #invokeCall(ConnectionId, Call)}. + */ + private void invokeCalls(final ConnectionId remoteId, Collection calls) + throws IOException { + final Connection connection = getConnection(remoteId); + + final AtomicInteger countDown = new AtomicInteger(calls.size()); + + for (Call call : calls) { + connection.invokeCall(new AbstractCallDecorator(call) { + @Override + void done() { + if (countDown.decrementAndGet() == 0) { + returnConnection(connection); + } + } + }); } } /* Get a connection from the pool, or create a new one and add it to the * pool. Connections to a given host/port are reused. */ - protected Connection getConnection(InetSocketAddress addr, - Class protocol, - User ticket, - int rpcTimeout, - Call call) - throws IOException, InterruptedException { + private Connection getConnection(ConnectionId remoteId) throws IOException { if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } - Connection connection; - /* we could avoid this allocation for each RPC by having a - * connectionsId object and with set() method. We need to manage the - * refs for keys in HashMap properly. For now its ok. - */ - ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); - synchronized (connections) { - connection = connections.get(remoteId); - if (connection == null) { - connection = createConnection(remoteId); - connections.put(remoteId, connection); + + final SharedMap segment = this.connectionMap.segmentFor(remoteId); + + while (true) { + Connection connection; + synchronized (segment) { + // This synchronization guards against creating excessive instances of Connection. + // Note that the methods of the segment are guarded by the segment itself, + // according to its actual implementation class. + // This synchronization blocks threads which use the same segment, + // and you should not execute long processing for specified remoteId + // in the synchronization. + + connection = segment.borrowObject(remoteId); + if (connection == null) { + connection = createConnection(remoteId); + segment.registerObject(remoteId, connection); + } + } + + if (connection.setupIOstreams()) { + // Connection.setupIOstreams() establishes an actual network connection, + // which is relatively long processing for the specified remoteId. + // We execute the processing here separately from creating the instance of Connection, + // because the creation is executed under the synchronization of the segment, + // which blocks threads which use the same segment. + + return connection; } + + segment.invalidateObject(remoteId, connection); } - connection.addCall(call); - - //we don't invoke the method below inside "synchronized (connections)" - //block above. The reason for that is if the server happens to be slow, - //it will take longer to establish a connection and that will slow the - //entire system down. - //Moreover, if the connection is currently created, there will be many threads - // waiting here; as setupIOstreams is synchronized. If the connection fails with a - // timeout, they will all fail simultaneously. This is checked in setupIOstreams. - connection.setupIOstreams(); - return connection; + } + + private void returnConnection(Connection connection) { + connectionMap.returnObject(connection.remoteId, connection); + } + + /* we could avoid this allocation for each RPC by having a + * connectionsId object and with set() method. We need to manage the + * refs for keys in HashMap properly. For now its ok. + */ + private static ConnectionId getConnectionId(InetSocketAddress address, + Class protocol, + User ticket, + int rpcTimeout) { + return new ConnectionId(address, protocol, ticket, rpcTimeout); } /** * This class holds the address and the user ticket. The client connections * to servers are uniquely identified by */ - protected static class ConnectionId { + private final static class ConnectionId { final InetSocketAddress address; final User ticket; final int rpcTimeout; - Class protocol; - private static final int PRIME = 16777619; + final Class protocol; ConnectionId(InetSocketAddress address, Class protocol, @@ -1464,20 +1608,30 @@ public class HBaseClient { @Override public boolean equals(Object obj) { - if (obj instanceof ConnectionId) { - ConnectionId id = (ConnectionId) obj; - return address.equals(id.address) && protocol == id.protocol && - ((ticket != null && ticket.equals(id.ticket)) || - (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout; - } - return false; + if(obj == this) { return true; } + if(! (obj instanceof ConnectionId)) { return false; } + ConnectionId other = (ConnectionId) obj; + + return address.equals(other.address) && + protocol == other.protocol && + (ticket == other.ticket || ticket != null && ticket.equals(other.ticket)) && + rpcTimeout == other.rpcTimeout; } - @Override // simply use the default Object#hashcode() ? + int hashCode; + + @Override public int hashCode() { - return (address.hashCode() + PRIME * ( - PRIME * System.identityHashCode(protocol) ^ - (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout; + int hashCode = this.hashCode; + if(hashCode == 0) { + this.hashCode = hashCode = Arrays.hashCode(new int[] { + address.hashCode(), + System.identityHashCode(protocol), + (ticket == null ? 0 : ticket.hashCode()), + rpcTimeout + }); + } + return hashCode; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java new file mode 100644 index 0000000..b9e1648 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Skeleton implementation of {@code SharedMap}. + *

+ * All of the public methods are synchronized with this, except for argument checks. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +abstract class AbstractSharedMap implements SharedMap { + /** Guarded by this */ + private final Map> poolMap = new HashMap>(); + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public V borrowObject(K key) { + if (key == null) { throw new NullPointerException(); } + + synchronized (this) { + Pool pool = poolMap.get(key); + if (pool == null) { + return null; + } + return pool.borrowObject(); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean registerObject(K key, V value) { + if (key == null) { throw new NullPointerException("key"); } + if (value == null) { throw new NullPointerException("value"); } + + synchronized (this) { + Pool pool = poolMap.get(key); + if (pool == null) { + pool = newPool(); + poolMap.put(key, pool); + } + return pool.registerObject(value); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean returnObject(K key, V value) { + if (key == null) { throw new NullPointerException("key"); } + if (value == null) { throw new NullPointerException("value"); } + + synchronized (this) { + Pool pool = poolMap.get(key); + if (pool == null) { + return false; + } + return pool.returnObject(value); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public void invalidateObject(K key, V value) { + if (key == null) { throw new NullPointerException("key"); } + if (value == null) { throw new NullPointerException("value"); } + + synchronized (this) { + Pool pool = poolMap.get(key); + if (pool == null) { + return; + } + + pool.invalidateObject(value); + + if (pool.isEmpty()) { + poolMap.remove(pool); + } + } + } + + @Override + public synchronized Collection clear() { + Collection idleObjects = new ArrayList(); + + for (Pool pool : poolMap.values()) { + idleObjects.addAll(pool.clear()); + } + + poolMap.clear(); + + return idleObjects; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection clear(Object key) { + if (key == null) { throw new NullPointerException(); } + + synchronized (this) { + Pool pool = poolMap.get(key); + return pool.clear(); + } + } + + @Override + public synchronized int size() { + int size = 0; + for (Pool pool : poolMap.values()) { + size += pool.size(); + } + return size; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public int size(K key) { + if (key == null) { throw new NullPointerException(); } + + synchronized (this) { + Pool pool = poolMap.get(key); + return pool == null ? 0 : pool.size(); + } + } + + @Override + public synchronized boolean isEmpty() { + for (Pool pool : poolMap.values()) { + if (! pool.isEmpty()) { + return false; + } + } + return true; + } + + abstract Pool newPool(); + + interface Pool { + /** + * Returns one of the shared objects, + * or returns null if there is no appropriate object to borrow. + */ + V borrowObject(); + + /** + * Registers the given {@code value} to this instance. + * At this point the registered object is regarded as borrowed. + *

+ * This method does nothing and returns false if and only if + * the given {@code value} is already registered. + * In practice the {@code value} is expected to be a newly created object, + * and this method is expected to return true. + */ + boolean registerObject(V value); + + /** + * Makes the given {@code value} to be returned into this instance. + * It is expected that the {@code value} has been borrowed from this instance. + *

+ * If this method returns false, + * then the the registration of the {@code value} is invalidated. + * This happens because the pool is full when this method is called, + * or you have explicitly called {@link #invalidateObject()} or {@link #clear()}. + */ + boolean returnObject(V value); + + /** + * Invalidates the registration of the given {@code value}. + * You may call this method regardless of whether the {@code value} is borrowed or not. + */ + void invalidateObject(V value); + + /** + * Invalidates all of the registrations to this instance, + * and returns the cleared idle objects in the pool. + */ + Collection clear(); + + /** + * Returns the count of the registered objects. + */ + int size(); + + /** + * Returns true if there is no registration. + */ + boolean isEmpty(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java index 364be66..54445a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java @@ -61,10 +61,18 @@ public class PoolMap implements Map { private Map> pools = new ConcurrentHashMap>(); + /** + * @deprecated Use {@link SharedMap} and its implementation classes. + */ + @Deprecated public PoolMap(PoolType poolType) { this.poolType = poolType; } + /** + * @deprecated Use {@link SharedMap} and its implementation classes. + */ + @Deprecated public PoolMap(PoolType poolType, int poolMaxSize) { this.poolType = poolType; this.poolMaxSize = poolMaxSize; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java new file mode 100644 index 0000000..443511f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Implementation of {@code SharedMap}, + * ensuring that it lends a shared object to a single caller at the same time. + *

+ * All of the public methods are synchronized with this, except for argument checks. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ReusableSharedMap extends AbstractSharedMap { + private final int maxObjectCountPerKey; + + /** + * Constructs with the given {@code maxObjectCountPerKey}, + * which is a max count of idle objects to hold for each key. + */ + public ReusableSharedMap(int maxObjectCountPerKey) { + this.maxObjectCountPerKey = maxObjectCountPerKey; + } + + @Override + Pool newPool() { + return new ReusablePool(); + } + + private class ReusablePool implements Pool { + // The boolean values represent busy (borrowing) or not. + // This is for performance instead of sequential search in idleQueue. + final Map registeredObjects = new HashMap(); + final Queue idleQueue = new ArrayDeque(); + + @Override + public V borrowObject() { + V value = idleQueue.poll(); + if (value == null) { + return null; + } + + registeredObjects.put(value, Boolean.TRUE); + return value; + } + + @Override + public boolean registerObject(V value) { + if (registeredObjects.containsKey(value)) { + return false; + } + registeredObjects.put(value, Boolean.TRUE); + return true; + } + + @Override + public boolean returnObject(V value) { + Boolean busy = registeredObjects.get(value); + if (busy == null) { + return false; + } + + if (! busy) { + return true; + } + + if (idleQueue.size() >= maxObjectCountPerKey) { + registeredObjects.remove(value); + return false; + } + + registeredObjects.put(value, Boolean.FALSE); + idleQueue.offer(value); + return true; + } + + @Override + public void invalidateObject(V value) { + registeredObjects.remove(value); + idleQueue.remove(value); + } + + @Override + public Collection clear() { + Collection idleObjects = new ArrayList(idleQueue); + + registeredObjects.clear(); + idleQueue.clear(); + + return idleObjects; + } + + @Override + public int size() { + return registeredObjects.size(); + } + + @Override + public boolean isEmpty() { + return registeredObjects.isEmpty(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java new file mode 100644 index 0000000..649e312 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Implementation of {@code SharedMap} based on the round robin logic, + * except that it lends an idle object whenever there is. + * Objects to be shared must be thread safe, + * because the round robin logic will lend the same shared object + * to different callers at the same time. + *

+ * The round robin logic restricts the count of shared objects + * and prevents unlimitedly consuming resources. + * For each key it starts to re-lend shared objects cyclicly + * when the count of borrowed objects exceeds the user specified threshold. + *

+ * All of the public methods are synchronized with this, except for argument checks. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RoundRobinSharedMap extends AbstractSharedMap { + /** the max count of shared objects for each key, which count is positive */ + private final int maxObjectCountPerKey; + + /** + * Constructs with the given {@code maxObjectCountPerKey}, + * which is a threshold count of borrowed objects for each key + * before starting to lend the same shared object to different callers, + * and which is also a max count of idle objects to hold for each key + * though the count of registered objects may exceed the max count. + *

+ * The given {@code maxObjectCountPerKey} is expected to be positive, + * otherwise the max count is set to be 1 instead of the given value. + */ + public RoundRobinSharedMap(int maxObjectCountPerKey) { + this.maxObjectCountPerKey = Math.max(1, maxObjectCountPerKey); + } + + @Override + Pool newPool() { + return new RoundRobinPool(); + } + + private class RoundRobinPool implements Pool { + // The integer values represent borrowing counts. + final Map registeredObjects = new HashMap(); + final Queue idleQueue = new ArrayDeque(); + final Queue busyQueue = new ArrayDeque(); + + @Override + public V borrowObject() { + V value = idleQueue.poll(); + if (value != null) { + registeredObjects.put(value, 1); + busyQueue.offer(value); + return value; + } + + if (busyQueue.size() < maxObjectCountPerKey) { + return null; + } + + // Now we are ready to pick up and lend a busy object in busyQueue, + // which is not empty because busyQueue.size() >= maxObjectCountPerKey > 0. + + value = busyQueue.poll(); + assert value != null; + registeredObjects.put(value, registeredObjects.get(value) + 1); + busyQueue.offer(value); + return value; + } + + @Override + public boolean registerObject(V value) { + if (registeredObjects.containsKey(value)) { + return false; + } + registeredObjects.put(value, 1); + busyQueue.offer(value); + return true; + } + + @Override + public boolean returnObject(V value) { + Integer busyCountWrapper = registeredObjects.get(value); + if (busyCountWrapper == null) { + return false; + } + + int busyCount = busyCountWrapper; + if (busyCount == 0) { + return true; + } + + int nextBusyCount = busyCount - 1; + if (nextBusyCount != 0) { + registeredObjects.put(value, nextBusyCount); + return true; + } + + busyQueue.remove(value); + + if (idleQueue.size() >= maxObjectCountPerKey) { + registeredObjects.remove(value); + return false; + + } else { + registeredObjects.put(value, 0); + idleQueue.offer(value); + return true; + } + } + + @Override + public void invalidateObject(V value) { + registeredObjects.remove(value); + idleQueue.remove(value); + busyQueue.remove(value); + } + + @Override + public Collection clear() { + Collection idleObjects = new ArrayList(idleQueue); + + registeredObjects.clear(); + idleQueue.clear(); + busyQueue.clear(); + + return idleObjects; + } + + @Override + public int size() { + return registeredObjects.size(); + } + + @Override + public boolean isEmpty() { + return registeredObjects.isEmpty(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java new file mode 100644 index 0000000..145c833 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Implementation of {@code SharedMap}, + * which delegates operations to segmented maps according to keys, + * in order to reduce contention between threads. + *

+ * Thread safe. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class SegmentedSharedMap implements SharedMap { + /** + * Factory to create a map which will be used as one of segmented maps. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ + public interface SegmentFactory { + SharedMap create(); + } + + private static final int MAX_SEGMENTS = 1 << 16; + + private final SharedMap[] segments; + private final int segmentMask; + private final int segmentShift; + + /** + * Constructs with the default concurrency level (16). + * + * @throws NullPointerException if {@code segmentFactory} is null + */ + public SegmentedSharedMap(SegmentFactory segmentFactory) { + this(segmentFactory, 16); + } + + /** + * Constructs with the given {@code concurrencyLevel}, + * which is the estimated number of concurrently accessing threads. + * + * @throws NullPointerException if {@code segmentFactory} is null + */ + public SegmentedSharedMap(SegmentFactory segmentFactory, int concurrencyLevel) { + if(segmentFactory == null) { + throw new NullPointerException(); + } + + int requestedSegmentSize = Math.min(concurrencyLevel, MAX_SEGMENTS); + + int segmentShift = 32; + int segmentSize = 1; + while (segmentSize < requestedSegmentSize) { + segmentShift--; + segmentSize <<= 1; + } + this.segmentShift = segmentShift; + this.segmentMask = segmentSize - 1; + + @SuppressWarnings("unchecked") + SharedMap[] segments = new SharedMap[segmentSize]; + for(int i = 0; i < segmentSize; i++) { + segments[i] = segmentFactory.create(); + } + this.segments = segments; + } + + /** + * @see ConcurrentHashMap#hash(int) + */ + private static int hash(int h) { + h += (h << 15) ^ 0xffffcd7d; + h ^= (h >>> 10); + h += (h << 3); + h ^= (h >>> 6); + h += (h << 2) + (h << 14); + return h ^ (h >>> 16); + } + + /** + * Returns the corresponding internal segment for the given {@code key}. + * + * @throws NullPointerException if {@code key} is null + */ + public SharedMap segmentFor(K key) { + int hash = hash(key.hashCode()); + return segments[(hash >>> segmentShift) & segmentMask]; + } + + /** + * Returns the count of the segments used internally. + */ + int countOfSegments() { + return segments.length; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public V borrowObject(K key) { + return segmentFor(key).borrowObject(key); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean registerObject(K key, V value) { + return segmentFor(key).registerObject(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean returnObject(K key, V value) { + return segmentFor(key).returnObject(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public void invalidateObject(K key, V value) { + segmentFor(key).invalidateObject(key, value); + } + + @Override + public Collection clear() { + Collection result = new ArrayList(); + for(SharedMap segment : segments) { + result.addAll(segment.clear()); + } + return result; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection clear(K key) { + return segmentFor(key).clear(key); + } + + @Override + public int size() { + int result = 0; + for(SharedMap segment : segments) { + result += segment.size(); + } + return result; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public int size(K key) { + return segmentFor(key).size(key); + } + + @Override + public boolean isEmpty() { + for(SharedMap segment : segments) { + if(! segment.isEmpty()) { + return false; + } + } + return true; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java new file mode 100644 index 0000000..aa876fa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This provides functions to share objects. + *

+ * In order to share objects, first of all call {@link #borrowObject()}. + * If the method returns a non-null object, use it. + * If the method returns null, create a new object and call {@link #registerObject()}, + * and use it. Just after the registration the object is regarded as borrowed. + *

+ * After using the borrowed object, + * call {@link #returnObject()} to return the object to the pool, + * or call {@link #invalidateObject()} to invalidate the object's registration + * instead of returning the object. + * If the pool is already full, the method {@code returnObject()} returns false + * with invalidating the registration of the object. + * Also the method returns false if the object's registration is already invalidated + * because {@link #invalidateObject()} or {@link #clear()} has been called. + * Anyway if the method {@code returnObject()} returns false + * the object is rejected to return the pool and is already invalidated, + * and you might tear down the rejected object. + * You can safely tear down the rejected object even in multi-thread contexts, + * because the object is never passed to the other threads via the pool. + *

+ * Incidentally, you can call {@link #invalidateObject} and {@link #clear()} + * in order to invalidate the registrations at any time, by any thread, + * regardless of whether the objects are borrowed or not. + * The method {@code clear()} returns a collection of the idle objects, + * which have been registered but not borrowed and idle in the pool, + * and you can safely tear down them in the sense that + * you get them exclusively in multi-thread contexts. + * The rest of the objects that have been registered are borrowed and in use + * by some other threads at the moment, and you can't tear down them right now. + * But the borrower eventually calls {@code returnObject()} (which returns false) + * or {@code invalidateObject()} after using the borrowed object, and in either method + * the borrower know that the registration of the object is already invalidated + * and the object is ready to tear down. + *

+ * The implementations of this interface are expected to be thread safe, + * but it depends on the implementations whether it borrows the same shared object + * to different callers at the same time. + *

+ * Objects to be shared are identified with their equality, instead of sameness. + * Concretely, when we have two objects {@code o1} and {@code o2} to be shared, + * we won't distinguish between {@code o1} the {@code o2} + * if {@code o1.equals(o2)} returns true rather than if {@code o1 == o2} is true. + * That enables us to use ordinary collections for the objects + * and simplify the implementation logic of this interface. + * Also that is enough for practical uses. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface SharedMap { + /** + * Returns one of the shared objects associated with the given {@code key}, + * or returns null if there is no appropriate object to borrow. + * In the latter case, it is expected that a new instance is created + * and registered to this instance. + * + * @throws NullPointerException if {@code key} is null + */ + V borrowObject(K key); + + /** + * Registers the given {@code value} to this instance, + * associating it with the given {@code key}. + * At this point the registered object is regarded as borrowed + * and you can continue to use the object, + * and after finishing using it you have to return or invalidate + * the registered object into this instance. + *

+ * This method does nothing and returns false if and only if + * the given {@code value} is already registered + * associated with the given {@code key}. + * In practice the {@code value} is expected to be a newly created object, + * and this method is expected to return true. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + boolean registerObject(K key, V value); + + /** + * Makes the given {@code value} associated with the given {@code key} + * to be returned into this instance. + * It is expected that the {@code value} has been borrowed from this instance. + *

+ * If this method returns false, then the registration of the {@code value} is invalidated. + * This happens because the pool is full when this method is called, + * or you have explicitly called {@link #invalidateObject()} or {@link #clear()}. + * You might have to tear down the rejected object. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + boolean returnObject(K key, V value); + + /** + * Invalidates the registration of the given {@code value} associated with the {@code key}. + * You may call this method regardless of whether the {@code value} is borrowed or not. + * You might have to tear down the removed object. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + void invalidateObject(K key, V value); + + /** + * Invalidates all of the registrations to this instance, + * and returns the objects that have been registered but idle in the pool. + * You might have to tear down the returned objects. + *

+ * The rest of the objects that have been registered are still in use at this moment, + * and they are not contained in the collection returned by this method. + * In fact, you should not tear down them until the borrowers complete the usage. + * Instead, because the borrowers are supposed to call {@link #returnObject()} + * (which rejects invalidated objects and returns false) + * or {@link #invalidateObject()} after their usage, + * you can make the borrowers themselves tear down the invalidated objects + * just after calling these methods. + */ + Collection clear(); + + /** + * Invalidates all of the registrations to this instance associated with the given {@code key}, + * and returns the objects that have been registered but idle in the pool. + * You might have to tear down the returned objects. + *

+ * The rest of the objects that have been registered are still in use at this moment, + * and they are not contained in the collection returned by this method. + * In fact, you should not tear down them until the borrowers complete the usage. + * Instead, because the borrowers are supposed to call {@link #returnObject()} + * (which rejects invalidated objects and returns false) + * or {@link #invalidateObject()} after their usage, + * you can make the borrowers themselves tear down the invalidated objects + * just after calling these methods. + * + * @throws NullPointerException if {@code key} is null + */ + Collection clear(K key); + + /** + * Returns the count of the registered objects. + */ + int size(); + + /** + * Returns the count of the registered objects associated with the given {@code key}. + * + * @throws NullPointerException if {@code key} is null + */ + int size(K key); + + /** + * Returns true if there is no registration. + */ + boolean isEmpty(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java new file mode 100644 index 0000000..4ca9f5c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Decorator of {@code SharedMap} + * which gives the same shared object for each key at the same time for each thread. + * This lends the same object even after the other threads invalidate the object, + * until the borrowing thread completely returns the object or invalidates it. + *

+ * Note that it depends on the base instance of {@code SharedMap} + * whether the each shared object is accessed by at most one thread at the same time or not. + *

+ * Thread safe. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ThreadLocalSharedMapDecorator implements SharedMap { + private static class BorrowingCounter { + final V value; + + int count = 1; + + BorrowingCounter(V value) { + assert value != null; + this.value = value; + } + } + + private final ThreadLocal>> borrowingCounterMapRef + = new ThreadLocal>>() { + @Override + protected Map> initialValue() { + return new HashMap>(); + } + }; + + private final SharedMap base; + + /** + * @throws NullPointerException if {@code base} is null + */ + public ThreadLocalSharedMapDecorator(SharedMap base) { + if (base == null) { throw new NullPointerException(); } + this.base = base; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public V borrowObject(K key) { + if (key == null) { throw new NullPointerException(); } + + Map> counterMap = borrowingCounterMapRef.get(); + + BorrowingCounter counter = counterMap.get(key); + if (counter == null) { + V value = base.borrowObject(key); + if (value != null) { + counterMap.put(key, new BorrowingCounter(value)); + } + return value; + } + + counter.count++; + return counter.value; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean registerObject(K key, V value) { + if (! base.registerObject(key, value)) { + return false; + } + + Map> counterMap = borrowingCounterMapRef.get(); + BorrowingCounter counter = counterMap.get(key); + if (counter == null) { + counterMap.put(key, new BorrowingCounter(value)); + } + return true; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean returnObject(K key, V value) { + if (key == null) { throw new NullPointerException("key"); } + if (value == null) { throw new NullPointerException("value"); } + + Map> counterMap = borrowingCounterMapRef.get(); + + BorrowingCounter counter = counterMap.get(key); + if (counter == null || ! counter.value.equals(value)) { + return base.returnObject(key, value); + } + + if (--counter.count > 0) { + return true; + } + + counterMap.remove(key); + return base.returnObject(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public void invalidateObject(K key, V value) { + base.invalidateObject(key, value); + + Map> counterMap = borrowingCounterMapRef.get(); + BorrowingCounter counter = counterMap.get(key); + if (counter == null || ! counter.value.equals(value)) { + return; + } + counterMap.remove(key); + } + + @Override + public Collection clear() { + return base.clear(); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection clear(K key) { + return base.clear(key); + } + + @Override + public int size() { + return base.size(); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public int size(K key) { + return base.size(key); + } + + @Override + public boolean isEmpty() { + return base.isEmpty(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java index 49f75aa..522321c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.PoolMap.PoolType; @@ -193,20 +195,26 @@ public class TestHTablePool { @Test public void testTableWithMaxSize() throws Exception { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2, - getPoolType()); + tableFactory, getPoolType()); // Request tables from an empty pool HTableInterface table1 = pool.getTable(TABLENAME); HTableInterface table2 = pool.getTable(TABLENAME); HTableInterface table3 = pool.getTable(TABLENAME); + tableFactory.assertTableCount(3); + // Close tables (returns tables to the pool) table1.close(); table2.close(); // The pool should reject this one since it is already full table3.close(); + tableFactory.assertTableCount(2); + // Request tables of the same name HTableInterface sameTable1 = pool.getTable(TABLENAME); HTableInterface sameTable2 = pool.getTable(TABLENAME); @@ -224,8 +232,10 @@ public class TestHTablePool { @Test public void testCloseTablePool() throws IOException { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4, - getPoolType()); + tableFactory, getPoolType()); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); if (admin.tableExists(TABLENAME)) { @@ -243,19 +253,17 @@ public class TestHTablePool { tables[i] = pool.getTable(TABLENAME); } + tableFactory.assertTableCount(4); + pool.closeTablePool(TABLENAME); + tableFactory.assertTableCount(4); + for (int i = 0; i < 4; ++i) { tables[i].close(); } - Assert.assertEquals(4, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); - - pool.closeTablePool(TABLENAME); - - Assert.assertEquals(0, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + tableFactory.assertTableCount(0); } } @@ -268,14 +276,18 @@ public class TestHTablePool { @Test public void testTableWithMaxSize() throws Exception { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2, - getPoolType()); + tableFactory, getPoolType()); // Request tables from an empty pool HTableInterface table1 = pool.getTable(TABLENAME); HTableInterface table2 = pool.getTable(TABLENAME); HTableInterface table3 = pool.getTable(TABLENAME); + tableFactory.assertTableCount(1); + // Close tables (returns tables to the pool) table1.close(); table2.close(); @@ -283,6 +295,8 @@ public class TestHTablePool { // <= 2 table3.close(); + tableFactory.assertTableCount(1); + // Request tables of the same name HTableInterface sameTable1 = pool.getTable(TABLENAME); HTableInterface sameTable2 = pool.getTable(TABLENAME); @@ -300,8 +314,10 @@ public class TestHTablePool { @Test public void testCloseTablePool() throws IOException { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4, - getPoolType()); + tableFactory, getPoolType()); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); if (admin.tableExists(TABLENAME)) { @@ -319,20 +335,41 @@ public class TestHTablePool { tables[i] = pool.getTable(TABLENAME); } + tableFactory.assertTableCount(1); + pool.closeTablePool(TABLENAME); + tableFactory.assertTableCount(1); + for (int i = 0; i < 4; ++i) { tables[i].close(); } - Assert.assertEquals(1, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); - - pool.closeTablePool(TABLENAME); - - Assert.assertEquals(0, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + tableFactory.assertTableCount(0); } } + private static class HTableFactoryWithTableCounter implements HTableInterfaceFactory { + final HTableInterfaceFactory baseTableFactory = new HTableFactory(); + final AtomicInteger tableCounter = new AtomicInteger(); + + @Override + public HTableInterface createHTableInterface(Configuration config, byte[] tableName) { + HTableInterface table = baseTableFactory.createHTableInterface(config, tableName); + tableCounter.incrementAndGet(); + return table; + } + + @Override + public void releaseHTableInterface(HTableInterface table) throws IOException { + tableCounter.decrementAndGet(); + baseTableFactory.releaseHTableInterface(table); + } + + void assertTableCount(int count) { + Assert.assertEquals(count, tableCounter.get()); + } + + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java new file mode 100644 index 0000000..ea8443e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public abstract class AbstractTestSharedMap { + abstract SharedMap newSharedMapWithInfinitePool(); + + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = newSharedMapWithInfinitePool(); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testBorrowRegisterReturn() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertNull(map.borrowObject("b")); + Assert.assertFalse(map.returnObject("a", o2)); + } + + @Test + public void testMultipleRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testRejectDuplicateRegistration() { + Assert.assertTrue(map.registerObject("a", o1)); + Assert.assertFalse(map.registerObject("a", o1)); + } + + @Test + public void testInvalidateBeforeReturn() { + map.registerObject("a", o1); + + map.invalidateObject("a", o1); + + Assert.assertFalse(map.returnObject("a", o1)); + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testInvalidateAfterReturn() { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.invalidateObject("a", o1); + + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testClear() { + map.registerObject("a", o1); + map.registerObject("b", o2); + map.registerObject("c", o3); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("b", o2)); + + Collection targets = map.clear(); + + Assert.assertEquals( + new HashSet(Arrays.asList(o1, o2)), + new HashSet(targets)); + + Assert.assertNull(map.borrowObject("a")); + Assert.assertNull(map.borrowObject("b")); + Assert.assertFalse(map.returnObject("c", o3)); + } + + @Test + public void testClearWithKey() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("b", o3); + map.registerObject("b", o4); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("b", o3)); + + Collection targets = map.clear("a"); + + Assert.assertEquals( + new HashSet(Collections.singleton(o1)), + new HashSet(targets)); + + Assert.assertNull(map.borrowObject("a")); + Assert.assertFalse(map.returnObject("a", o2)); + + Assert.assertSame(o3, map.borrowObject("b")); + Assert.assertTrue(map.returnObject("b", o4)); + } + + @Test + public void testSizeAndEmpty() { + Assert.assertEquals(0, map.size()); + Assert.assertEquals(0, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertTrue(map.isEmpty()); + + map.registerObject("a", o1); + + Assert.assertEquals(1, map.size()); + Assert.assertEquals(1, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertFalse(map.isEmpty()); + + map.registerObject("a", o2); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(2, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertFalse(map.isEmpty()); + + map.registerObject("b", o3); + + Assert.assertEquals(3, map.size()); + Assert.assertEquals(2, map.size("a")); + Assert.assertEquals(1, map.size("b")); + Assert.assertFalse(map.isEmpty()); + + map.clear(); + + Assert.assertEquals(0, map.size()); + Assert.assertEquals(0, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertTrue(map.isEmpty()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java new file mode 100644 index 0000000..3f60678 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestReusableSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java new file mode 100644 index 0000000..99452fe --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestReusableSharedMapWithFinitePool { + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = new ReusableSharedMap(3); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testOverRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + map.registerObject("a", o4); + + Assert.assertNull(map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertFalse(map.returnObject("a", o4)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java new file mode 100644 index 0000000..49140c9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRoundRobinSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new RoundRobinSharedMap(Integer.MAX_VALUE); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java new file mode 100644 index 0000000..d21b364 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRoundRobinSharedMapWithFinitePool { + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = new RoundRobinSharedMap(3); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testFullRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o3); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } + + @Test + public void testOverRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + map.registerObject("a", o4); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + Assert.assertSame(o4, map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertTrue(map.returnObject("a", o4)); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertFalse(map.returnObject("a", o4)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } + + @Test + public void testIdleObjectPriority() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + + map.returnObject("a", o2); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java new file mode 100644 index 0000000..15734b7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSegmentedSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new SegmentedSharedMap(new SegmentFactory() { + @Override + public SharedMap create() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java new file mode 100644 index 0000000..0477c9c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSegmentedSharedMapForSegments { + @Test + public void testAllSegmentsUsed() { + SegmentedSharedMap map = new SegmentedSharedMap( + new SegmentFactory() { + @Override + public SharedMap create() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } + }, 1000); + + Set> segmentsUsed = new HashSet>(); + + for(int i=0; i<1000000; i++) { + segmentsUsed.add(map.segmentFor(new Object())); + if(segmentsUsed.size() == map.countOfSegments()) { + return; + } + } + + Assert.fail("bug or bad luck"); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java new file mode 100644 index 0000000..cc2b9c5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestThreadLocalSharedMapDecorator { + SharedMap map; + Object o1; + Object o2; + Object o3; + + @Before + public void setUp() { + map = new ThreadLocalSharedMapDecorator( + new RoundRobinSharedMap(Integer.MAX_VALUE)); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + } + + @Test + public void testRegisterAndBorrow() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + } + + @Test + public void testReturnAndBorrow() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + } + + @Test + public void testRejectDuplicateRegistration() { + Assert.assertTrue(map.registerObject("a", o1)); + Assert.assertFalse(map.registerObject("a", o1)); + } + + @Test + public void testInvalidate() { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + + map.invalidateObject("a", o1); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertFalse(map.returnObject("a", o1)); + } + + @Test + public void testClear() { + map.registerObject("a", o1); + + map.clear(); + + // We still enable to borrow until we return completely + // or invalidate from inside the thread. + Assert.assertSame(o1, map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertFalse(map.returnObject("a", o1)); // completely returned and rejected + } + + @Test + public void testThreadLocal() throws Exception { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + + final AtomicReference borrowedRef = new AtomicReference(); + + Thread thread = new Thread() { + @Override + public void run() { + borrowedRef.set(map.borrowObject("a")); + } + }; + + thread.start(); + thread.join(); + + Assert.assertSame(o1, borrowedRef.get()); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + } +}