diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 3dc5b49..86364b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -24,8 +24,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -36,8 +36,13 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.hbase.util.ReusableSharedMap; +import org.apache.hadoop.hbase.util.SharedMap; +import org.apache.hadoop.hbase.util.ThreadLocalSharedMapDecorator; + +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; /** * A simple pool of HTable instances. 
@@ -64,9 +69,9 @@ import org.apache.hadoop.hbase.util.PoolMap.PoolType; @InterfaceAudience.Public @InterfaceStability.Stable public class HTablePool implements Closeable { - private final PoolMap tables; - private final int maxSize; - private final PoolType poolType; + private static final Log log = LogFactory.getLog(HTablePool.class); + + private final SharedMap tables; private final Configuration config; private final HTableInterfaceFactory tableFactory; @@ -144,24 +149,14 @@ public class HTablePool implements Closeable { // Make a new configuration instance so I can safely cleanup when // done with the pool. this.config = config == null ? HBaseConfiguration.create() : config; - this.maxSize = maxSize; - this.tableFactory = tableFactory == null ? new HTableFactory() - : tableFactory; - if (poolType == null) { - this.poolType = PoolType.Reusable; + this.tableFactory = tableFactory == null ? new HTableFactory() : tableFactory; + + if (poolType != PoolType.ThreadLocal) { + tables = new ReusableSharedMap(maxSize); } else { - switch (poolType) { - case Reusable: - case ThreadLocal: - this.poolType = poolType; - break; - default: - this.poolType = PoolType.Reusable; - break; - } + tables = new ThreadLocalSharedMapDecorator( + new ReusableSharedMap(maxSize)); } - this.tables = new PoolMap(this.poolType, - this.maxSize); } /** @@ -196,9 +191,10 @@ public class HTablePool implements Closeable { * if there is a problem instantiating the HTable */ private HTableInterface findOrCreateTable(String tableName) { - HTableInterface table = tables.get(tableName); + HTableInterface table = tables.borrowObject(tableName); if (table == null) { table = createHTable(tableName); + tables.registerObject(tableName, table); } return table; } @@ -258,13 +254,11 @@ public class HTablePool implements Closeable { private void returnTable(HTableInterface table) throws IOException { // this is the old putTable method renamed and made private String tableName = 
Bytes.toString(table.getTableName()); - if (tables.size(tableName) >= maxSize) { + + if (! tables.returnObject(tableName, table)) { // release table instance since we're not reusing it - this.tables.remove(tableName, table); - this.tableFactory.releaseHTableInterface(table); - return; + releaseTableQuietly(table); } - tables.put(tableName, table); } protected HTableInterface createHTable(String tableName) { @@ -283,13 +277,7 @@ public class HTablePool implements Closeable { * @param tableName */ public void closeTablePool(final String tableName) throws IOException { - Collection tables = this.tables.values(tableName); - if (tables != null) { - for (HTableInterface table : tables) { - this.tableFactory.releaseHTableInterface(table); - } - } - this.tables.remove(tableName); + releaseTablesQuietly(tables.clear(tableName)); } /** @@ -308,14 +296,23 @@ public class HTablePool implements Closeable { * Note: this is a 'shutdown' of all the table pools. */ public void close() throws IOException { - for (String tableName : tables.keySet()) { - closeTablePool(tableName); + releaseTablesQuietly(tables.clear()); + } + + private void releaseTableQuietly(HTableInterface table) { + try { + tableFactory.releaseHTableInterface(table); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Failed to release a table: " + Bytes.toString(table.getTableName()), e); + } } - this.tables.clear(); } - int getCurrentPoolSize(String tableName) { - return tables.size(tableName); + private void releaseTablesQuietly(Collection tables) { + for (HTableInterface table : tables) { + releaseTableQuietly(table); + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 379982e..ac19858 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -35,14 +35,20 @@ import 
java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.net.SocketFactory; @@ -54,13 +60,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseBody; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; @@ -72,8 +78,9 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; import 
org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PoolMap; import org.apache.hadoop.hbase.util.PoolMap.PoolType; +import org.apache.hadoop.hbase.util.RoundRobinSharedMap; +import org.apache.hadoop.hbase.util.SharedMap; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; @@ -83,7 +90,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; -import org.apache.hadoop.util.ReflectionUtils; import org.cloudera.htrace.Span; import org.cloudera.htrace.Trace; @@ -106,9 +112,10 @@ public class HBaseClient { private static final Log LOG = LogFactory .getLog("org.apache.hadoop.ipc.HBaseClient"); - protected final PoolMap connections; + protected final SharedMap connections; + private final ThreadGroup connectionThreadGroup; - protected int counter; // counter for call ids + protected final AtomicInteger counter = new AtomicInteger(); // counter for call ids protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs final protected Configuration conf; final protected int maxIdleTime; // connections will be culled if it was idle for @@ -255,54 +262,131 @@ public class HBaseClient { return refCount==0; } + private interface CallRequest { + int getId(); + RpcRequestBody getParam(); + } + /** A call waiting for a value. */ - protected class Call { + private interface Call extends CallRequest { + long getStartTime(); + + /** Set the exception when there is an error. + * Notify the caller the call is done. + * + * @param error exception thrown by the call; either local or remote + */ + void setException(IOException error); + + /** Set the return value when there is no error. 
+ * Notify the caller the call is done. + * + * @param value return value of the call. + */ + void setValue(Message value); + } + + private static abstract class AbstractCallDecorator implements Call { + final Call base; + + AbstractCallDecorator(Call base) { + this.base = base; + } + + @Override + public int getId() { + return base.getId(); + } + + @Override + public RpcRequestBody getParam() { + return base.getParam(); + } + + @Override + public long getStartTime() { + return base.getStartTime(); + } + + @Override + public void setException(IOException error) { + base.setException(error); + done(); + } + + @Override + public void setValue(Message value) { + base.setValue(value); + done(); + } + + void done() {} + } + + private static class SingleCall implements Call { final int id; // call id final RpcRequestBody param; // rpc request object + final long startTime; + Message value; // value, null if error IOException error; // exception, null if value - boolean done; // true when call is done - long startTime; + final CountDownLatch doneLatch = new CountDownLatch(1); - protected Call(RpcRequestBody param) { + SingleCall(int id, RpcRequestBody param, long startTime) { + this.id = id; this.param = param; - this.startTime = System.currentTimeMillis(); - synchronized (HBaseClient.this) { - this.id = counter++; - } + this.startTime = startTime; } - /** Indicate when the call is complete and the - * value or error are available. Notifies by default. */ - protected synchronized void callComplete() { - this.done = true; - notify(); // notify caller + @Override + public int getId() { + return id; } - /** Set the exception when there is an error. - * Notify the caller the call is done. 
- * - * @param error exception thrown by the call; either local or remote - */ - public synchronized void setException(IOException error) { + @Override + public RpcRequestBody getParam() { + return param; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public void setException(IOException error) { this.error = error; - callComplete(); + doneLatch.countDown(); } - /** Set the return value when there is no error. - * Notify the caller the call is done. - * - * @param value return value of the call. - */ - public synchronized void setValue(Message value) { + @Override + public void setValue(Message value) { this.value = value; - callComplete(); + doneLatch.countDown(); } - public long getStartTime() { - return this.startTime; + Message getValue() throws IOException { + boolean interrupted = false; + try { + while (true) { + try { + doneLatch.await(); + if (error != null) { + throw error; + } + return value; + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } } } + protected static Map> tokenHandlers = new HashMap>(); static { @@ -318,10 +402,11 @@ public class HBaseClient { return new Connection(remoteId); } - /** Thread that reads responses and notifies callers. Each connection owns a + /** Runnable object that reads responses and notifies callers. Each connection owns a * socket connected to a remote address. Calls are multiplexed through this * socket: responses may be delivered out of order. 
*/ - protected class Connection extends Thread { + protected class Connection implements Runnable { + private final String name; private ConnectionHeader header; // connection header protected ConnectionId remoteId; protected Socket socket = null; // connected socket @@ -402,11 +487,10 @@ public class HBaseClient { } this.header = builder.build(); - this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + + this.name = "IPC Client (" + socketFactory.hashCode() +") connection to " + remoteId.getAddress().toString() + - ((ticket==null)?" from an unknown user": (" from " - + ticket.getUserName()))); - this.setDaemon(true); + ((ticket==null)?" from an unknown user": (" from " + + ticket.getUserName())); } private UserInformation getUserInfoPB(UserGroupInformation ugi) { @@ -450,15 +534,12 @@ public class HBaseClient { if (this.shouldCloseConnection.get()) { if (this.closeException == null) { call.setException(new IOException( - "Call " + call.id + " not added as the connection " + remoteId + " is closing")); + "Call " + call.getId() + " not added as the connection " + remoteId + " is closing")); } else { call.setException(this.closeException); } - synchronized (call) { - call.notifyAll(); - } } else { - calls.put(call.id, call); + calls.put(call.getId(), call); notify(); } } @@ -648,7 +729,7 @@ public class HBaseClient { @Override public void run() { if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": starting, having connections " + LOG.debug(name + ": starting, having connections " + connections.size()); try { @@ -663,7 +744,7 @@ public class HBaseClient { close(); if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": stopped, remaining connections " + LOG.debug(name + ": stopped, remaining connections " + connections.size()); } @@ -843,7 +924,9 @@ public class HBaseClient { touch(); // start the receiver thread after the socket connection has been set up - start(); + Thread thread = new Thread(connectionThreadGroup, this, name); + 
thread.setDaemon(true); + thread.start(); return; } } catch (IOException e) { @@ -884,11 +967,7 @@ public class HBaseClient { // release the resources // first thing to do;take the connection out of the connection list - synchronized (connections) { - if (connections.get(remoteId) == this) { - connections.remove(remoteId); - } - } + connections.invalidateObject(remoteId, this); // close the streams and therefore the socket IOUtils.closeStream(out); @@ -917,23 +996,23 @@ public class HBaseClient { cleanupCalls(); } if (LOG.isDebugEnabled()) - LOG.debug(getName() + ": closed"); + LOG.debug(name + ": closed"); } /* Initiates a call by sending the parameter to the remote server. * Note: this is not called from the Connection thread, but by other * threads. */ - protected void sendParam(Call call) { + protected void sendParam(CallRequest call) { if (shouldCloseConnection.get()) { return; } try { if (LOG.isDebugEnabled()) - LOG.debug(getName() + " sending #" + call.id); + LOG.debug(name + " sending #" + call.getId()); RpcRequestHeader.Builder headerBuilder = RPCProtos.RpcRequestHeader.newBuilder(); - headerBuilder.setCallId(call.id); + headerBuilder.setCallId(call.getId()); if (Trace.isTracing()) { Span s = Trace.currentTrace(); @@ -946,16 +1025,16 @@ public class HBaseClient { synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC RpcRequestHeader header = headerBuilder.build(); int serializedHeaderSize = header.getSerializedSize(); - int requestSerializedSize = call.param.getSerializedSize(); + int requestSerializedSize = call.getParam().getSerializedSize(); this.out.writeInt(serializedHeaderSize + CodedOutputStream.computeRawVarint32Size(serializedHeaderSize) + requestSerializedSize + CodedOutputStream.computeRawVarint32Size(requestSerializedSize)); header.writeDelimitedTo(this.out); - call.param.writeDelimitedTo(this.out); + call.getParam().writeDelimitedTo(this.out); this.out.flush(); } - } catch(IOException e) { + } catch (IOException e) { markClosed(e); } } 
@@ -984,7 +1063,7 @@ public class HBaseClient { int id = response.getCallId(); if (LOG.isDebugEnabled()) - LOG.debug(getName() + " got value #" + id); + LOG.debug(name + " got value #" + id); Call call = calls.get(id); Status status = response.getStatus(); @@ -993,7 +1072,7 @@ public class HBaseClient { try { rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType( ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(), - call.param.getMethodName())); + call.getParam().getMethodName())); } catch (Exception e) { throw new RuntimeException(e); //local exception } @@ -1065,13 +1144,10 @@ public class HBaseClient { // over on the server; e.g. I just asked the regionserver to bulk // open 3k regions or its a big fat multiput into a heavily-loaded // server (Perhaps this only happens at the extremes?) - this.closeException = new CallTimeoutException("Call id=" + c.id + + this.closeException = new CallTimeoutException("Call id=" + c.getId() + ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout); } c.setException(this.closeException); - synchronized (c) { - c.notifyAll(); - } itor.remove(); } else { break; @@ -1106,44 +1182,64 @@ public class HBaseClient { } } - /** Call implementation used for parallel calls. */ - protected class ParallelCall extends Call { - private final ParallelResults results; - protected final int index; + /** Result collector for parallel calls. */ + private static class ParallelResults { + final Message[] values; + final CountDownLatch doneLatch; - public ParallelCall(RpcRequestBody param, ParallelResults results, int index) { - super(param); - this.results = results; - this.index = index; + ParallelResults(int size) { + values = new RpcResponseBody[size]; + doneLatch = new CountDownLatch(size); } - /** Deliver result to result collector. 
*/ - @Override - protected void callComplete() { - results.callComplete(this); + Message[] getValues() { + boolean interrupted = false; + try { + while (true) { + try { + doneLatch.await(); + return values; + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } } - } - /** Result collector for parallel calls. */ - protected static class ParallelResults { - protected final Message[] values; - protected int size; - protected int count; + Call getCall(final int index, final int id, final RpcRequestBody param, final long startTime) { + return new Call() { + @Override + public int getId() { + return id; + } - public ParallelResults(int size) { - this.values = new RpcResponseBody[size]; - this.size = size; - } + @Override + public RpcRequestBody getParam() { + return param; + } - /* - * Collect a result. - */ - synchronized void callComplete(ParallelCall call) { - // FindBugs IS2_INCONSISTENT_SYNC - values[call.index] = call.value; // store the value - count++; // count it - if (count == size) // if all values are in - notify(); // then notify waiting caller + @Override + public long getStartTime() { + return startTime; + } + + @Override + public void setException(IOException error) { + // log errors + LOG.info("Exception in parallel calls: " + error.getMessage(), error); + doneLatch.countDown(); + } + + @Override + public void setValue(Message value) { + values[index] = value; + doneLatch.countDown(); + } + }; } } @@ -1167,9 +1263,13 @@ public class HBaseClient { this.conf = conf; this.socketFactory = factory; this.clusterId = conf.get(HConstants.CLUSTER_ID, "default"); - this.connections = new PoolMap( - getPoolType(conf), getPoolSize(conf)); + this.connections = newConnections(conf); this.failedServers = new FailedServers(conf); + this.connectionThreadGroup = new ThreadGroup("IPC Client (" + socketFactory.hashCode() +") connection"); + } + + private static SharedMap 
newConnections(Configuration conf) { + return new RoundRobinSharedMap(getPoolSize(conf)); } /** @@ -1232,11 +1332,7 @@ public class HBaseClient { } // wake up all connections - synchronized (connections) { - for (Connection conn : connections.values()) { - conn.interrupt(); - } - } + connectionThreadGroup.interrupt(); // wait until all connections are closed while (!connections.isEmpty()) { @@ -1275,35 +1371,19 @@ public class HBaseClient { Class protocol, User ticket, int rpcTimeout) throws InterruptedException, IOException { - Call call = new Call(param); - Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call); - connection.sendParam(call); // send the parameter - boolean interrupted = false; - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (call) { - while (!call.done) { - try { - call.wait(); // wait for the result - } catch (InterruptedException ignored) { - // save the fact that we were interrupted - interrupted = true; - } - } - - if (interrupted) { - // set the interrupt flag now that we are done waiting - Thread.currentThread().interrupt(); - } - if (call.error != null) { - if (call.error instanceof RemoteException) { - call.error.fillInStackTrace(); - throw call.error; - } - // local exception - throw wrapException(addr, call.error); + SingleCall call = new SingleCall(counter.getAndIncrement(), param, System.currentTimeMillis()); + ConnectionId remoteId = getConnectionId(addr, protocol, ticket, rpcTimeout); + sendParam(remoteId, call); + try { + return call.getValue(); + } catch (IOException e) { + if (e instanceof RemoteException) { + e.fillInStackTrace(); + throw e; } - return call.value; + // local exception + throw wrapException(addr, e); } } @@ -1366,68 +1446,111 @@ public class HBaseClient { if (addresses.length == 0) return new RpcResponseBody[0]; ParallelResults results = new ParallelResults(params.length); - // TODO this synchronization block doesnt make any sense, we should possibly fix it 
- //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (results) { - for (int i = 0; i < params.length; i++) { - ParallelCall call = new ParallelCall(params[i], results, i); - try { - Connection connection = - getConnection(addresses[i], protocol, ticket, 0, call); - connection.sendParam(call); // send each parameter - } catch (IOException e) { - // log errors - LOG.info("Calling "+addresses[i]+" caught: " + - e.getMessage(),e); - results.size--; // wait for one fewer result - } + long currentTime = System.currentTimeMillis(); + + Map> callMap = new LinkedHashMap>(); + for (int i = 0; i < params.length; i++) { + Call call = results.getCall(i, counter.getAndIncrement(), params[i], currentTime); + + List calls = callMap.get(addresses[i]); + if (calls == null) { + calls = new ArrayList(); + callMap.put(addresses[i], calls); } - while (results.count != results.size) { - try { - results.wait(); // wait for all results - } catch (InterruptedException ignored) {} + calls.add(call); + } + + for (Entry> entry : callMap.entrySet()) { + ConnectionId remoteId = getConnectionId(entry.getKey(), protocol, ticket, 0); + sendParam(remoteId, entry.getValue()); + } + + return results.getValues(); + } + + private void sendParam(final ConnectionId remoteId, Call call) + throws IOException, InterruptedException { + final Connection connection = getConnection(remoteId); + + connection.addCall(new AbstractCallDecorator(call) { + @Override + void done() { + connections.returnObject(remoteId, connection); } + }); + + //we don't invoke the method below inside "synchronized (connections)" + //block in getConnection method above. + //The reason for that is if the server happens to be slow, + //it will take longer to establish a connection and that will slow the + //entire system down. + //Moreover, if the connection is currently created, there will be many threads + //waiting here; as setupIOstreams is synchronized. 
If the connection fails with a + //timeout, they will all fail simultaneously. This is checked in setupIOstreams. + connection.setupIOstreams(); + + connection.sendParam(call); + } + + private void sendParam(final ConnectionId remoteId, Collection calls) + throws IOException, InterruptedException { + final Connection connection = getConnection(remoteId); + + final AtomicInteger countDown = new AtomicInteger(calls.size()); - return results.values; + for (Call call : calls) { + connection.addCall(new AbstractCallDecorator(call) { + @Override + void done() { + if (countDown.decrementAndGet() == 0) { + connections.returnObject(remoteId, connection); + } + } + }); + } + + //we don't invoke the method below inside "synchronized (connections)" + //block in getConnection method above. + //The reason for that is if the server happens to be slow, + //it will take longer to establish a connection and that will slow the + //entire system down. + //Moreover, if the connection is currently created, there will be many threads + //waiting here; as setupIOstreams is synchronized. If the connection fails with a + //timeout, they will all fail simultaneously. This is checked in setupIOstreams. + connection.setupIOstreams(); + + for (Call call : calls) { + connection.sendParam(call); } } /* Get a connection from the pool, or create a new one and add it to the * pool. Connections to a given host/port are reused. */ - protected Connection getConnection(InetSocketAddress addr, - Class protocol, - User ticket, - int rpcTimeout, - Call call) - throws IOException, InterruptedException { + private Connection getConnection(ConnectionId remoteId) throws IOException { if (!running.get()) { // the client is stopped throw new IOException("The client is stopped"); } - Connection connection; - /* we could avoid this allocation for each RPC by having a - * connectionsId object and with set() method. We need to manage the - * refs for keys in HashMap properly. For now its ok. 
- */ - ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout); + synchronized (connections) { - connection = connections.get(remoteId); + Connection connection = connections.borrowObject(remoteId); if (connection == null) { connection = createConnection(remoteId); - connections.put(remoteId, connection); + connections.registerObject(remoteId, connection); } + return connection; } - connection.addCall(call); + } - //we don't invoke the method below inside "synchronized (connections)" - //block above. The reason for that is if the server happens to be slow, - //it will take longer to establish a connection and that will slow the - //entire system down. - //Moreover, if the connection is currently created, there will be many threads - // waiting here; as setupIOstreams is synchronized. If the connection fails with a - // timeout, they will all fail simultaneously. This is checked in setupIOstreams. - connection.setupIOstreams(); - return connection; + /* we could avoid this allocation for each RPC by having a + * connectionsId object and with set() method. We need to manage the + * refs for keys in HashMap properly. For now its ok. + */ + private static ConnectionId getConnectionId(InetSocketAddress address, + Class protocol, + User ticket, + int rpcTimeout) { + return new ConnectionId(address, protocol, ticket, rpcTimeout); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java new file mode 100644 index 0000000..2350ef2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractSharedMap.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Skeleton implementation of {@code SharedMap}. + *

+ * All of the public methods are synchronized with this, except for argument checks.
+ * Objects to be pooled are identified with their equality, instead of sameness.
+ *
+ * @param <K> the type of keys that you associate objects to be pooled with
+ * @param <V> the type of objects to be pooled
+ */
+abstract class AbstractSharedMap<K, V> implements SharedMap<K, V> {
+  /** Guarded by this */
+  private final Map<K, Pool<V>> map = new HashMap<K, Pool<V>>();
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public V borrowObject(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool<V> pool = map.get(key);
+      if (pool == null) {
+        return null;
+      }
+      return pool.borrowObject();
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean registerObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool<V> pool = map.get(key);
+      if (pool == null) {
+        pool = newPool();
+        map.put(key, pool);
+      }
+      return pool.registerObject(value);
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean returnObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool<V> pool = map.get(key);
+      if (pool == null) {
+        return false;
+      }
+      return pool.returnObject(value);
+    }
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public void invalidateObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    synchronized (this) {
+      Pool<V> pool = map.get(key);
+      if (pool == null) {
+        return;
+      }
+
+      pool.invalidateObject(value);
+
+      if (pool.isEmpty()) {
+        // Remove by key; the map is keyed by K, so removing by the pool
+        // instance would never match and empty pools would accumulate.
+        map.remove(key);
+      }
+    }
+  }
+
+  @Override
+  public synchronized Collection<V> clear() {
+    Collection<V> idleObjects = new ArrayList<V>();
+
+    for (Pool<V> pool : map.values()) {
+      idleObjects.addAll(pool.clear());
+    }
+
+    map.clear();
+
+    return idleObjects;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public Collection<V> clear(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool<V> pool = map.remove(key);
+      if (pool == null) {
+        // No registration for this key; avoid calling clear() on a missing pool.
+        return new ArrayList<V>();
+      }
+      return pool.clear();
+    }
+  }
+
+  @Override
+  public synchronized int size() {
+    int size = 0;
+    for (Pool<V> pool : map.values()) {
+      size += pool.size();
+    }
+    return size;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public int size(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    synchronized (this) {
+      Pool<V> pool = map.get(key);
+      return pool == null ? 0 : pool.size();
+    }
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    for (Pool<V> pool : map.values()) {
+      if (! pool.isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  abstract Pool<V> newPool();
+
+  interface Pool<V> {
+    /**
+     * Returns one of the pooled objects,
+     * or returns null if no appropriate object is pooled.
+     */
+    V borrowObject();
+
+    /**
+     * Registers the given {@code value} to this instance.
+     * At this point the registered object is regarded as borrowed.
+     * <p>
+     * This method does nothing and returns false if and only if
+     * the given {@code value} is already registered.
+     * In practice the {@code value} is expected to be a newly created object,
+     * and this method is expected to return true.
+     */
+    boolean registerObject(V value);
+
+    /**
+     * Makes the given {@code value} to be returned into this instance.
+     * It is expected that the {@code value} has been borrowed from this instance.
+     * <p>
+     * If this method returns false,
+     * then the registration of the {@code value} is invalidated.
+     * This happens because the pool is full when this method is called,
+     * or you have explicitly called {@link #invalidateObject()} or {@link #clear()}.
+     */
+    boolean returnObject(V value);
+
+    /**
+     * Invalidates the registration of the given {@code value}.
+     * You may call this method regardless of whether the {@code value} is borrowed or pooled.
+     */
+    void invalidateObject(V value);
+
+    /**
+     * Invalidates all of the registrations to this instance,
+     * and returns the cleared idle objects in the pool.
+     */
+    Collection<V> clear();
+
+    /**
+     * Returns the count of the registered objects.
+     */
+    int size();
+
+    /**
+     * Returns true if there is no registration.
+     */
+    boolean isEmpty();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java
new file mode 100644
index 0000000..9f6a27e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ReusableSharedMap.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementation of {@code SharedMap},
+ * ensuring that it lends a pooled object to a single caller at the same time.
+ * <p>
+ * All of the public methods are synchronized with this, except for argument checks.
+ * Objects to be pooled are identified with their equality, instead of sameness.
+ *
+ * @param <K> the type of keys that you associate objects to be pooled with
+ * @param <V> the type of objects to be pooled
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ReusableSharedMap<K, V> extends AbstractSharedMap<K, V> {
+  private final int maxObjectCountPerKey;
+
+  /**
+   * Constructs with the given {@code maxObjectCountPerKey},
+   * which is a max count of idle objects to hold for each key.
+   */
+  public ReusableSharedMap(int maxObjectCountPerKey) {
+    this.maxObjectCountPerKey = maxObjectCountPerKey;
+  }
+
+  @Override
+  Pool<V> newPool() {
+    return new ReusablePool();
+  }
+
+  private class ReusablePool implements Pool<V> {
+    // The boolean values represent busy (borrowing) or not.
+    // This is for performance instead of sequential search in idleQueue.
+    final Map<V, Boolean> registeredObjects = new HashMap<V, Boolean>();
+    final Queue<V> idleQueue = new ArrayDeque<V>();
+
+    @Override
+    public V borrowObject() {
+      V value = idleQueue.poll();
+      if (value == null) {
+        return null;
+      }
+
+      registeredObjects.put(value, Boolean.TRUE);
+      return value;
+    }
+
+    @Override
+    public boolean registerObject(V value) {
+      if (registeredObjects.containsKey(value)) {
+        return false;
+      }
+      registeredObjects.put(value, Boolean.TRUE);
+      return true;
+    }
+
+    @Override
+    public boolean returnObject(V value) {
+      Boolean busy = registeredObjects.get(value);
+      if (busy == null) {
+        return false;
+      }
+
+      if (! busy) {
+        // Already idle in the pool; a duplicate return is a no-op.
+        return true;
+      }
+
+      if (idleQueue.size() >= maxObjectCountPerKey) {
+        registeredObjects.remove(value);
+        return false;
+      }
+
+      registeredObjects.put(value, Boolean.FALSE);
+      idleQueue.offer(value);
+      return true;
+    }
+
+    @Override
+    public void invalidateObject(V value) {
+      registeredObjects.remove(value);
+      idleQueue.remove(value);
+    }
+
+    @Override
+    public Collection<V> clear() {
+      Collection<V> idleObjects = new ArrayList<V>(idleQueue);
+
+      registeredObjects.clear();
+      idleQueue.clear();
+
+      return idleObjects;
+    }
+
+    @Override
+    public int size() {
+      return registeredObjects.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return registeredObjects.isEmpty();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java
new file mode 100644
index 0000000..15447bf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RoundRobinSharedMap.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementation of {@code SharedMap} based on the round robin logic,
+ * except that it rather lends an idle object
+ * than fills the pool wastefully or gives a pooled instance unconditionally.
+ * <p>
+ * This implementation may lend the same pooled object to different callers at the same time,
+ * and the pooled object must be thread safe.
+ * <p>
+ * All of the public methods are synchronized with this, except for argument checks.
+ * Objects to be pooled are identified with their equality, instead of sameness.
+ *
+ * @param <K> the type of keys that you associate objects to be pooled with
+ * @param <V> the type of objects to be pooled
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RoundRobinSharedMap<K, V> extends AbstractSharedMap<K, V> {
+  /** the max count of pooled objects for each key, which count is positive */
+  private final int maxObjectCountPerKey;
+
+  /**
+   * Constructs with the given {@code maxObjectCountPerKey},
+   * which is a max count of pooled objects for each key
+   * before starting to lend the same pooled object to different callers,
+   * and which is also a max count of idle objects to hold for each key
+   * even if number of registering objects exceeds the max count.
+   * <p>
+   * The given {@code maxObjectCountPerKey} is expected to be positive,
+   * otherwise the max count is set to be 1 instead of the given value.
+   */
+  public RoundRobinSharedMap(int maxObjectCountPerKey) {
+    this.maxObjectCountPerKey = Math.max(1, maxObjectCountPerKey);
+  }
+
+  @Override
+  Pool<V> newPool() {
+    return new RoundRobinPool();
+  }
+
+  private class RoundRobinPool implements Pool<V> {
+    // The integer values represent borrowing counts.
+    final Map<V, Integer> registeredObjects = new HashMap<V, Integer>();
+    final Queue<V> idleQueue = new ArrayDeque<V>();
+    final Queue<V> busyQueue = new ArrayDeque<V>();
+
+    @Override
+    public V borrowObject() {
+      V value = idleQueue.poll();
+      if (value != null) {
+        registeredObjects.put(value, 1);
+        busyQueue.offer(value);
+        return value;
+      }
+
+      if (busyQueue.size() < maxObjectCountPerKey) {
+        return null;
+      }
+
+      // Now we are ready to pick up and borrow a busy object in busyQueue,
+      // which is not empty because busyQueue.size() >= maxObjectCountPerKey > 0.
+
+      value = busyQueue.poll();
+      assert value != null;
+      registeredObjects.put(value, registeredObjects.get(value) + 1);
+      busyQueue.offer(value);
+      return value;
+    }
+
+    @Override
+    public boolean registerObject(V value) {
+      if (registeredObjects.containsKey(value)) {
+        return false;
+      }
+      registeredObjects.put(value, 1);
+      busyQueue.offer(value);
+      return true;
+    }
+
+    @Override
+    public boolean returnObject(V value) {
+      Integer busyCountWrapper = registeredObjects.get(value);
+      if (busyCountWrapper == null) {
+        return false;
+      }
+
+      int busyCount = busyCountWrapper;
+      if (busyCount == 0) {
+        // Already idle; a duplicate return is a no-op.
+        return true;
+      }
+
+      int nextBusyCount = busyCount - 1;
+      if (nextBusyCount != 0) {
+        registeredObjects.put(value, nextBusyCount);
+        return true;
+      }
+
+      busyQueue.remove(value);
+
+      if (idleQueue.size() >= maxObjectCountPerKey) {
+        registeredObjects.remove(value);
+        return false;
+
+      } else {
+        registeredObjects.put(value, 0);
+        idleQueue.offer(value);
+        return true;
+      }
+    }
+
+    @Override
+    public void invalidateObject(V pooledObject) {
+      registeredObjects.remove(pooledObject);
+      idleQueue.remove(pooledObject);
+      busyQueue.remove(pooledObject);
+    }
+
+    @Override
+    public Collection<V> clear() {
+      Collection<V> idleObjects = new ArrayList<V>(idleQueue);
+
+      registeredObjects.clear();
+      idleQueue.clear();
+      busyQueue.clear();
+
+      return idleObjects;
+    }
+
+    @Override
+    public int size() {
+      return registeredObjects.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return registeredObjects.isEmpty();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java
new file mode 100644
index 0000000..27d0c92
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This provides functions to pool and share objects.
+ * <p>
+ * In order to use the pooled objects, first of all call {@link #borrowObject()}.
+ * If the method returns a non-null object, use it.
+ * If the method returns null, create a new object and call {@link #registerObject()},
+ * and use it. Just after the registration the object is regarded as borrowed.
+ * <p>
+ * After using the borrowed object,
+ * call {@link #returnObject()} to return the object to the pool,
+ * or call {@link #invalidateObject()} to invalidate the object's registration
+ * instead of returning the object.
+ * If the pool is already full, the method {@code returnObject()} returns false
+ * with invalidating the registration of the object.
+ * Also the method returns false if the object's registration is already invalidated
+ * because {@link #invalidateObject()} or {@link #clear()} has been called.
+ * Anyway if the method {@code returnObject()} returns false
+ * the object is rejected to return the pool and is already invalidated,
+ * and you might tear down the rejected object.
+ * You can safely tear down the rejected object even in multi-thread contexts,
+ * because the object is never passed to the other threads via the pool.
+ * <p>
+ * Incidentally, you can call {@link #invalidateObject} and {@link #clear()}
+ * in order to invalidate the registrations at any time, by any thread,
+ * regardless of whether the objects are borrowed or not.
+ * The method {@code clear()} returns a collection of the idle objects,
+ * which have been registered but not borrowed and idle in the pool,
+ * and you can safely tear down them in the sense that
+ * you get them exclusively in multi-thread contexts.
+ * The rest of the objects that have been registered are borrowed and in use
+ * by some other threads at the moment, and you can't tear down them right now.
+ * But the borrower eventually calls {@code returnObject()} (which returns false)
+ * or {@code invalidateObject()} after using the borrowed object, and in either method
+ * the borrower knows that the registration of the object is already invalidated
+ * and the object is ready to tear down.
+ * <p>
+ * The implementations of this interface are expected to be thread safe,
+ * but it depends on the implementations whether it borrows the same pooled object
+ * to different callers at the same time.
+ * <p>
+ * Objects to be pooled are identified with their equality, instead of sameness.
+ * That enables us to use ordinary collections for the objects
+ * and simplify the implementation logic of this interface.
+ * Also that is enough for practical uses.
+ *
+ * @param <K> the type of keys that you associate objects to be pooled with
+ * @param <V> the type of objects to be pooled
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface SharedMap<K, V> {
+  /**
+   * Returns one of the pooled objects associated with the given {@code key},
+   * or returns null if no appropriate object is pooled.
+   * In the latter case, it is expected that a new instance is created
+   * and registered to this instance.
+   *
+   * @throws NullPointerException if {@code key} is null
+   */
+  V borrowObject(K key);
+
+  /**
+   * Registers the given {@code value} to this instance,
+   * associating it with the given {@code key}.
+   * At this point the registered object is regarded as borrowed
+   * and you can continue to use the object,
+   * and after finishing using it you have to return or invalidate
+   * the registered object into this instance.
+   * <p>
+   * This method does nothing and returns false if and only if
+   * the given {@code value} is already registered
+   * associated with the given {@code key}.
+   * In practice the {@code value} is expected to be a newly created object,
+   * and this method is expected to return true.
+   *
+   * @throws NullPointerException if {@code key} or {@code value} is null
+   */
+  boolean registerObject(K key, V value);
+
+  /**
+   * Makes the given {@code value} associated with the given {@code key}
+   * to be returned into this instance.
+   * It is expected that the {@code value} has been borrowed from this instance.
+   * <p>
+   * If this method returns false, then the registration of the {@code value} is invalidated.
+   * This happens because the pool is full when this method is called,
+   * or you have explicitly called {@link #invalidateObject()} or {@link #clear()}.
+   * You might have to tear down the rejected object.
+   *
+   * @throws NullPointerException if {@code key} or {@code value} is null
+   */
+  boolean returnObject(K key, V value);
+
+  /**
+   * Invalidates the registration of the given {@code value} associated with the {@code key}.
+   * You may call this method regardless of whether the {@code value} is borrowed or pooled.
+   * You might have to tear down the removed object.
+   *
+   * @throws NullPointerException if {@code key} or {@code value} is null
+   */
+  void invalidateObject(K key, V value);
+
+  /**
+   * Invalidates all of the registrations to this instance,
+   * and returns the objects that have been registered but idle in the pool.
+   * You might have to tear down the returned objects.
+   * <p>
+   * The rest of the objects that have been registered are still in use at this moment,
+   * and they are not contained in the collection returned by this method.
+   * In fact, you should not tear down them until the borrowers complete the usage.
+   * Instead, because the borrowers are supposed to call {@link #returnObject()}
+   * (which rejects invalidated objects and returns false)
+   * or {@link #invalidateObject()} after their usage,
+   * you can make the borrowers themselves tear down the invalidated objects
+   * just after calling these methods.
+   */
+  Collection<V> clear();
+
+  /**
+   * Invalidates all of the registrations to this instance associated with the given {@code key},
+   * and returns the objects that have been registered but idle in the pool.
+   * You might have to tear down the returned objects.
+   * <p>
+   * The rest of the objects that have been registered are still in use at this moment,
+   * and they are not contained in the collection returned by this method.
+   * In fact, you should not tear down them until the borrowers complete the usage.
+   * Instead, because the borrowers are supposed to call {@link #returnObject()}
+   * (which rejects invalidated objects and returns false)
+   * or {@link #invalidateObject()} after their usage,
+   * you can make the borrowers themselves tear down the invalidated objects
+   * just after calling these methods.
+   *
+   * @throws NullPointerException if {@code key} is null
+   */
+  Collection<V> clear(K key);
+
+  /**
+   * Returns the count of the registered objects.
+   */
+  int size();
+
+  /**
+   * Returns the count of the registered objects associated with the given {@code key}.
+   *
+   * @throws NullPointerException if {@code key} is null
+   */
+  int size(K key);
+
+  /**
+   * Returns true if there is no registration.
+   */
+  boolean isEmpty();
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java
new file mode 100644
index 0000000..78c7d9f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalSharedMapDecorator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Decorator of {@code SharedMap},
+ * giving the same pooled object for each key at the same time for each thread.
+ * This lends the same object even after the other threads invalidate the object,
+ * until the borrowing thread completely returns the object or invalidates it.
+ * <p>
+ * Note that it depends on the base instance of {@code SharedMap}
+ * whether the each pooled object is accessed by at most one thread at the same time or not.
+ * <p>
+ * Thread safe.
+ * Objects to be pooled are identified with their equality, instead of sameness.
+ *
+ * @param <K> the type of keys that you associate objects to be pooled with
+ * @param <V> the type of objects to be pooled
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ThreadLocalSharedMapDecorator<K, V> implements SharedMap<K, V> {
+  private static class BorrowingCounter<V> {
+    final V value;
+
+    int count = 1;
+
+    BorrowingCounter(V value) {
+      assert value != null;
+      this.value = value;
+    }
+  }
+
+  private final ThreadLocal<Map<K, BorrowingCounter<V>>> borrowingCounterMapRef
+      = new ThreadLocal<Map<K, BorrowingCounter<V>>>() {
+    @Override
+    protected Map<K, BorrowingCounter<V>> initialValue() {
+      return new HashMap<K, BorrowingCounter<V>>();
+    }
+  };
+
+  private final SharedMap<K, V> base;
+
+  /**
+   * @throws NullPointerException if {@code base} is null
+   */
+  public ThreadLocalSharedMapDecorator(SharedMap<K, V> base) {
+    if (base == null) { throw new NullPointerException(); }
+    this.base = base;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public V borrowObject(K key) {
+    if (key == null) { throw new NullPointerException(); }
+
+    Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+
+    BorrowingCounter<V> counter = counterMap.get(key);
+    if (counter == null) {
+      V value = base.borrowObject(key);
+      if (value != null) {
+        counterMap.put(key, new BorrowingCounter<V>(value));
+      }
+      return value;
+    }
+
+    counter.count++;
+    return counter.value;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean registerObject(K key, V value) {
+    if (! base.registerObject(key, value)) {
+      return false;
+    }
+
+    Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+    BorrowingCounter<V> counter = counterMap.get(key);
+    if (counter == null) {
+      counterMap.put(key, new BorrowingCounter<V>(value));
+    }
+    return true;
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public boolean returnObject(K key, V value) {
+    if (key == null) { throw new NullPointerException("key"); }
+    if (value == null) { throw new NullPointerException("value"); }
+
+    Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+
+    BorrowingCounter<V> counter = counterMap.get(key);
+    if (counter == null || ! counter.value.equals(value)) {
+      return base.returnObject(key, value);
+    }
+
+    if (--counter.count > 0) {
+      return true;
+    }
+
+    counterMap.remove(key);
+    return base.returnObject(key, value);
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public void invalidateObject(K key, V value) {
+    base.invalidateObject(key, value);
+
+    Map<K, BorrowingCounter<V>> counterMap = borrowingCounterMapRef.get();
+    BorrowingCounter<V> counter = counterMap.get(key);
+    if (counter == null || ! counter.value.equals(value)) {
+      return;
+    }
+    counterMap.remove(key);
+  }
+
+  @Override
+  public Collection<V> clear() {
+    return base.clear();
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public Collection<V> clear(K key) {
+    return base.clear(key);
+  }
+
+  @Override
+  public int size() {
+    return base.size();
+  }
+
+  /**
+   * @throws NullPointerException {@inheritDoc}
+   */
+  @Override
+  public int size(K key) {
+    return base.size(key);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return base.isEmpty();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java
index 49f75aa..522321c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHTablePool.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
@@ -193,20 +195,26 @@
 
     @Test
     public void testTableWithMaxSize() throws Exception {
+      HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter();
+
       HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2,
-          getPoolType());
+          tableFactory, getPoolType());
 
       // Request tables from an empty pool
       HTableInterface table1 = pool.getTable(TABLENAME);
       HTableInterface table2 = pool.getTable(TABLENAME);
       HTableInterface table3 = pool.getTable(TABLENAME);
 
+      tableFactory.assertTableCount(3);
+
       // Close tables (returns tables to the pool)
       table1.close();
       table2.close();
       // The pool should reject this one since it is already full
       table3.close();
 
+      tableFactory.assertTableCount(2);
+
       // Request tables of the same name
       HTableInterface sameTable1 = pool.getTable(TABLENAME);
       HTableInterface sameTable2 = pool.getTable(TABLENAME);
@@ -224,8 +232,10 @@
 
     @Test
     public void testCloseTablePool() throws IOException {
+      HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter();
+
       HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4,
-          getPoolType());
+          tableFactory, getPoolType());
       HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
 
       if (admin.tableExists(TABLENAME)) {
@@ -243,19 +253,17 @@
         tables[i] = pool.getTable(TABLENAME);
       }
 
+      tableFactory.assertTableCount(4);
+
       pool.closeTablePool(TABLENAME);
 
+      tableFactory.assertTableCount(4);
+
       for (int i = 0; i < 4; ++i) {
         tables[i].close();
       }
 
-      Assert.assertEquals(4,
-          pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
-
-      pool.closeTablePool(TABLENAME);
-
-      Assert.assertEquals(0,
-          pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
+      tableFactory.assertTableCount(0);
     }
   }
@@ -268,14 +276,18 @@
 
     @Test
     public void testTableWithMaxSize() throws Exception {
+      HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter();
+
       HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 2,
-          getPoolType());
+          tableFactory, getPoolType());
 
       // Request tables from an empty pool
       HTableInterface table1 = pool.getTable(TABLENAME);
       HTableInterface table2 = pool.getTable(TABLENAME);
       HTableInterface table3 = pool.getTable(TABLENAME);
 
+      tableFactory.assertTableCount(1);
+
       // Close tables (returns tables to the pool)
       table1.close();
       table2.close();
@@ -283,6 +295,8 @@
       // <= 2
       table3.close();
 
+      tableFactory.assertTableCount(1);
+
       // Request tables of the same name
       HTableInterface sameTable1 = pool.getTable(TABLENAME);
       HTableInterface sameTable2 = pool.getTable(TABLENAME);
@@ -300,8 +314,10 @@
 
     @Test
     public void testCloseTablePool() throws IOException {
+      HTableFactoryWithTableCounter tableFactory = new HTableFactoryWithTableCounter();
+
       HTablePool pool = new HTablePool(TEST_UTIL.getConfiguration(), 4,
-          getPoolType());
+          tableFactory, getPoolType());
       HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
 
       if (admin.tableExists(TABLENAME)) {
@@ -319,20 +335,41 @@
         tables[i] = pool.getTable(TABLENAME);
       }
 
+      tableFactory.assertTableCount(1);
+
       pool.closeTablePool(TABLENAME);
 
+      tableFactory.assertTableCount(1);
+
       for (int i = 0; i < 4; ++i) {
         tables[i].close();
       }
 
-      Assert.assertEquals(1,
-          pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
-
-      pool.closeTablePool(TABLENAME);
-
-      Assert.assertEquals(0,
-          pool.getCurrentPoolSize(Bytes.toString(TABLENAME)));
+      tableFactory.assertTableCount(0);
     }
   }
 
+  private static class HTableFactoryWithTableCounter implements HTableInterfaceFactory {
+    final HTableInterfaceFactory baseTableFactory = new HTableFactory();
+    final AtomicInteger tableCounter = new AtomicInteger();
+
+    @Override
+    public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
+      HTableInterface table = baseTableFactory.createHTableInterface(config, tableName);
+      tableCounter.incrementAndGet();
+      return table;
+    }
+
+    @Override
+    public void releaseHTableInterface(HTableInterface table) throws IOException {
+      tableCounter.decrementAndGet();
+      baseTableFactory.releaseHTableInterface(table);
+    }
+
+    void assertTableCount(int count) {
+      Assert.assertEquals(count, tableCounter.get());
+    }
+
+  }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java
new file mode 100644
index 0000000..5030346
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/AbstractTestSharedMap.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class AbstractTestSharedMap {
+  abstract SharedMap<String, Object> newSharedMapWith3MaxPoolCount();
+
+  SharedMap<String, Object> map;
+  Object o1;
+  Object o2;
+  Object o3;
+  Object o4;
+
+  @Before
+  public void setUp() {
+    map = newSharedMapWith3MaxPoolCount();
+    o1 = new Object();
+    o2 = new Object();
+    o3 = new Object();
+    o4 = new Object();
+  }
+
+  @Test
+  public void testBorrowRegisterReturn() {
+    Assert.assertNull(map.borrowObject("a"));
+
+    map.registerObject("a", o1);
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertNull(map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertNull(map.borrowObject("a"));
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    Assert.assertNull(map.borrowObject("b"));
+    Assert.assertFalse(map.returnObject("a", o2));
+  }
+
+  @Test
+  public void testMultipleRegisters() {
+    map.registerObject("a", o1);
+    map.registerObject("a", o2);
+
+    Assert.assertTrue(map.returnObject("a", o1));
+    Assert.assertTrue(map.returnObject("a", o2));
+
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertSame(o2, map.borrowObject("a"));
+    Assert.assertNull(map.borrowObject("a"));
+  }
+
+  @Test
+  public void testRejectDuplicateRegistration() {
+    Assert.assertTrue(map.registerObject("a", o1));
+    Assert.assertFalse(map.registerObject("a", o1));
+  }
+
+  @Test
+  public void testInvalidateBeforeReturn() {
+    map.registerObject("a", o1);
+
+    map.invalidateObject("a", o1);
+
+    Assert.assertFalse(map.returnObject("a", o1));
+    Assert.assertNull(map.borrowObject("a"));
+  }
+
+  @Test
+  public void testInvalidateAfterReturn() {
+    map.registerObject("a", o1);
+    Assert.assertTrue(map.returnObject("a", o1));
+
+    map.invalidateObject("a", o1);
+
+    Assert.assertNull(map.borrowObject("a"));
+  }
+
+  @Test
+  public void testClear() {
+    map.registerObject("a", o1);
+    map.registerObject("b", o2);
+    map.registerObject("c", o3);
+
+    Assert.assertTrue(map.returnObject("a", o1));
+    Assert.assertTrue(map.returnObject("b", o2));
+
+    Collection<Object> targets = map.clear();
+
+    Assert.assertEquals(
+        new HashSet<Object>(Arrays.asList(o1, o2)),
+        new HashSet<Object>(targets));
+
+    Assert.assertNull(map.borrowObject("a"));
+    Assert.assertNull(map.borrowObject("b"));
+    Assert.assertFalse(map.returnObject("c", o3));
+  }
+
+  @Test
+  public void testClearWithKey() {
+    map.registerObject("a", o1);
+    map.registerObject("a", o2);
+    map.registerObject("b", o3);
+    map.registerObject("b", o4);
+
+    Assert.assertTrue(map.returnObject("a", o1));
+    Assert.assertTrue(map.returnObject("b", o3));
+
+    Collection<Object> targets = map.clear("a");
+
+    Assert.assertEquals(
+        new HashSet<Object>(Collections.singleton(o1)),
+        new HashSet<Object>(targets));
+
+    Assert.assertNull(map.borrowObject("a"));
+    Assert.assertFalse(map.returnObject("a", o2));
+
+    Assert.assertSame(o3, map.borrowObject("b"));
+    Assert.assertTrue(map.returnObject("b", o4));
+  }
+
+  @Test
+  public void testSizeAndEmpty() {
+    Assert.assertEquals(0, map.size());
+    Assert.assertEquals(0, map.size("a"));
+    Assert.assertEquals(0, map.size("b"));
+    Assert.assertTrue(map.isEmpty());
+
+    map.registerObject("a", o1);
+
+    Assert.assertEquals(1, map.size());
+    Assert.assertEquals(1, map.size("a"));
+    Assert.assertEquals(0, map.size("b"));
+    Assert.assertFalse(map.isEmpty());
+
+    map.registerObject("a", o2);
+
+    Assert.assertEquals(2, map.size());
+    Assert.assertEquals(2, map.size("a"));
+    Assert.assertEquals(0, map.size("b"));
+    Assert.assertFalse(map.isEmpty());
+
+    map.registerObject("b", o3);
+
+    Assert.assertEquals(3, map.size());
+    Assert.assertEquals(2, map.size("a"));
+    Assert.assertEquals(1, map.size("b"));
+    Assert.assertFalse(map.isEmpty());
+
+    map.clear();
+
+    Assert.assertEquals(0, map.size());
+    Assert.assertEquals(0, map.size("a"));
+    Assert.assertEquals(0, map.size("b"));
+    Assert.assertTrue(map.isEmpty());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java
new file mode 100644
index 0000000..9f0823e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestReusableSharedMap.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestReusableSharedMap extends AbstractTestSharedMap {
+  @Override
+  SharedMap<String, Object> newSharedMapWith3MaxPoolCount() {
+    return new ReusableSharedMap<String, Object>(3);
+  }
+
+  @Test
+  public void testOverRegisters() {
+    map.registerObject("a", o1);
+    map.registerObject("a", o2);
+    map.registerObject("a", o3);
+    map.registerObject("a", o4);
+
+    Assert.assertNull(map.borrowObject("a"));
+
+    Assert.assertTrue(map.returnObject("a", o1));
+    Assert.assertTrue(map.returnObject("a", o2));
+    Assert.assertTrue(map.returnObject("a", o3));
+    Assert.assertFalse(map.returnObject("a", o4));
+
+    Assert.assertSame(o1, map.borrowObject("a"));
+    Assert.assertSame(o2, map.borrowObject("a"));
+    Assert.assertSame(o3, map.borrowObject("a"));
+    Assert.assertNull(map.borrowObject("a"));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java
new file mode 100644
index 0000000..56b9eb8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRoundRobinSharedMap.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestRoundRobinSharedMap extends AbstractTestSharedMap { + @Override + SharedMap newSharedMapWith3MaxPoolCount() { + return new RoundRobinSharedMap(3); + } + + @Test + public void testFullRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o3); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } + + @Test + public void testOverRegisters() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + map.registerObject("a", o4); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + Assert.assertSame(o4, map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o3)); + Assert.assertTrue(map.returnObject("a", o4)); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", 
o3)); + Assert.assertFalse(map.returnObject("a", o4)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } + + @Test + public void testIdleObjectPriority() { + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + + map.returnObject("a", o2); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o3, map.borrowObject("a")); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java new file mode 100644 index 0000000..cb99414 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestThreadLocalSharedMapDecorator.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestThreadLocalSharedMapDecorator { + SharedMap map; + Object o1; + Object o2; + Object o3; + + @Before + public void setUp() { + map = new ThreadLocalSharedMapDecorator( + new RoundRobinSharedMap(3)); + o1 = new Object(); + o2 = new Object(); + o3 = new Object(); + } + + @Test + public void testRegisterAndBorrow() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + map.registerObject("a", o2); + map.registerObject("a", o3); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + } + + @Test + public void testReturnAndBorrow() { + Assert.assertNull(map.borrowObject("a")); + + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + } + + @Test + public void testRejectDuplicateRegistration() { + Assert.assertTrue(map.registerObject("a", o1)); + Assert.assertFalse(map.registerObject("a", o1)); + } + + @Test + public void testInvalidate() { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + 
Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + + map.invalidateObject("a", o1); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertFalse(map.returnObject("a", o1)); + } + + @Test + public void testClear() { + map.registerObject("a", o1); + + map.clear(); + + // We still enable to borrow until we return completely + // or invalidate from inside the thread. + Assert.assertSame(o1, map.borrowObject("a")); + + Assert.assertTrue(map.returnObject("a", o1)); + Assert.assertFalse(map.returnObject("a", o1)); // completely returned and rejected + } + + @Test + public void testThreadLocal() throws Exception { + map.registerObject("a", o1); + Assert.assertTrue(map.returnObject("a", o1)); + + map.registerObject("a", o2); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o1, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o1)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + + final AtomicReference borrowedRef = new AtomicReference(); + + Thread thread = new Thread() { + @Override + public void run() { + borrowedRef.set(map.borrowObject("a")); + } + }; + + thread.start(); + thread.join(); + + Assert.assertSame(o1, borrowedRef.get()); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + + Assert.assertSame(o2, map.borrowObject("a")); + Assert.assertTrue(map.returnObject("a", o2)); + } +}