Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -48,8 +48,7 @@ * If there are multiple regions in a table, this scanner will iterate * through them all. */ -@InterfaceAudience.Public -@InterfaceStability.Stable +@InterfaceAudience.Private public class ClientScanner extends AbstractClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); protected Scan scan; @@ -78,6 +77,7 @@ protected final int primaryOperationTimeout; private int retries; protected final ExecutorService pool; + protected final ExecutorService closePool; /** * Create a new ClientScanner for the specified table. @@ -89,96 +89,30 @@ * @param connection * @param pool * @param primaryOperationTimeout + * @param closePool * @throws IOException */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, - ExecutorService pool, int primaryOperationTimeout) + ExecutorService pool, int primaryOperationTimeout, ExecutorService closePool) throws IOException { this(conf, scan, tableName, connection, - new RpcRetryingCallerFactory(conf), pool, primaryOperationTimeout); + new RpcRetryingCallerFactory(conf), pool, primaryOperationTimeout, closePool); } - /** - * Create a new ClientScanner for the specified table. - * Note that the passed {@link Scan}'s start row maybe changed. 
- * - * @param conf - * @param scan - * @param tableName - * @param connection - * @param rpcFactory - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory) - throws IOException { - this(conf, scan, tableName, connection, rpcFactory, null, 0); - } - - /** - * Create a new ClientScanner for the specified table. A ClusterConnection will be - * retrieved using the passed Configuration. - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, - final TableName tableName) throws IOException { - this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf)); - } - - /** - * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, - final byte [] tableName) throws IOException { - this(conf, scan, TableName.valueOf(tableName)); - } - - - /** - * Create a new ClientScanner for the specified table - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), null, 0); - } - /** - * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName, - ClusterConnection connection) throws IOException { - this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf), - null, 0); - } - - /** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start * row maybe changed changed. * @param conf The {@link Configuration} to use. * @param scan {@link Scan} to use in this scanner * @param tableName The table that we wish to scan * @param connection Connection identifying the cluster + * @param closePool * @throws IOException */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, ExecutorService pool, - int primaryOperationTimeout) throws IOException { + int primaryOperationTimeout, ExecutorService closePool) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); @@ -188,6 +122,7 @@ this.lastNext = System.currentTimeMillis(); this.connection = connection; this.pool = pool; + this.closePool = closePool; this.primaryOperationTimeout = primaryOperationTimeout; this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); @@ -382,7 +317,7 @@ s.setCaching(nbRows); ScannerCallableWithReplicas 
sr = new ScannerCallableWithReplicas(tableName, getConnection(), s, pool, primaryOperationTimeout, scan, - retries, scannerTimeout, caching, conf, caller); + retries, scannerTimeout, caching, conf, caller, closePool); return sr; } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (working copy) @@ -48,8 +48,7 @@ * * For small scan, it will get better performance than {@link ClientScanner} */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ClientSmallScanner extends ClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private ScannerCallableWithReplicas smallScanCallable = null; @@ -58,36 +57,6 @@ private byte[] skipRowOfFirstResult = null; /** - * Create a new ClientSmallScanner for the specified table. An HConnection - * will be retrieved using the passed Configuration. Note that the passed - * {@link Scan} 's start row maybe changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName) throws IOException { - this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf)); - } - - /** - * Create a new ClientSmallScanner for the specified table. An HConnection - * will be retrieved using the passed Configuration. Note that the passed - * {@link Scan} 's start row maybe changed. 
- * @param conf - * @param scan - * @param tableName - * @param connection - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf)); - } - - /** * Create a new ClientSmallScanner for the specified table. Note that the passed * {@link Scan} 's start row maybe changed. * @param conf @@ -96,31 +65,16 @@ * @param connection * @param pool * @param primaryOperationTimeout + * @param closePool pool used for scheduling scanner close calls * @throws IOException */ public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, ExecutorService pool, - int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, pool, primaryOperationTimeout); + int primaryOperationTimeout, ExecutorService closePool) throws IOException { + super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool, + primaryOperationTimeout, closePool); } /** - * Create a new ShortClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed changed. - * - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @param connection Connection identifying the cluster - * @param rpcFactory - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory) throws IOException { - super(conf, scan, tableName, connection, rpcFactory); - } - @Override protected void initializeScannerInConstruction() throws IOException { // No need to initialize the scanner when constructing instance, do it when @@ -183,7 +137,7 @@ ScannerCallableWithReplicas scannerCallableWithReplicas = new ScannerCallableWithReplicas(getTable(), getConnection(), s, getPool(), getPrimaryOperationTimeout(), getScan(), getRetries(), - getScannerTimeout(), cacheNum, conf, caller); + getScannerTimeout(), cacheNum, conf, caller, closePool); return scannerCallableWithReplicas; } @@ -196,6 +150,7 @@ @Override public Result[] call() throws IOException { + if (this.closed) return null; if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -217,9 +172,6 @@ public ScannerCallable getScannerCallableForReplica(int id) { return new SmallScannerCallable(id, this.getCaching()); } - - @Override - public void setClose(){} } @Override Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -129,7 +129,8 @@ protected long currentWriteBufferSize; protected int scannerCaching; private int maxKeyValueSize; - private ExecutorService pool; // For Multi + private ExecutorService pool; // For Multi & Scan + private ExecutorService closePool; // For scheduling close for scans private boolean closed; private int 
operationTimeout; private int retries; @@ -200,6 +201,7 @@ this.configuration = conf; this.pool = getDefaultExecutor(conf); + this.closePool = getDefaultExecutor(this.configuration); this.finishSetup(); } @@ -221,6 +223,7 @@ this.configuration = connection.getConfiguration(); this.pool = getDefaultExecutor(this.configuration); + this.closePool = getDefaultExecutor(this.configuration); this.finishSetup(); } @@ -282,6 +285,7 @@ this.cleanupPoolOnClose = false; } this.tableName = tableName; + this.closePool = getDefaultExecutor(this.configuration); this.cleanupConnectionOnClose = true; this.finishSetup(); } @@ -339,6 +343,7 @@ } else { this.cleanupPoolOnClose = false; } + this.closePool = getDefaultExecutor(this.configuration); this.finishSetup(); } @@ -777,13 +782,13 @@ if (scan.isSmall() && !scan.isReversed()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), - this.connection, pool, replicaCallTimeoutMicroSecondScan); + this.connection, pool, replicaCallTimeoutMicroSecondScan, closePool); } else if (scan.isReversed()) { return new ReversedClientScanner(getConfiguration(), scan, getName(), - this.connection, pool, replicaCallTimeoutMicroSecondScan); + this.connection, pool, replicaCallTimeoutMicroSecondScan, closePool); } return new ClientScanner(getConfiguration(), scan, - getName(), this.connection, pool, replicaCallTimeoutMicroSecondScan); + getName(), this.connection, pool, replicaCallTimeoutMicroSecondScan, closePool); } /** @@ -1411,6 +1416,18 @@ LOG.warn("waitForTermination interrupted"); } } + if (closePool != null) { + this.closePool.shutdown(); + try { + boolean terminated = false; + do { + // wait until the pool has terminated + terminated = this.closePool.awaitTermination(60, TimeUnit.SECONDS); + } while (!terminated); + } catch (InterruptedException e) { + LOG.warn("waitForTermination interrupted"); + } + } if (cleanupConnectionOnClose) { if (this.connection != null) { this.connection.close(); Index: 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (working copy) @@ -36,26 +36,12 @@ /** * A reversed client scanner which support backward scanning */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ReversedClientScanner extends ClientScanner { private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); // A byte array in which all elements are the max byte, and it is used to // construct closest front row static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); - /** - * Create a new ReversibleClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed. - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws IOException - */ - public ReversedClientScanner(Configuration conf, Scan scan, - TableName tableName, ClusterConnection connection) throws IOException { - super(conf, scan, tableName, connection); - } /** * Create a new ReversibleClientScanner for the specified table Note that the @@ -66,12 +52,14 @@ * @param connection * @param pool * @param primaryOperationTimeout + * @param closePool pool used for scheduling scanner close calls * @throws IOException */ public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, ClusterConnection connection, ExecutorService pool, - int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, pool, primaryOperationTimeout); + int primaryOperationTimeout, ExecutorService closePool) throws IOException { + super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool, + primaryOperationTimeout, closePool); } @Override @@ -152,7 +140,7 @@ s.setCaching(nbRows); ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(), s, pool, primaryOperationTimeout, scan, - getRetries(), getScannerTimeout(), caching, getConf(), caller); + getRetries(), getScannerTimeout(), caching, getConf(), caller, closePool); return sr; } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java (working copy) @@ -61,6 +61,7 @@ AtomicBoolean replicaSwitched = new AtomicBoolean(false); final ClusterConnection cConnection; protected final ExecutorService pool; + protected final ExecutorService closePool; protected final 
int timeBeforeReplicas; private final Scan scan; private final int retries; @@ -74,10 +75,11 @@ public ScannerCallableWithReplicas (TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, int retries, int scannerTimeout, int caching, Configuration conf, - RpcRetryingCaller caller) { + RpcRetryingCaller caller, ExecutorService closePool) { this.currentScannerCallable = baseCallable; this.cConnection = cConnection; this.pool = pool; + this.closePool = closePool; if (timeBeforeReplicas <= 0) { throw new IllegalArgumentException("Invalid value of operation timeout on the primary"); } @@ -132,11 +134,17 @@ RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, currentScannerCallable.getRow()); - // allocate a boundedcompletion pool of some multiple of number of replicas. - // We want accommodate the "scan" RPC call and the "close" RPC call (we schedule "close" - // RPCs for unneeded replica scans using the same pool) + // we maintain two completion services - one for Scans and another for Closes. It simplifies + // management of the pools when we want to cancel tasks (we would only cancel + // the scan tasks and not the close tasks). Each completion service's size is a multiple of the + // number of replicas just to account for cases where we have too many replica switches + // happening for a given 'next' or 'openScanner' call, and there are quite a few tasks + // to complete (this is not the normal case). 
+ // TODO: if the completionservice is full, we should block BoundedCompletionService> cs = new BoundedCompletionService>(pool, rl.size() * 5); + BoundedCompletionService> closeCS = + new BoundedCompletionService>(closePool, rl.size() * 5); List exceptions = null; int submitted = 0, completed = 0; @@ -151,7 +159,7 @@ if (f != null) { Pair r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, closeCS); } return r == null ? null : r.getFirst(); //great we got a response } @@ -175,7 +183,7 @@ Future> f = cs.take(); Pair r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, closeCS); } return r == null ? null : r.getFirst(); // great we got an answer } catch (ExecutionException e) {