diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 1b31222..a4a3074 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -29,8 +29,10 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.hbase.util.ReusableSharedMap; +import org.apache.hadoop.hbase.util.SharedMap; +import org.apache.hadoop.hbase.util.ThreadLocalSharedMapDecorator; import java.io.Closeable; import java.io.IOException; @@ -63,9 +65,7 @@ import java.util.Map; @InterfaceAudience.Public @InterfaceStability.Stable public class HTablePool implements Closeable { - private final PoolMap tables; - private final int maxSize; - private final PoolType poolType; + private final SharedMap tables; private final Configuration config; private final HTableInterfaceFactory tableFactory; @@ -143,24 +143,14 @@ public class HTablePool implements Closeable { // Make a new configuration instance so I can safely cleanup when // done with the pool. this.config = config == null ? HBaseConfiguration.create() : config; - this.maxSize = maxSize; - this.tableFactory = tableFactory == null ? new HTableFactory() - : tableFactory; - if (poolType == null) { - this.poolType = PoolType.Reusable; + this.tableFactory = tableFactory == null ? 
new HTableFactory() : tableFactory; + + if(poolType != PoolType.ThreadLocal) { + tables = new ReusableSharedMap(maxSize); } else { - switch (poolType) { - case Reusable: - case ThreadLocal: - this.poolType = poolType; - break; - default: - this.poolType = PoolType.Reusable; - break; - } - } - this.tables = new PoolMap(this.poolType, - this.maxSize); + tables = new ThreadLocalSharedMapDecorator( + new ReusableSharedMap(maxSize)); + } } /** @@ -195,9 +185,10 @@ public class HTablePool implements Closeable { * if there is a problem instantiating the HTable */ private HTableInterface findOrCreateTable(String tableName) { - HTableInterface table = tables.get(tableName); + HTableInterface table = tables.borrowObject(tableName); if (table == null) { table = createHTable(tableName); + tables.registerObject(tableName, table); } return table; } @@ -257,13 +248,11 @@ public class HTablePool implements Closeable { private void returnTable(HTableInterface table) throws IOException { // this is the old putTable method renamed and made private String tableName = Bytes.toString(table.getTableName()); - if (tables.size(tableName) >= maxSize) { + + if(! 
tables.returnObject(tableName, table)) { // release table instance since we're not reusing it - this.tables.remove(tableName, table); this.tableFactory.releaseHTableInterface(table); - return; } - tables.put(tableName, table); } protected HTableInterface createHTable(String tableName) { @@ -282,13 +271,10 @@ public class HTablePool implements Closeable { * @param tableName */ public void closeTablePool(final String tableName) throws IOException { - Collection tables = this.tables.values(tableName); - if (tables != null) { - for (HTableInterface table : tables) { - this.tableFactory.releaseHTableInterface(table); - } + Collection tables = this.tables.clear(tableName); + for (HTableInterface table : tables) { + this.tableFactory.releaseHTableInterface(table); } - this.tables.remove(tableName); } /** @@ -307,10 +293,10 @@ public class HTablePool implements Closeable { * Note: this is a 'shutdown' of all the table pools. */ public void close() throws IOException { - for (String tableName : tables.keySet()) { - closeTablePool(tableName); + Collection tables = this.tables.clear(); + for (HTableInterface table : tables) { + this.tableFactory.releaseHTableInterface(table); } - this.tables.clear(); } public int getCurrentPoolSize(String tableName) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 4ccb298..4a27b59 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -31,11 +31,14 @@ import java.lang.reflect.Method; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; 
+import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -43,7 +46,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; @@ -74,8 +79,13 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.hbase.util.RoundRobinSharedMap; +import org.apache.hadoop.hbase.util.SegmentedSharedMap; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.apache.hadoop.hbase.util.SegmentedMultivaluedMap; +import org.apache.hadoop.hbase.util.SharedMap; +import org.apache.hadoop.hbase.util.MultivaluedMap; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; @@ -104,10 +114,14 @@ import com.google.protobuf.TextFormat; @InterfaceAudience.Private public class HBaseClient { public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient"); - protected final PoolMap connections; + protected final SegmentedSharedMap connectionMap; private ReflectionCache reflectionCache = new ReflectionCache(); + private final ThreadGroup connectionThreadGroup; + /** Thread safe */ + private final MultivaluedMap socketMap = + new SegmentedMultivaluedMap(); - protected int counter; // counter for call ids + protected final AtomicInteger counter = new AtomicInteger(); // counter for call ids protected final 
AtomicBoolean running = new AtomicBoolean(true); // if client runs final protected Configuration conf; final protected int maxIdleTime; // connections will be culled if it was idle for @@ -233,68 +247,117 @@ public class HBaseClient { } /** A call waiting for a value. */ - protected class Call { - final int id; // call id + private interface Call { + Method getMethod(); + Message getParam(); + CellScanner getCells(); + long getStartTime(); + void setException(IOException error); + void setResponse(Pair response); + } + + private static class SingleCall implements Call { + final Method method; final Message param; // rpc request method param object /** * Optionally has cells when making call. Optionally has cells set on response. Used * passing cells to the rpc and receiving the response. */ - CellScanner cells; - Message response; // value, null if error + final CellScanner cells; + final long startTime; + + final CountDownLatch doneLatch = new CountDownLatch(1); + Pair response; // value, null if error IOException error; // exception, null if value - boolean done; // true when call is done - long startTime; - final Method method; - protected Call(final Method method, Message param, final CellScanner cells) { - this.param = param; + SingleCall(Method method, Message param, CellScanner cells, long startTime) { this.method = method; + this.param = param; this.cells = cells; - this.startTime = System.currentTimeMillis(); - synchronized (HBaseClient.this) { - this.id = counter++; - } + this.startTime = startTime; } @Override - public String toString() { - return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" + - (this.param != null? TextFormat.shortDebugString(this.param): "") + "}"; + public Method getMethod() { + return method; } - /** Indicate when the call is complete and the - * value or error are available. Notifies by default. 
*/ - protected synchronized void callComplete() { - this.done = true; - notify(); // notify caller + @Override + public Message getParam() { + return param; } - /** Set the exception when there is an error. - * Notify the caller the call is done. - * - * @param error exception thrown by the call; either local or remote - */ - public synchronized void setException(IOException error) { + @Override + public CellScanner getCells() { + return cells; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public void setException(IOException error) { this.error = error; - callComplete(); + doneLatch.countDown(); } - /** Set the return value when there is no error. - * Notify the caller the call is done. - * - * @param response return value of the call. - * @param cells Can be null - */ - public synchronized void setResponse(Message response, final CellScanner cells) { + @Override + public void setResponse(Pair response) { this.response = response; - this.cells = cells; - callComplete(); + doneLatch.countDown(); + } + + Pair getResponse() throws IOException, InterruptedException { + doneLatch.await(); + if (error != null) { + throw error; + } + return response; + } + } + + private static abstract class AbstractCallDecorator implements Call { + final Call base; + + AbstractCallDecorator(Call base) { + this.base = base; } + @Override + public Method getMethod() { + return base.getMethod(); + } + + @Override + public Message getParam() { + return base.getParam(); + } + + @Override + public CellScanner getCells() { + return base.getCells(); + } + + @Override public long getStartTime() { - return this.startTime; + return base.getStartTime(); + } + + @Override + public void setException(IOException error) { + base.setException(error); + done(); + } + + @Override + public void setResponse(Pair response) { + base.setResponse(response); + done(); } + + void done() {} } protected final static Map> tokenHandlers = @@ -308,16 +371,15 @@ public class 
HBaseClient { * Creates a connection. Can be overridden by a subclass for testing. * @param remoteId - the ConnectionId to use for the connection creation. */ - protected Connection createConnection(ConnectionId remoteId, final Codec codec, - final CompressionCodec compressor) - throws IOException { - return new Connection(remoteId, codec, compressor); + protected Connection createConnection(ConnectionId remoteId) throws IOException { + return new Connection(remoteId); } - /** Thread that reads responses and notifies callers. Each connection owns a + /** Runnable object that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. */ - protected class Connection extends Thread { + protected class Connection implements Runnable { + private final String name; private ConnectionHeader header; // connection header protected ConnectionId remoteId; protected Socket socket = null; // connected socket @@ -330,8 +392,6 @@ public class HBaseClient { private Token token; private HBaseSaslRpcClient saslRpcClient; private int reloginMaxBackoff; // max pause before relogin on sasl failure - private final Codec codec; - private final CompressionCodec compressor; // currently active calls protected final ConcurrentSkipListMap calls = @@ -342,14 +402,11 @@ public class HBaseClient { new AtomicBoolean(); // indicate if the connection is closed protected IOException closeException; // close reason - Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor) - throws IOException { + Connection(ConnectionId remoteId) throws IOException { if (remoteId.getAddress().isUnresolved()) { throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); } this.server = remoteId.getAddress(); - this.codec = codec; - this.compressor = compressor; UserGroupInformation ticket = remoteId.getTicket().getUGI(); Class 
protocol = remoteId.getProtocol(); @@ -403,17 +460,16 @@ public class HBaseClient { if ((userInfoPB = getUserInfo(ticket)) != null) { builder.setUserInfo(userInfoPB); } - builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName()); - if (this.compressor != null) { - builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); + builder.setCellBlockCodecClass(codec.getClass().getCanonicalName()); + if (compressor != null) { + builder.setCellBlockCompressorClass(compressor.getClass().getCanonicalName()); } this.header = builder.build(); - this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + + this.name = "IPC Client (" + socketFactory.hashCode() +") connection to " + remoteId.getAddress().toString() + ((ticket==null)?" from an unknown user": (" from " - + ticket.getUserName()))); - this.setDaemon(true); + + ticket.getUserName())); } private UserInformation getUserInfo(UserGroupInformation ugi) { @@ -448,22 +504,19 @@ public class HBaseClient { * It is up to the user code to check this status. * @param call to add */ - protected synchronized void addCall(Call call) { + protected synchronized void addCall(int id, Call call) { // If the connection is about to close, we manage this as if the call was already added // to the connection calls list. 
If not, the connection creations are serialized, as // mentioned in HBASE-6364 if (this.shouldCloseConnection.get()) { if (this.closeException == null) { call.setException(new IOException( - "Call " + call.id + " not added as the connection " + remoteId + " is closing")); + "Call " + id + " not added as the connection " + remoteId + " is closing")); } else { call.setException(this.closeException); } - synchronized (call) { - call.notifyAll(); - } } else { - calls.put(call.id, call); + calls.put(id, call); notify(); } } @@ -530,6 +583,7 @@ public class HBaseClient { while (true) { try { this.socket = socketFactory.createSocket(); + socketMap.add(remoteId.getAddress(), this.socket); this.socket.setTcpNoDelay(tcpNoDelay); this.socket.setKeepAlive(tcpKeepAlive); // connection time out is 20s @@ -554,15 +608,16 @@ public class HBaseClient { protected void closeConnection() { // close the current connection if (socket != null) { + socketMap.remove(remoteId.getAddress(), socket); try { socket.close(); } catch (IOException e) { LOG.warn("Not able to close a socket", e); } + // set socket to null so that the next call to setupIOstreams + // can start the process of connect all over again. + socket = null; } - // set socket to null so that the next call to setupIOstreams - // can start the process of connect all over again. 
- socket = null; } /** @@ -653,7 +708,7 @@ public class HBaseClient { @Override public void run() { if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": starting, connections " + connections.size()); + LOG.debug(name + ": starting, connections " + connectionMap.size()); } try { @@ -661,14 +716,14 @@ public class HBaseClient { readResponse(); } } catch (Throwable t) { - LOG.warn(getName() + ": unexpected exception receiving call responses", t); + LOG.warn(name + ": unexpected exception receiving call responses", t); markClosed(new IOException("Unexpected exception receiving call responses", t)); } close(); if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": stopped, connections " + connections.size()); + LOG.debug(name + ": stopped, connections " + connectionMap.size()); } private synchronized void disposeSasl() { @@ -724,57 +779,72 @@ public class HBaseClient { final int currRetries, final int maxRetries, final Exception ex, final Random rand, final UserGroupInformation user) - throws IOException, InterruptedException{ - user.doAs(new PrivilegedExceptionAction() { - public Object run() throws IOException, InterruptedException { - closeConnection(); - if (shouldAuthenticateOverKrb()) { - if (currRetries < maxRetries) { - LOG.debug("Exception encountered while connecting to " + - "the server : " + ex); - //try re-login - if (UserGroupInformation.isLoginKeytabBased()) { - UserGroupInformation.getLoginUser().reloginFromKeytab(); - } else { - UserGroupInformation.getLoginUser().reloginFromTicketCache(); + throws IOException { + try { + user.doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws IOException { + closeConnection(); + if (shouldAuthenticateOverKrb()) { + try { + if (currRetries < maxRetries) { + LOG.debug("Exception encountered while connecting to " + + "the server : " + ex); + //try re-login + if (UserGroupInformation.isLoginKeytabBased()) { + UserGroupInformation.getLoginUser().reloginFromKeytab(); + } else { + 
UserGroupInformation.getLoginUser().reloginFromTicketCache(); + } + disposeSasl(); + //have granularity of milliseconds + //we are sleeping with the Connection lock held but since this + //connection instance is being used for connecting to the server + //in question, it is okay + Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1)); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - disposeSasl(); - //have granularity of milliseconds - //we are sleeping with the Connection lock held but since this - //connection instance is being used for connecting to the server - //in question, it is okay - Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1)); - return null; - } else { String msg = "Couldn't setup connection for " + - UserGroupInformation.getLoginUser().getUserName() + - " to " + serverPrincipal; + UserGroupInformation.getLoginUser().getUserName() + + " to " + serverPrincipal; LOG.warn(msg); throw (IOException) new IOException(msg).initCause(ex); + + } else { + LOG.warn("Exception encountered while connecting to " + + "the server : " + ex); } - } else { - LOG.warn("Exception encountered while connecting to " + - "the server : " + ex); - } - if (ex instanceof RemoteException) { - throw (RemoteException)ex; - } - if (ex instanceof SaslException) { - String msg = "SASL authentication failed." + - " The most likely cause is missing or invalid credentials." + - " Consider 'kinit'."; - LOG.fatal(msg, ex); - throw new RuntimeException(msg, ex); + if (ex instanceof RemoteException) { + throw (RemoteException)ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." 
+ + " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); } - throw new IOException(ex); - } - }); + }); + } catch (InterruptedException e) { + throw new AssertionError(e); + } } - protected synchronized void setupIOstreams() - throws IOException, InterruptedException { - if (socket != null || shouldCloseConnection.get()) { - return; + /** + * @return true if ready; false if already closed + */ + protected synchronized boolean setupIOstreams() throws IOException { + if (shouldCloseConnection.get()) { + return false; + } + + if (socket != null) { + return true; } if (failedServers.isFailedServer(remoteId.getAddress())) { @@ -850,8 +920,10 @@ public class HBaseClient { touch(); // start the receiver thread after the socket connection has been set up - start(); - return; + Thread thread = new Thread(connectionThreadGroup, this, name); + thread.setDaemon(true); + thread.start(); + return true; } } catch (Throwable t) { failedServers.addToFailedServers(remoteId.address); @@ -899,27 +971,24 @@ public class HBaseClient { /** Close the connection. */ protected synchronized void close() { if (!shouldCloseConnection.get()) { - LOG.error(getName() + ": the connection is not in the closed state"); + LOG.error(name + ": the connection is not in the closed state"); return; } // release the resources // first thing to do;take the connection out of the connection list - synchronized (connections) { - if (connections.get(remoteId) == this) { - connections.remove(remoteId); - } - } + connectionMap.invalidateObject(remoteId, this); // close the streams and therefore the socket IOUtils.closeStream(out); IOUtils.closeStream(in); + socketMap.remove(remoteId.getAddress(), socket); disposeSasl(); // clean up all calls if (closeException == null) { if (!calls.isEmpty()) { - LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. 
" + + LOG.warn(name + ": connection is closed for no cause and calls are not empty. " + "#Calls: " + calls.size()); // clean up calls anyway @@ -929,7 +998,7 @@ public class HBaseClient { } else { // log the info if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": closing ipc connection to " + server + ": " + + LOG.debug(name + ": closing ipc connection to " + server + ": " + closeException.getMessage(),closeException); } @@ -937,7 +1006,18 @@ public class HBaseClient { cleanupCalls(); } if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": closed"); + LOG.debug(name + ": closed"); + } + + /** + * Prepares to handle a response for the given {@code call} and + * sends parameters of the {@code call}. + * The response is notified via methods of {@code call}. + */ + void invokeCall(Call call) { + int id = counter.getAndIncrement(); + addCall(id, call); + writeRequest(id, call); } /** @@ -947,19 +1027,20 @@ public class HBaseClient { * @param call * @see #readResponse() */ - protected void writeRequest(Call call) { + protected void writeRequest(int id, Call call) { if (shouldCloseConnection.get()) return; try { RequestHeader.Builder builder = RequestHeader.newBuilder(); - builder.setCallId(call.id); + builder.setCallId(id); + if (Trace.isTracing()) { Span s = Trace.currentTrace(); builder.setTraceInfo(RPCTInfo.newBuilder(). 
setParentId(s.getSpanId()).setTraceId(s.getTraceId())); } - builder.setMethodName(call.method.getName()); - builder.setRequestParam(call.param != null); - ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells); + builder.setMethodName(call.getMethod().getName()); + builder.setRequestParam(call.getParam() != null); + ByteBuffer cellBlock = ipcUtil.buildCellBlock(codec, compressor, call.getCells()); if (cellBlock != null) { CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); cellBlockBuilder.setLength(cellBlock.limit()); @@ -968,12 +1049,12 @@ public class HBaseClient { //noinspection SynchronizeOnNonFinalField RequestHeader header = builder.build(); synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC - IPCUtil.write(this.out, header, call.param, cellBlock); + IPCUtil.write(this.out, header, call.getParam(), cellBlock); } if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header)); + LOG.debug(name + ": wrote request header " + TextFormat.shortDebugString(header)); } - } catch(IOException e) { + } catch (IOException e) { markClosed(e); } } @@ -994,7 +1075,7 @@ public class HBaseClient { ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); int id = responseHeader.getCallId(); if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": got response header " + + LOG.debug(name + ": got response header " + TextFormat.shortDebugString(responseHeader)); } Call call = calls.get(id); @@ -1012,7 +1093,7 @@ public class HBaseClient { // TODO: Why pb engine pollution in here in this class? FIX. 
rpcResponseType = ProtobufRpcClientEngine.Invoker.getReturnProtoType( - reflectionCache.getMethod(remoteId.getProtocol(), call.method.getName())); + reflectionCache.getMethod(remoteId.getProtocol(), call.getMethod().getName())); } catch (Exception e) { throw new RuntimeException(e); //local exception } @@ -1027,11 +1108,12 @@ public class HBaseClient { int size = responseHeader.getCellBlockMeta().getLength(); byte [] cellBlock = new byte[size]; IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); - cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); + cellBlockScanner = ipcUtil.createCellScanner(codec, compressor, cellBlock); } // it's possible that this call may have been cleaned up due to a RPC // timeout, so check if it still exists before setting the value. - if (call != null) call.setResponse(value, cellBlockScanner); + if (call != null) + call.setResponse(new Pair(value, cellBlockScanner)); } if (call != null) calls.remove(id); } catch (IOException e) { @@ -1091,7 +1173,8 @@ public class HBaseClient { protected void cleanupCalls(long rpcTimeout) { Iterator> itor = calls.entrySet().iterator(); while (itor.hasNext()) { - Call c = itor.next().getValue(); + Entry entry = itor.next(); + Call c = entry.getValue(); long waitTime = System.currentTimeMillis() - c.getStartTime(); if (waitTime >= rpcTimeout) { if (this.closeException == null) { @@ -1101,13 +1184,10 @@ public class HBaseClient { // over on the server; e.g. I just asked the regionserver to bulk // open 3k regions or its a big fat multiput into a heavily-loaded // server (Perhaps this only happens at the extremes?) 
- this.closeException = new CallTimeoutException("Call id=" + c.id + + this.closeException = new CallTimeoutException("Call id=" + entry.getKey() + ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout); } c.setException(this.closeException); - synchronized (c) { - c.notifyAll(); - } itor.remove(); } else { break; @@ -1166,9 +1246,21 @@ public class HBaseClient { this.compressor = getCompressor(conf); this.socketFactory = factory; this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; - this.connections = new PoolMap( - getPoolType(conf), getPoolSize(conf)); + this.connectionMap = newConnectionMap(conf); this.failedServers = new FailedServers(conf); + this.connectionThreadGroup = new ThreadGroup( + "IPC Client (" + socketFactory.hashCode() +") connection"); + } + + private static SegmentedSharedMap newConnectionMap(Configuration conf) { + final int size = getPoolSize(conf); + return new SegmentedSharedMap( + new SegmentFactory() { + @Override + public SharedMap create() { + return new RoundRobinSharedMap(size); + } + }); } /** @@ -1222,7 +1314,10 @@ public class HBaseClient { * @param config configuration * @return either a {@link PoolType#RoundRobin} or * {@link PoolType#ThreadLocal} + * @deprecated The thread-local logic is not appropriate + * and never applied to this class anymore. 
*/ + @Deprecated protected static PoolType getPoolType(Configuration config) { return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolType.RoundRobin, PoolType.ThreadLocal); @@ -1259,14 +1354,10 @@ public class HBaseClient { } // wake up all connections - synchronized (connections) { - for (Connection conn : connections.values()) { - conn.interrupt(); - } - } + connectionThreadGroup.interrupt(); // wait until all connections are closed - while (!connections.isEmpty()) { + while (!connectionMap.isEmpty()) { try { Thread.sleep(100); } catch (InterruptedException ignored) { @@ -1295,37 +1386,40 @@ public class HBaseClient { public Pair call(Method method, Message param, CellScanner cells, InetSocketAddress addr, Class protocol, User ticket, int rpcTimeout) throws InterruptedException, IOException { - Call call = new Call(method, param, cells); - Connection connection = - getConnection(addr, protocol, ticket, rpcTimeout, call, this.codec, this.compressor); - connection.writeRequest(call); // send the parameter - boolean interrupted = false; - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (call) { - while (!call.done) { - try { - call.wait(); // wait for the result - } catch (InterruptedException ignored) { - // save the fact that we were interrupted - interrupted = true; - } - } + if (Thread.interrupted()) { + throw new InterruptedException(); + } - if (interrupted) { - // set the interrupt flag now that we are done waiting - Thread.currentThread().interrupt(); + SingleCall call = new SingleCall(method, param, cells, System.currentTimeMillis()); + ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); + invokeCall(remoteId, call); + try { + return call.getResponse(); + } catch (IOException e) { + if (e instanceof RemoteException) { + e.fillInStackTrace(); + throw e; } + // local exception + throw wrapException(addr, e); + } + } - if (call.error != null) { - if (call.error instanceof 
RemoteException) { - call.error.fillInStackTrace(); - throw call.error; - } - // local exception - throw wrapException(addr, call.error); + /** + * Prepares to handle a response for the given {@code call} and + * sends parameters of the {@code call}, + * for an appropriate connection specified by the given {@code remoteId}. + * The response is notified via methods of {@code call}. + */ + private void invokeCall(ConnectionId remoteId, Call call) throws IOException { + final Connection connection = getConnection(remoteId); + + connection.invokeCall(new AbstractCallDecorator(call) { + @Override + void done() { + returnConnection(connection); } - return new Pair(call.response, call.cells); - } + }); } /** @@ -1365,16 +1459,15 @@ public class HBaseClient { * safe exception. */ public void cancelConnections(String hostname, int port, IOException ioe) { - synchronized (connections) { - for (Connection connection : connections.values()) { - if (connection.isAlive() && - connection.getRemoteAddress().getPort() == port && - connection.getRemoteAddress().getHostName().equals(hostname)) { - LOG.info("The server on " + hostname + ":" + port + - " is dead - stopping the connection " + connection.remoteId); - connection.closeConnection(); - // We could do a connection.interrupt(), but it's safer not to do it, as the - // interrupted exception behavior is not defined nor enforced enough. + Collection removed = socketMap.remove(new InetSocketAddress(hostname, port)); + if(! removed.isEmpty()) { + LOG.info("The server on " + hostname + ":" + port + " is dead", ioe); + + for(Socket socket : removed) { + try { + socket.close(); + } catch(IOException e) { + LOG.warn("Not able to close a socket", e); } } } @@ -1382,55 +1475,64 @@ public class HBaseClient { /* Get a connection from the pool, or create a new one and add it to the * pool. Connections to a given host/port are reused. 
*/ - protected Connection getConnection(InetSocketAddress addr, Class protocol, - User ticket, int rpcTimeout, Call call, final Codec codec, final CompressionCodec compressor) - throws IOException, InterruptedException { + protected Connection getConnection(ConnectionId remoteId) throws IOException { if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } - Connection connection; - /* we could avoid this allocation for each RPC by having a - * connectionsId object and with set() method. We need to manage the - * refs for keys in HashMap properly. For now its ok. - */ - ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); - synchronized (connections) { - connection = connections.get(remoteId); - if (connection == null) { - connection = createConnection(remoteId, this.codec, this.compressor); - connections.put(remoteId, connection); + + final SharedMap segment = this.connectionMap.segmentFor(remoteId); + + while (true) { + Connection connection; + synchronized (segment) { + // This synchronization guards against creating excessive instances of Connection. + // Note that the methods of the segment are guarded by the segment itself, + // according to its actual implementation class. + // This synchronization blocks threads which use the same segment, + // and you should not execute long processing for specified remoteId + // in the synchronization. + + connection = segment.borrowObject(remoteId); + if (connection == null) { + connection = createConnection(remoteId); + segment.registerObject(remoteId, connection); + } } + + if (connection.setupIOstreams()) { + // Connection.setupIOstreams() establishes an actual network connection, + // which is relatively long processing for the specified remoteId. 
+ // We execute the processing here separately from creating the instance of Connection, + // because the creation is executed under the synchronization of the segment, + // which blocks threads which use the same segment. + + return connection; + } + + segment.invalidateObject(remoteId, connection); } - connection.addCall(call); - - //we don't invoke the method below inside "synchronized (connections)" - //block above. The reason for that is if the server happens to be slow, - //it will take longer to establish a connection and that will slow the - //entire system down. - //Moreover, if the connection is currently created, there will be many threads - // waiting here; as setupIOstreams is synchronized. If the connection fails with a - // timeout, they will all fail simultaneously. This is checked in setupIOstreams. - connection.setupIOstreams(); - return connection; + } + + private void returnConnection(Connection connection) { + connectionMap.returnObject(connection.remoteId, connection); } /** * This class holds the address and the user ticket. 
The client connections * to servers are uniquely identified by */ - protected static class ConnectionId { + private final static class ConnectionId { final InetSocketAddress address; + final Class protocol; final User ticket; final int rpcTimeout; - Class protocol; - private static final int PRIME = 16777619; ConnectionId(InetSocketAddress address, Class protocol, User ticket, int rpcTimeout) { - this.protocol = protocol; this.address = address; + this.protocol = protocol; this.ticket = ticket; this.rpcTimeout = rpcTimeout; } @@ -1455,20 +1557,30 @@ public class HBaseClient { @Override public boolean equals(Object obj) { - if (obj instanceof ConnectionId) { - ConnectionId id = (ConnectionId) obj; - return address.equals(id.address) && protocol == id.protocol && - ((ticket != null && ticket.equals(id.ticket)) || - (ticket == id.ticket)) && rpcTimeout == id.rpcTimeout; - } - return false; + if(obj == this) { return true; } + if(! (obj instanceof ConnectionId)) { return false; } + ConnectionId other = (ConnectionId) obj; + + return address.equals(other.address) && + protocol == other.protocol && + (ticket == other.ticket || ticket != null && ticket.equals(other.ticket)) && + rpcTimeout == other.rpcTimeout; } - @Override // simply use the default Object#hashcode() ? + int hashCode; + + @Override public int hashCode() { - int hashcode = (address.hashCode() + PRIME * (PRIME * System.identityHashCode(protocol) ^ - (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout; - return hashcode; + int hashCode = this.hashCode; + if(hashCode == 0) { + this.hashCode = hashCode = Arrays.hashCode(new int[] { + address.hashCode(), + System.identityHashCode(protocol), + (ticket == null ? 
0 : ticket.hashCode()), + rpcTimeout + }); + } + return hashCode; } } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java new file mode 100644 index 0000000..b9e1648 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Skeleton implementation of {@code SharedMap}. + *

+ All of the public methods are synchronized with this, except for argument checks. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +abstract class AbstractSharedMap implements SharedMap { + /** Guarded by this */ + private final Map> poolMap = new HashMap>(); + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public V borrowObject(K key) { + if (key == null) { throw new NullPointerException(); } + + synchronized (this) { + Pool pool = poolMap.get(key); + if (pool == null) { + return null; + } + return pool.borrowObject(); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean registerObject(K key, V value) { + if (key == null) { throw new NullPointerException("key"); } + if (value == null) { throw new NullPointerException("value"); } + + synchronized (this) { + Pool pool = poolMap.get(key); + if (pool == null) { + pool = newPool(); + poolMap.put(key, pool); + } + return pool.registerObject(value); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean returnObject(K key, V value) { + if (key == null) { throw new NullPointerException("key"); } + if (value == null) { throw new NullPointerException("value"); } + + synchronized (this) { + Pool pool = poolMap.get(key); + if (pool == null) { + return false; + } + return pool.returnObject(value); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public void invalidateObject(K key, V value) { + if (key == null) { throw new NullPointerException("key"); } + if (value == null) { throw new NullPointerException("value"); } + + synchronized (this) { + Pool pool = poolMap.get(key); + if (pool == null) { + return; + } + + pool.invalidateObject(value); + + if (pool.isEmpty()) { + poolMap.remove(key); + } + } + } + + @Override + public 
synchronized Collection clear() { + Collection idleObjects = new ArrayList(); + + for (Pool pool : poolMap.values()) { + idleObjects.addAll(pool.clear()); + } + + poolMap.clear(); + + return idleObjects; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection clear(Object key) { + if (key == null) { throw new NullPointerException(); } + + synchronized (this) { + Pool pool = poolMap.get(key); + return pool == null ? new ArrayList() : pool.clear(); + } + } + + @Override + public synchronized int size() { + int size = 0; + for (Pool pool : poolMap.values()) { + size += pool.size(); + } + return size; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public int size(K key) { + if (key == null) { throw new NullPointerException(); } + + synchronized (this) { + Pool pool = poolMap.get(key); + return pool == null ? 0 : pool.size(); + } + } + + @Override + public synchronized boolean isEmpty() { + for (Pool pool : poolMap.values()) { + if (! pool.isEmpty()) { + return false; + } + } + return true; + } + + abstract Pool newPool(); + + interface Pool { + /** + * Returns one of the shared objects, + * or returns null if there is no appropriate object to borrow. + */ + V borrowObject(); + + /** + * Registers the given {@code value} to this instance. + * At this point the registered object is regarded as borrowed. + *

+ * This method does nothing and returns false if and only if + * the given {@code value} is already registered. + * In practice the {@code value} is expected to be a newly created object, + * and this method is expected to return true. + */ + boolean registerObject(V value); + + /** + * Makes the given {@code value} to be returned into this instance. + * It is expected that the {@code value} has been borrowed from this instance. + *

+ * If this method returns false, + * then the the registration of the {@code value} is invalidated. + * This happens because the pool is full when this method is called, + * or you have explicitly called {@link #invalidateObject()} or {@link #clear()}. + */ + boolean returnObject(V value); + + /** + * Invalidates the registration of the given {@code value}. + * You may call this method regardless of whether the {@code value} is borrowed or not. + */ + void invalidateObject(V value); + + /** + * Invalidates all of the registrations to this instance, + * and returns the cleared idle objects in the pool. + */ + Collection clear(); + + /** + * Returns the count of the registered objects. + */ + int size(); + + /** + * Returns true if there is no registration. + */ + boolean isEmpty(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/MultivaluedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/MultivaluedMap.java new file mode 100644 index 0000000..254f474 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/MultivaluedMap.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.Set; + +/** + * Defines a simple map that holds a set of values against each key. + * + * @param the type of keys + * @param the type of values + */ +public interface MultivaluedMap { + /** + * Adds the given {@code value} to the set associated with the given {@code key}. + * + * @return true if this did not already contain the given {@code key}-{@code value} + * @throws NullPointerException if {@code key} or {@code value} is null + */ + boolean add(K key, V value); + + /** + * Returns a set containing the values associated with the given {@code key}. + * + * @throws NullPointerException if {@code key} is null + */ + Set get(K key); + + /** + * Removes the given {@code value} from the set associated with the given {@code key}. + * + * @return true if this contained the given {@code key}-{@code value} + * @throws NullPointerException if {@code key} or {@code value} is null + */ + boolean remove(K key, V value); + + /** + * Removes the values associated with the given {@code key}, + * and returns a set containing the removed values. + * + * @throws NullPointerException if {@code key} is null + */ + Set remove(K key); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java index 9847b30..0686961 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/PoolMap.java @@ -61,10 +61,18 @@ public class PoolMap implements Map { private Map> pools = new ConcurrentHashMap>(); + /** + * @deprecated Use {@link SharedMap} and its implementation classes. + */ + @Deprecated public PoolMap(PoolType poolType) { this.poolType = poolType; } + /** + * @deprecated Use {@link SharedMap} and its implementation classes. 
+ */ + @Deprecated public PoolMap(PoolType poolType, int poolMaxSize) { this.poolType = poolType; this.poolMaxSize = poolMaxSize; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java new file mode 100644 index 0000000..443511f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Implementation of {@code SharedMap}, + * ensuring that it lends a shared object to a single caller at the same time. + *

+ * All of the public methods are synchronized with this, except for argument checks. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ReusableSharedMap extends AbstractSharedMap { + private final int maxObjectCountPerKey; + + /** + * Constructs with the given {@code maxObjectCountPerKey}, + * which is a max count of idle objects to hold for each key. + */ + public ReusableSharedMap(int maxObjectCountPerKey) { + this.maxObjectCountPerKey = maxObjectCountPerKey; + } + + @Override + Pool newPool() { + return new ReusablePool(); + } + + private class ReusablePool implements Pool { + // The boolean values represent busy (borrowing) or not. + // This is for performance instead of sequential search in idleQueue. + final Map registeredObjects = new HashMap(); + final Queue idleQueue = new ArrayDeque(); + + @Override + public V borrowObject() { + V value = idleQueue.poll(); + if (value == null) { + return null; + } + + registeredObjects.put(value, Boolean.TRUE); + return value; + } + + @Override + public boolean registerObject(V value) { + if (registeredObjects.containsKey(value)) { + return false; + } + registeredObjects.put(value, Boolean.TRUE); + return true; + } + + @Override + public boolean returnObject(V value) { + Boolean busy = registeredObjects.get(value); + if (busy == null) { + return false; + } + + if (! 
busy) { + return true; + } + + if (idleQueue.size() >= maxObjectCountPerKey) { + registeredObjects.remove(value); + return false; + } + + registeredObjects.put(value, Boolean.FALSE); + idleQueue.offer(value); + return true; + } + + @Override + public void invalidateObject(V value) { + registeredObjects.remove(value); + idleQueue.remove(value); + } + + @Override + public Collection clear() { + Collection idleObjects = new ArrayList(idleQueue); + + registeredObjects.clear(); + idleQueue.clear(); + + return idleObjects; + } + + @Override + public int size() { + return registeredObjects.size(); + } + + @Override + public boolean isEmpty() { + return registeredObjects.isEmpty(); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java new file mode 100644 index 0000000..649e312 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Implementation of {@code SharedMap} based on the round robin logic, + * except that it lends an idle object whenever there is. + * Objects to be shared must be thread safe, + * because the round robin logic will lend the same shared object + * to different callers at the same time. + *

+ The round robin logic restricts the count of shared objects + * and prevents unlimitedly consuming resources. + * For each key it starts to re-lend shared objects cyclically + * when the count of borrowed objects exceeds the user-specified threshold. + *

+ * All of the public methods are synchronized with this, except for argument checks. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class RoundRobinSharedMap extends AbstractSharedMap { + /** the max count of shared objects for each key, which count is positive */ + private final int maxObjectCountPerKey; + + /** + * Constructs with the given {@code maxObjectCountPerKey}, + * which is a threshold count of borrowed objects for each key + * before starting to lend the same shared object to different callers, + * and which is also a max count of idle objects to hold for each key + * though the count of registered objects may exceed the max count. + *

+ * The given {@code maxObjectCountPerKey} is expected to be positive, + * otherwise the max count is set to be 1 instead of the given value. + */ + public RoundRobinSharedMap(int maxObjectCountPerKey) { + this.maxObjectCountPerKey = Math.max(1, maxObjectCountPerKey); + } + + @Override + Pool newPool() { + return new RoundRobinPool(); + } + + private class RoundRobinPool implements Pool { + // The integer values represent borrowing counts. + final Map registeredObjects = new HashMap(); + final Queue idleQueue = new ArrayDeque(); + final Queue busyQueue = new ArrayDeque(); + + @Override + public V borrowObject() { + V value = idleQueue.poll(); + if (value != null) { + registeredObjects.put(value, 1); + busyQueue.offer(value); + return value; + } + + if (busyQueue.size() < maxObjectCountPerKey) { + return null; + } + + // Now we are ready to pick up and lend a busy object in busyQueue, + // which is not empty because busyQueue.size() >= maxObjectCountPerKey > 0. + + value = busyQueue.poll(); + assert value != null; + registeredObjects.put(value, registeredObjects.get(value) + 1); + busyQueue.offer(value); + return value; + } + + @Override + public boolean registerObject(V value) { + if (registeredObjects.containsKey(value)) { + return false; + } + registeredObjects.put(value, 1); + busyQueue.offer(value); + return true; + } + + @Override + public boolean returnObject(V value) { + Integer busyCountWrapper = registeredObjects.get(value); + if (busyCountWrapper == null) { + return false; + } + + int busyCount = busyCountWrapper; + if (busyCount == 0) { + return true; + } + + int nextBusyCount = busyCount - 1; + if (nextBusyCount != 0) { + registeredObjects.put(value, nextBusyCount); + return true; + } + + busyQueue.remove(value); + + if (idleQueue.size() >= maxObjectCountPerKey) { + registeredObjects.remove(value); + return false; + + } else { + registeredObjects.put(value, 0); + idleQueue.offer(value); + return true; + } + } + + @Override + public void 
invalidateObject(V value) { + registeredObjects.remove(value); + idleQueue.remove(value); + busyQueue.remove(value); + } + + @Override + public Collection clear() { + Collection idleObjects = new ArrayList(idleQueue); + + registeredObjects.clear(); + idleQueue.clear(); + busyQueue.clear(); + + return idleObjects; + } + + @Override + public int size() { + return registeredObjects.size(); + } + + @Override + public boolean isEmpty() { + return registeredObjects.isEmpty(); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentationHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentationHelper.java new file mode 100644 index 0000000..3e7608c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentationHelper.java @@ -0,0 +1,59 @@ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.ConcurrentHashMap; + +class SegmentationHelper { + private static final int MAX_SEGMENTS = 1 << 16; + + private final int requiredSize; + private final int mask; + private final int shift; + + /** + * @param concurrencyLevel an estimated number of concurrently accessing threads + */ + SegmentationHelper(int concurrencyLevel) { + int requestedSize = Math.min(concurrencyLevel, MAX_SEGMENTS); + + int shift = 32; + int size = 1; + while (size < requestedSize) { + shift--; + size <<= 1; + } + + this.requiredSize = size; + this.shift = shift; + this.mask = size - 1; + } + + int requiredSize() { + return requiredSize; + } + + /** + * The length of the given {@code segments} must be + * equal to or greater than {@link #requiredSize()}. + */ + T segmentFor(T[] segments, Object key) { + assert segments.length >= requiredSize; + + int hash = hash(key.hashCode()); + return segments[(hash >>> shift) & mask]; + } + + /** + * Applies a supplemental hash function to a given hashCode, + * which defends against poor quality hash functions. 
+ * + * @see ConcurrentHashMap#hash(int) + */ + private static int hash(int h) { + h += (h << 15) ^ 0xffffcd7d; + h ^= (h >>> 10); + h += (h << 3); + h ^= (h >>> 6); + h += (h << 2) + (h << 14); + return h ^ (h >>> 16); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedMultivaluedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedMultivaluedMap.java new file mode 100644 index 0000000..e57dec1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedMultivaluedMap.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Set; + +/** + * A thread safe implementation class of {@code MultivaluedMap}, + * which delegates operations to segmented synchronized maps according to keys, + * in order to reduce contention between threads. + * + * @param the type of keys + * @param the type of values + */ +public class SegmentedMultivaluedMap implements MultivaluedMap { + private final SegmentationHelper helper; + private final MultivaluedMap[] segments; + + /** + * Constructs with the default concurrency level (16). 
+ */ + public SegmentedMultivaluedMap() { + this(16); + } + + /** + * Constructs with the given {@code concurrencyLevel}, + * which is the estimated number of concurrently accessing threads. + */ + public SegmentedMultivaluedMap(int concurrencyLevel) { + helper = new SegmentationHelper(concurrencyLevel); + + @SuppressWarnings("unchecked") + MultivaluedMap[] segments = new MultivaluedMap[helper.requiredSize()]; + for (int i = 0; i < segments.length; i++) { + segments[i] = new SynchronizedMultivaluedMap(); + } + this.segments = segments; + } + + /** + * @throws NullPointerException if {@code key} is null + */ + private MultivaluedMap segmentFor(K key) { + if(key == null) { + throw new NullPointerException(); + } + return helper.segmentFor(segments, key); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean add(K key, V value) { + return segmentFor(key).add(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Set get(K key) { + return segmentFor(key).get(key); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean remove(K key, V value) { + return segmentFor(key).remove(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Set remove(K key) { + return segmentFor(key).remove(key); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java new file mode 100644 index 0000000..1ee060e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SegmentedSharedMap.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Implementation of {@code SharedMap}, + * which delegates operations to segmented maps according to keys, + * in order to reduce contention between threads. + *

+ * Thread safe. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class SegmentedSharedMap implements SharedMap { + /** + * Factory to create a map which will be used as one of segmented maps. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ + public interface SegmentFactory { + SharedMap create(); + } + + private final SegmentationHelper helper; + private final SharedMap[] segments; + + /** + * Constructs with the default concurrency level (16). + * + * @throws NullPointerException if {@code segmentFactory} is null + */ + public SegmentedSharedMap(SegmentFactory segmentFactory) { + this(segmentFactory, 16); + } + + /** + * Constructs with the given {@code concurrencyLevel}, + * which is the estimated number of concurrently accessing threads. + * + * @throws NullPointerException if {@code segmentFactory} is null + */ + public SegmentedSharedMap(SegmentFactory segmentFactory, int concurrencyLevel) { + if (segmentFactory == null) { + throw new NullPointerException(); + } + + helper = new SegmentationHelper(concurrencyLevel); + + @SuppressWarnings("unchecked") + SharedMap[] segments = new SharedMap[helper.requiredSize()]; + for (int i = 0; i < segments.length; i++) { + segments[i] = segmentFactory.create(); + } + this.segments = segments; + } + + /** + * Returns the corresponding internal segment for the given {@code key}. + * + * @throws NullPointerException if {@code key} is null + */ + public SharedMap segmentFor(K key) { + if(key == null) { + throw new NullPointerException(); + } + return helper.segmentFor(segments, key); + } + + /** + * Returns the count of the segments used internally. 
+ */ + int countOfSegments() { + return segments.length; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public V borrowObject(K key) { + return segmentFor(key).borrowObject(key); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean registerObject(K key, V value) { + return segmentFor(key).registerObject(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean returnObject(K key, V value) { + return segmentFor(key).returnObject(key, value); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public void invalidateObject(K key, V value) { + segmentFor(key).invalidateObject(key, value); + } + + @Override + public Collection clear() { + Collection result = new ArrayList(); + for (SharedMap segment : segments) { + result.addAll(segment.clear()); + } + return result; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection clear(K key) { + return segmentFor(key).clear(key); + } + + @Override + public int size() { + int result = 0; + for (SharedMap segment : segments) { + result += segment.size(); + } + return result; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public int size(K key) { + return segmentFor(key).size(key); + } + + @Override + public boolean isEmpty() { + for (SharedMap segment : segments) { + if (! segment.isEmpty()) { + return false; + } + } + return true; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java new file mode 100644 index 0000000..aa876fa --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This provides functions to share objects. + *

+ * In order to share objects, first of all call {@link #borrowObject()}. + * If the method returns a non-null object, use it. + * If the method returns null, create a new object and call {@link #registerObject()}, + * and use it. Just after the registration the object is regarded as borrowed. + *

+ * After using the borrowed object, + * call {@link #returnObject()} to return the object to the pool, + * or call {@link #invalidateObject()} to invalidate the object's registration + * instead of returning the object. + * If the pool is already full, the method {@code returnObject()} returns false + * with invalidating the registration of the object. + * Also the method returns false if the object's registration is already invalidated + * because {@link #invalidateObject()} or {@link #clear()} has been called. + * Anyway if the method {@code returnObject()} returns false + * the object is rejected to return the pool and is already invalidated, + * and you might tear down the rejected object. + * You can safely tear down the rejected object even in multi-thread contexts, + * because the object is never passed to the other threads via the pool. + *

+ * Incidentally, you can call {@link #invalidateObject} and {@link #clear()} + * in order to invalidate the registrations at any time, by any thread, + * regardless of whether the objects are borrowed or not. + * The method {@code clear()} returns a collection of the idle objects, + * which have been registered but not borrowed and idle in the pool, + * and you can safely tear down them in the sense that + * you get them exclusively in multi-thread contexts. + * The rest of the objects that have been registered are borrowed and in use + * by some other threads at the moment, and you can't tear down them right now. + * But the borrower eventually calls {@code returnObject()} (which returns false) + * or {@code invalidateObject()} after using the borrowed object, and in either method + * the borrower know that the registration of the object is already invalidated + * and the object is ready to tear down. + *

+ * The implementations of this interface are expected to be thread safe, + * but it depends on the implementations whether it borrows the same shared object + * to different callers at the same time. + *

+ * Objects to be shared are identified with their equality, instead of sameness.
+ * Concretely, when we have two objects {@code o1} and {@code o2} to be shared,
+ * we won't distinguish between {@code o1} and {@code o2}
+ * if {@code o1.equals(o2)} returns true rather than if {@code o1 == o2} is true.
+ * That enables us to use ordinary collections for the objects
+ * and simplify the implementation logic of this interface.
+ * Also that is enough for practical uses.
+ *
+ * @param <K> the type of keys that you associate objects to be shared with
+ * @param <V> the type of objects to be shared
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface SharedMap<K, V> {
+  /**
+   * Returns one of the shared objects associated with the given {@code key},
+   * or returns null if there is no appropriate object to borrow.
+   * In the latter case, it is expected that a new instance is created
+   * and registered to this instance.
+   *
+   * @throws NullPointerException if {@code key} is null
+   */
+  V borrowObject(K key);
+
+  /**
+   * Registers the given {@code value} to this instance,
+   * associating it with the given {@code key}.
+   * At this point the registered object is regarded as borrowed
+   * and you can continue to use the object,
+   * and after finishing using it you have to return or invalidate
+   * the registered object into this instance.
+   * <p>
+ * This method does nothing and returns false if and only if + * the given {@code value} is already registered + * associated with the given {@code key}. + * In practice the {@code value} is expected to be a newly created object, + * and this method is expected to return true. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + boolean registerObject(K key, V value); + + /** + * Makes the given {@code value} associated with the given {@code key} + * to be returned into this instance. + * It is expected that the {@code value} has been borrowed from this instance. + *

+ * If this method returns false, then the registration of the {@code value} is invalidated. + * This happens because the pool is full when this method is called, + * or you have explicitly called {@link #invalidateObject()} or {@link #clear()}. + * You might have to tear down the rejected object. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + boolean returnObject(K key, V value); + + /** + * Invalidates the registration of the given {@code value} associated with the {@code key}. + * You may call this method regardless of whether the {@code value} is borrowed or not. + * You might have to tear down the removed object. + * + * @throws NullPointerException if {@code key} or {@code value} is null + */ + void invalidateObject(K key, V value); + + /** + * Invalidates all of the registrations to this instance, + * and returns the objects that have been registered but idle in the pool. + * You might have to tear down the returned objects. + *

+ * The rest of the objects that have been registered are still in use at this moment, + * and they are not contained in the collection returned by this method. + * In fact, you should not tear down them until the borrowers complete the usage. + * Instead, because the borrowers are supposed to call {@link #returnObject()} + * (which rejects invalidated objects and returns false) + * or {@link #invalidateObject()} after their usage, + * you can make the borrowers themselves tear down the invalidated objects + * just after calling these methods. + */ + Collection clear(); + + /** + * Invalidates all of the registrations to this instance associated with the given {@code key}, + * and returns the objects that have been registered but idle in the pool. + * You might have to tear down the returned objects. + *

+ * The rest of the objects that have been registered are still in use at this moment, + * and they are not contained in the collection returned by this method. + * In fact, you should not tear down them until the borrowers complete the usage. + * Instead, because the borrowers are supposed to call {@link #returnObject()} + * (which rejects invalidated objects and returns false) + * or {@link #invalidateObject()} after their usage, + * you can make the borrowers themselves tear down the invalidated objects + * just after calling these methods. + * + * @throws NullPointerException if {@code key} is null + */ + Collection clear(K key); + + /** + * Returns the count of the registered objects. + */ + int size(); + + /** + * Returns the count of the registered objects associated with the given {@code key}. + * + * @throws NullPointerException if {@code key} is null + */ + int size(K key); + + /** + * Returns true if there is no registration. + */ + boolean isEmpty(); +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SynchronizedMultivaluedMap.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SynchronizedMultivaluedMap.java new file mode 100644 index 0000000..b62633b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/SynchronizedMultivaluedMap.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A thread safe implementation class of {@code MultivaluedMap},
+ * with locking this instance itself to synchronize methods.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class SynchronizedMultivaluedMap<K, V> implements MultivaluedMap<K, V> {
+  private final Map<K, Set<V>> map = new HashMap<K, Set<V>>();
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean add(K key, V value) {
+    if (key == null) {
+      throw new NullPointerException("key");
+    }
+    if (value == null) {
+      throw new NullPointerException("value");
+    }
+
+    synchronized(this) {
+      Set<V> values = map.get(key);
+      if (values == null) {
+        values = new HashSet<V>();
+        map.put(key, values);
+      }
+      return values.add(value);
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public Set<V> get(K key) {
+    if (key == null) {
+      throw new NullPointerException();
+    }
+
+    synchronized(this) {
+      Set<V> values = map.get(key);
+      return values == null ? Collections.<V>emptySet() : new HashSet<V>(values);
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean remove(K key, V value) {
+    if (key == null) {
+      throw new NullPointerException("key");
+    }
+    if (value == null) {
+      throw new NullPointerException("value");
+    }
+
+    synchronized(this) {
+      Set<V> values = map.get(key);
+      if (values == null || !
values.remove(value)) { + return false; + } + + if (values.isEmpty()) { + map.remove(key); + } + return true; + } + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Set remove(K key) { + if (key == null) { + throw new NullPointerException(); + } + + synchronized(this) { + Set removed = map.remove(key); + return removed == null ? Collections.emptySet() : removed; + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java new file mode 100644 index 0000000..4ca9f5c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Decorator of {@code SharedMap} + * which gives the same shared object for each key at the same time for each thread. 
+ * This lends the same object even after the other threads invalidate the object, + * until the borrowing thread completely returns the object or invalidates it. + *

+ * Note that it depends on the base instance of {@code SharedMap} + * whether the each shared object is accessed by at most one thread at the same time or not. + *

+ * Thread safe. + * Objects to be shared are identified with their equality, instead of sameness. + * + * @param the type of keys that you associate objects to be shared with + * @param the type of objects to be shared + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ThreadLocalSharedMapDecorator implements SharedMap { + private static class BorrowingCounter { + final V value; + + int count = 1; + + BorrowingCounter(V value) { + assert value != null; + this.value = value; + } + } + + private final ThreadLocal>> borrowingCounterMapRef + = new ThreadLocal>>() { + @Override + protected Map> initialValue() { + return new HashMap>(); + } + }; + + private final SharedMap base; + + /** + * @throws NullPointerException if {@code base} is null + */ + public ThreadLocalSharedMapDecorator(SharedMap base) { + if (base == null) { throw new NullPointerException(); } + this.base = base; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public V borrowObject(K key) { + if (key == null) { throw new NullPointerException(); } + + Map> counterMap = borrowingCounterMapRef.get(); + + BorrowingCounter counter = counterMap.get(key); + if (counter == null) { + V value = base.borrowObject(key); + if (value != null) { + counterMap.put(key, new BorrowingCounter(value)); + } + return value; + } + + counter.count++; + return counter.value; + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public boolean registerObject(K key, V value) { + if (! 
base.registerObject(key, value)) {
+      return false;
+    }
+
+    Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+    BorrowingCounter<V> counter = counterMap.get(key);
+    if (counter == null) {
+      counterMap.put(key, new BorrowingCounter<V>(value));
+    }
+    return true;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean returnObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+
+    BorrowingCounter<V> counter = counterMap.get(key);
+    if (counter == null || ! counter.value.equals(value)) {
+      return base.returnObject(key, value);
+    }
+
+    if (--counter.count > 0) {
+      return true;
+    }
+
+    counterMap.remove(key);
+    return base.returnObject(key, value);
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public void invalidateObject(K key, V value) {
+    base.invalidateObject(key, value);
+
+    Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+    BorrowingCounter<V> counter = counterMap.get(key);
+    if (counter == null || !
counter.value.equals(value)) { + return; + } + counterMap.remove(key); + } + + @Override + public Collection clear() { + return base.clear(); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public Collection clear(K key) { + return base.clear(key); + } + + @Override + public int size() { + return base.size(); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + @Override + public int size(K key) { + return base.size(key); + } + + @Override + public boolean isEmpty() { + return base.isEmpty(); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestMultivaluedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestMultivaluedMap.java new file mode 100644 index 0000000..65ccc85 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestMultivaluedMap.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public abstract class AbstractTestMultivaluedMap { + abstract MultivaluedMap getMap(); + + private MultivaluedMap map; + + @Before + public void setUp() { + map = getMap(); + } + + @Test + public void testGet() { + map.add(1, "a"); + map.add(1, "b"); + map.add(2, "c"); + + Collection result = map.get(1); + Assert.assertEquals( + new HashSet(Arrays.asList("a", "b")), + new HashSet(result)); + } + + @Test + public void testRemoveKeyValue() { + map.add(1, "a"); + map.add(1, "b"); + + Assert.assertTrue(map.remove(1, "a")); + Assert.assertFalse(map.remove(1, "a")); + + Assert.assertTrue(map.remove(1, "b")); + Assert.assertFalse(map.remove(1, "b")); + } + + @Test + public void testRemoveKey() { + map.add(1, "a"); + map.add(1, "b"); + Collection removed = map.remove(1); + + Assert.assertEquals( + new HashSet(Arrays.asList("a", "b")), + new HashSet(removed)); + + Assert.assertFalse(map.remove(1, "a")); + Assert.assertFalse(map.remove(1, "b")); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java new file mode 100644 index 0000000..ea8443e --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public abstract class AbstractTestSharedMap { + abstract SharedMap newSharedMapWithInfinitePool(); + + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = newSharedMapWithInfinitePool(); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testBorrowRegisterReturn() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertNull(map.borrowObject("b")); + Assert.assertFalse(map.returnObject("a", o2)); + } + + @Test + public void testMultipleRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testRejectDuplicateRegistration() { + 
Assert.assertTrue(map.registerObject("a", o1)); + Assert.assertFalse(map.registerObject("a", o1)); + } + + @Test + public void testInvalidateBeforeReturn() { + map.registerObject("a", o1); + + map.invalidateObject("a", o1); + + Assert.assertFalse(map.returnObject("a", o1)); + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testInvalidateAfterReturn() { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.invalidateObject("a", o1); + + Assert.assertNull(map.borrowObject("a")); + } + + @Test + public void testClear() { + map.registerObject("a", o1); + map.registerObject("b", o2); + map.registerObject("c", o3); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("b", o2)); + + Collection targets = map.clear(); + + Assert.assertEquals( + new HashSet(Arrays.asList(o1, o2)), + new HashSet(targets)); + + Assert.assertNull(map.borrowObject("a")); + Assert.assertNull(map.borrowObject("b")); + Assert.assertFalse(map.returnObject("c", o3)); + } + + @Test + public void testClearWithKey() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("b", o3); + map.registerObject("b", o4); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("b", o3)); + + Collection targets = map.clear("a"); + + Assert.assertEquals( + new HashSet(Collections.singleton(o1)), + new HashSet(targets)); + + Assert.assertNull(map.borrowObject("a")); + Assert.assertFalse(map.returnObject("a", o2)); + + Assert.assertSame(o3, map.borrowObject("b")); + Assert.assertTrue(map.returnObject("b", o4)); + } + + @Test + public void testSizeAndEmpty() { + Assert.assertEquals(0, map.size()); + Assert.assertEquals(0, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertTrue(map.isEmpty()); + + map.registerObject("a", o1); + + Assert.assertEquals(1, map.size()); + Assert.assertEquals(1, map.size("a")); + Assert.assertEquals(0, map.size("b")); + 
Assert.assertFalse(map.isEmpty()); + + map.registerObject("a", o2); + + Assert.assertEquals(2, map.size()); + Assert.assertEquals(2, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertFalse(map.isEmpty()); + + map.registerObject("b", o3); + + Assert.assertEquals(3, map.size()); + Assert.assertEquals(2, map.size("a")); + Assert.assertEquals(1, map.size("b")); + Assert.assertFalse(map.isEmpty()); + + map.clear(); + + Assert.assertEquals(0, map.size()); + Assert.assertEquals(0, map.size("a")); + Assert.assertEquals(0, map.size("b")); + Assert.assertTrue(map.isEmpty()); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java new file mode 100644 index 0000000..3f60678 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestReusableSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java new file mode 100644 index 0000000..99452fe --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMapWithFinitePool.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestReusableSharedMapWithFinitePool { + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = new ReusableSharedMap(3); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testOverRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + map.registerObject("a", o4); + + Assert.assertNull(map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertFalse(map.returnObject("a", o4)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + Assert.assertNull(map.borrowObject("a")); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java new file mode 100644 index 0000000..49140c9 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRoundRobinSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new RoundRobinSharedMap(Integer.MAX_VALUE); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java new file mode 100644 index 0000000..d21b364 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMapWithFinitePool.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRoundRobinSharedMapWithFinitePool { + SharedMap map; + Object o1; + Object o2; + Object o3; + Object o4; + + @Before + public void setUp() { + map = new RoundRobinSharedMap(3); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + o4 = new Object(); + } + + @Test + public void testFullRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o3); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } + + @Test + public void testOverRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + map.registerObject("a", o4); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + Assert.assertSame(o4, map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertTrue(map.returnObject("a", o4)); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertFalse(map.returnObject("a", o4)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, 
map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } + + @Test + public void testIdleObjectPriority() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + + map.returnObject("a", o2); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedMultivaluedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedMultivaluedMap.java new file mode 100644 index 0000000..5e2d9f1 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedMultivaluedMap.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +public class TestSegmentedMultivaluedMap extends AbstractTestMultivaluedMap { + @Override + MultivaluedMap getMap() { + return new SegmentedMultivaluedMap(); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java new file mode 100644 index 0000000..15734b7 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMap.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSegmentedSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWithInfinitePool() { + return new SegmentedSharedMap(new SegmentFactory() { + @Override + public SharedMap create() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } + }); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java new file mode 100644 index 0000000..0477c9c --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSegmentedSharedMapForSegments.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.SegmentedSharedMap.SegmentFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSegmentedSharedMapForSegments { + @Test + public void testAllSegmentsUsed() { + SegmentedSharedMap map = new SegmentedSharedMap( + new SegmentFactory() { + @Override + public SharedMap create() { + return new ReusableSharedMap(Integer.MAX_VALUE); + } + }, 1000); + + Set> segmentsUsed = new HashSet>(); + + for(int i=0; i<1000000; i++) { + segmentsUsed.add(map.segmentFor(new Object())); + if(segmentsUsed.size() == map.countOfSegments()) { + return; + } + } + + Assert.fail("bug or bad luck"); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSynchronizedMultivaluedMap.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSynchronizedMultivaluedMap.java new file mode 100644 index 0000000..317c5db --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestSynchronizedMultivaluedMap.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +public class TestSynchronizedMultivaluedMap extends AbstractTestMultivaluedMap { + @Override + MultivaluedMap getMap() { + return new SynchronizedMultivaluedMap(); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java new file mode 100644 index 0000000..cc2b9c5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestThreadLocalSharedMapDecorator { + SharedMap map; + Object o1; + Object o2; + Object o3; + + @Before + public void setUp() { + map = new ThreadLocalSharedMapDecorator( + new RoundRobinSharedMap(Integer.MAX_VALUE)); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + } + + @Test + public void testRegisterAndBorrow() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + } + + @Test + public void testReturnAndBorrow() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + } + + @Test + public void testRejectDuplicateRegistration() { + Assert.assertTrue(map.registerObject("a", o1)); + Assert.assertFalse(map.registerObject("a", o1)); + } + + @Test + public void testInvalidate() { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + 
map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + + map.invalidateObject("a", o1); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertFalse(map.returnObject("a", o1)); + } + + @Test + public void testClear() { + map.registerObject("a", o1); + + map.clear(); + + // We still enable to borrow until we return completely + // or invalidate from inside the thread. + Assert.assertSame(o1, map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertFalse(map.returnObject("a", o1)); // completely returned and rejected + } + + @Test + public void testThreadLocal() throws Exception { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + + final AtomicReference borrowedRef = new AtomicReference(); + + Thread thread = new Thread() { + @Override + public void run() { + borrowedRef.set(map.borrowObject("a")); + } + }; + + thread.start(); + thread.join(); + + Assert.assertSame(o1, borrowedRef.get()); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index d07d35a..97db37c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -38,12 +38,13 @@ import java.util.Map; import java.util.NavigableMap; import 
java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -4470,125 +4471,76 @@ public class TestFromClientSide { } @Test - public void testClientPoolRoundRobin() throws IOException { - final byte[] tableName = Bytes.toBytes("testClientPoolRoundRobin"); + public void testClientPool() throws Exception { + final byte[] tableName = Bytes.toBytes("testClientPool"); + final int poolSize = 3; + final int concurrentLevel = poolSize * 2; + final int numVersions = 10; - int poolSize = 3; - int numVersions = poolSize * 2; - Configuration conf = TEST_UTIL.getConfiguration(); + final Configuration conf = TEST_UTIL.getConfiguration(); conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "round-robin"); conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize); - HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, - conf, Integer.MAX_VALUE); - table.setAutoFlush(true); + ExecutorService service = Executors.newCachedThreadPool(); + List> futures = new ArrayList>(); - final long ts = EnvironmentEdgeManager.currentTimeMillis(); - Get get = new Get(ROW); - get.addColumn(FAMILY, QUALIFIER); - get.setMaxVersions(); + final CyclicBarrier readyBarrier = new CyclicBarrier(concurrentLevel); - for (int versions = 1; versions <= numVersions; versions++) { - Put put = new Put(ROW); - put.add(FAMILY, QUALIFIER, ts + versions, VALUE); - table.put(put); + for (int id = 0; id < concurrentLevel; id++) { + final byte[] qualifier = Bytes.toBytes("testQualifier" + id); + futures.add(service.submit(new Callable() { + @Override + public Void call() throws 
Exception { + readyBarrier.await(); + verifyClientPool(conf, tableName, qualifier, numVersions); + return null; + } + })); + } - Result result = table.get(get); - NavigableMap navigableMap = result.getMap().get(FAMILY) - .get(QUALIFIER); + service.shutdown(); + if (! service.awaitTermination(60, TimeUnit.SECONDS)) { + service.shutdownNow(); + fail("60sec elapsed before termination"); + } - assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER - + " did not match " + versions, versions, navigableMap.size()); - for (Map.Entry entry : navigableMap.entrySet()) { - assertTrue("The value at time " + entry.getKey() - + " did not match what was put", - Bytes.equals(VALUE, entry.getValue())); - } + for (Future future : futures) { + future.get(); } } - @Test - public void testClientPoolThreadLocal() throws IOException { - final byte[] tableName = Bytes.toBytes("testClientPoolThreadLocal"); + private static void verifyClientPool( + Configuration conf, byte[] tableName, byte[] qualifier, int numVersions) throws Exception { - int poolSize = Integer.MAX_VALUE; - int numVersions = 3; - Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "thread-local"); - conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, poolSize); - - final HTable table = TEST_UTIL.createTable(tableName, - new byte[][] { FAMILY }, conf); + HTable table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, conf, + Integer.MAX_VALUE); table.setAutoFlush(true); - final long ts = EnvironmentEdgeManager.currentTimeMillis(); - final Get get = new Get(ROW); - get.addColumn(FAMILY, QUALIFIER); + Get get = new Get(ROW); + get.addColumn(FAMILY, qualifier); get.setMaxVersions(); + final long ts = EnvironmentEdgeManager.currentTimeMillis(); + for (int versions = 1; versions <= numVersions; versions++) { Put put = new Put(ROW); - put.add(FAMILY, QUALIFIER, ts + versions, VALUE); + put.add(FAMILY, qualifier, ts + versions, VALUE); table.put(put); Result 
result = table.get(get); - NavigableMap navigableMap = result.getMap().get(FAMILY) - .get(QUALIFIER); + NavigableMap navigableMap = result.getMap().get(FAMILY).get(qualifier); + + assertEquals( + "The number of versions of '" + FAMILY + ":" + qualifier + " did not match " + versions, + versions, + navigableMap.size()); - assertEquals("The number of versions of '" + FAMILY + ":" + QUALIFIER - + " did not match " + versions, versions, navigableMap.size()); for (Map.Entry entry : navigableMap.entrySet()) { - assertTrue("The value at time " + entry.getKey() - + " did not match what was put", - Bytes.equals(VALUE, entry.getValue())); + assertTrue( + "The value at time " + entry.getKey() + " did not match what was put", + Bytes.equals(VALUE, entry.getValue())); } } - - final Object waitLock = new Object(); - ExecutorService executorService = Executors.newFixedThreadPool(numVersions); - final AtomicReference error = new AtomicReference(null); - for (int versions = numVersions; versions < numVersions * 2; versions++) { - final int versionsCopy = versions; - executorService.submit(new Callable() { - @Override - public Void call() { - try { - Put put = new Put(ROW); - put.add(FAMILY, QUALIFIER, ts + versionsCopy, VALUE); - table.put(put); - - Result result = table.get(get); - NavigableMap navigableMap = result.getMap() - .get(FAMILY).get(QUALIFIER); - - assertEquals("The number of versions of '" + FAMILY + ":" - + QUALIFIER + " did not match " + versionsCopy, versionsCopy, - navigableMap.size()); - for (Map.Entry entry : navigableMap.entrySet()) { - assertTrue("The value at time " + entry.getKey() - + " did not match what was put", - Bytes.equals(VALUE, entry.getValue())); - } - synchronized (waitLock) { - waitLock.wait(); - } - } catch (Exception e) { - } catch (AssertionError e) { - // the error happens in a thread, it won't fail the test, - // need to pass it to the caller for proper handling. 
- error.set(e); - LOG.error(e); - } - - return null; - } - }); - } - synchronized (waitLock) { - waitLock.notifyAll(); - } - executorService.shutdownNow(); - assertNull(error.get()); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java index 49f75aa..522321c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.PoolMap.PoolType; @@ -193,20 +195,26 @@ public class TestHTablePool { @Test public void testTableWithMaxSize() throws Exception { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2, - getPoolType()); + tableFactory, getPoolType()); // Request tables from an empty pool HTableInterface table1 = pool.getTable(TABLENAME); HTableInterface table2 = pool.getTable(TABLENAME); HTableInterface table3 = pool.getTable(TABLENAME); + tableFactory.assertTableCount(3); + // Close tables (returns tables to the pool) table1.close(); table2.close(); // The pool should reject this one since it is already full table3.close(); + tableFactory.assertTableCount(2); + // Request tables of the same name HTableInterface sameTable1 = pool.getTable(TABLENAME); HTableInterface sameTable2 = pool.getTable(TABLENAME); @@ -224,8 +232,10 @@ public class TestHTablePool { @Test public void testCloseTablePool() throws IOException { + HTableFactoryWithTableCounter tableFactory = new 
HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4, - getPoolType()); + tableFactory, getPoolType()); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); if (admin.tableExists(TABLENAME)) { @@ -243,19 +253,17 @@ public class TestHTablePool { tables[i] = pool.getTable(TABLENAME); } + tableFactory.assertTableCount(4); + pool.closeTablePool(TABLENAME); + tableFactory.assertTableCount(4); + for (int i = 0; i < 4; ++i) { tables[i].close(); } - Assert.assertEquals(4, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); - - pool.closeTablePool(TABLENAME); - - Assert.assertEquals(0, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + tableFactory.assertTableCount(0); } } @@ -268,14 +276,18 @@ public class TestHTablePool { @Test public void testTableWithMaxSize() throws Exception { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2, - getPoolType()); + tableFactory, getPoolType()); // Request tables from an empty pool HTableInterface table1 = pool.getTable(TABLENAME); HTableInterface table2 = pool.getTable(TABLENAME); HTableInterface table3 = pool.getTable(TABLENAME); + tableFactory.assertTableCount(1); + // Close tables (returns tables to the pool) table1.close(); table2.close(); @@ -283,6 +295,8 @@ public class TestHTablePool { // <= 2 table3.close(); + tableFactory.assertTableCount(1); + // Request tables of the same name HTableInterface sameTable1 = pool.getTable(TABLENAME); HTableInterface sameTable2 = pool.getTable(TABLENAME); @@ -300,8 +314,10 @@ public class TestHTablePool { @Test public void testCloseTablePool() throws IOException { + HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter(); + HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4, - getPoolType()); + tableFactory, getPoolType()); HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); if 
(admin.tableExists(TABLENAME)) { @@ -319,20 +335,41 @@ public class TestHTablePool { tables[i] = pool.getTable(TABLENAME); } + tableFactory.assertTableCount(1); + pool.closeTablePool(TABLENAME); + tableFactory.assertTableCount(1); + for (int i = 0; i < 4; ++i) { tables[i].close(); } - Assert.assertEquals(1, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); - - pool.closeTablePool(TABLENAME); - - Assert.assertEquals(0, - pool.getCurrentPoolSize(Bytes.toString(TABLENAME))); + tableFactory.assertTableCount(0); } } + private static class HTableFactoryWithTableCounter implements HTableInterfaceFactory { + final HTableInterfaceFactory baseTableFactory = new HTableFactory(); + final AtomicInteger tableCounter = new AtomicInteger(); + + @Override + public HTableInterface createHTableInterface(Configuration config, byte[] tableName) { + HTableInterface table = baseTableFactory.createHTableInterface(config, tableName); + tableCounter.incrementAndGet(); + return table; + } + + @Override + public void releaseHTableInterface(HTableInterface table) throws IOException { + tableCounter.decrementAndGet(); + baseTableFactory.releaseHTableInterface(table); + } + + void assertTableCount(int count) { + Assert.assertEquals(count, tableCounter.get()); + } + + } + }