Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (working copy) @@ -48,8 +48,7 @@ * If there are multiple regions in a table, this scanner will iterate * through them all. */ -@InterfaceAudience.Public -@InterfaceStability.Stable +@InterfaceAudience.Private public class ClientScanner extends AbstractClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); protected Scan scan; @@ -99,75 +98,7 @@ new RpcRetryingCallerFactory(conf), pool, primaryOperationTimeout); } - /** - * Create a new ClientScanner for the specified table. - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf - * @param scan - * @param tableName - * @param connection - * @param rpcFactory - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory) - throws IOException { - this(conf, scan, tableName, connection, rpcFactory, null, 0); - } - - /** - * Create a new ClientScanner for the specified table. A ClusterConnection will be - * retrieved using the passed Configuration. - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, - final TableName tableName) throws IOException { - this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf)); - } - - /** - * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, - final byte [] tableName) throws IOException { - this(conf, scan, TableName.valueOf(tableName)); - } - - - /** - * Create a new ClientScanner for the specified table - * Note that the passed {@link Scan}'s start row maybe changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws IOException - */ - public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), null, 0); - } - /** - * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)} - */ - @Deprecated - public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName, - ClusterConnection connection) throws IOException { - this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf), - null, 0); - } - - /** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start * row maybe changed changed. * @param conf The {@link Configuration} to use. 
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (working copy) @@ -48,8 +48,7 @@ * * For small scan, it will get better performance than {@link ClientScanner} */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ClientSmallScanner extends ClientScanner { private final Log LOG = LogFactory.getLog(this.getClass()); private ScannerCallableWithReplicas smallScanCallable = null; @@ -58,36 +57,6 @@ private byte[] skipRowOfFirstResult = null; /** - * Create a new ClientSmallScanner for the specified table. An HConnection - * will be retrieved using the passed Configuration. Note that the passed - * {@link Scan} 's start row maybe changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName) throws IOException { - this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf)); - } - - /** - * Create a new ClientSmallScanner for the specified table. An HConnection - * will be retrieved using the passed Configuration. Note that the passed - * {@link Scan} 's start row maybe changed. - * @param conf - * @param scan - * @param tableName - * @param connection - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection) throws IOException { - this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf)); - } - - /** * Create a new ClientSmallScanner for the specified table. 
Note that the passed * {@link Scan} 's start row maybe changed. * @param conf @@ -101,26 +70,10 @@ public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, ExecutorService pool, int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, pool, primaryOperationTimeout); + super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool, + primaryOperationTimeout); } - /** - * Create a new ShortClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed changed. - * - * @param conf The {@link Configuration} to use. - * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to rangeGet - * @param connection Connection identifying the cluster - * @param rpcFactory - * @throws IOException - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, - RpcRetryingCallerFactory rpcFactory) throws IOException { - super(conf, scan, tableName, connection, rpcFactory); - } - @Override protected void initializeScannerInConstruction() throws IOException { // No need to initialize the scanner when constructing instance, do it when @@ -196,6 +149,7 @@ @Override public Result[] call() throws IOException { + if (this.closed) return null; if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -217,9 +171,6 @@ public ScannerCallable getScannerCallableForReplica(int id) { return new SmallScannerCallable(id, this.getCaching()); } - - @Override - public void setClose(){} } @Override Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ 
-129,7 +129,7 @@ protected long currentWriteBufferSize; protected int scannerCaching; private int maxKeyValueSize; - private ExecutorService pool; // For Multi + private ExecutorService pool; // For Multi & Scan private boolean closed; private int operationTimeout; private int retries; Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (working copy) @@ -36,26 +36,12 @@ /** * A reversed client scanner which support backward scanning */ -@InterfaceAudience.Public -@InterfaceStability.Evolving +@InterfaceAudience.Private public class ReversedClientScanner extends ClientScanner { private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class); // A byte array in which all elements are the max byte, and it is used to // construct closest front row static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); - /** - * Create a new ReversibleClientScanner for the specified table Note that the - * passed {@link Scan}'s start row maybe changed. - * @param conf The {@link Configuration} to use. 
- * @param scan {@link Scan} to use in this scanner - * @param tableName The table that we wish to scan - * @param connection Connection identifying the cluster - * @throws IOException - */ - public ReversedClientScanner(Configuration conf, Scan scan, - TableName tableName, ClusterConnection connection) throws IOException { - super(conf, scan, tableName, connection); - } /** * Create a new ReversibleClientScanner for the specified table Note that the @@ -71,7 +57,8 @@ public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, ClusterConnection connection, ExecutorService pool, int primaryOperationTimeout) throws IOException { - super(conf, scan, tableName, connection, pool, primaryOperationTimeout); + super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool, + primaryOperationTimeout); } @Override Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java (revision 1596113) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java (working copy) @@ -132,9 +132,9 @@ RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, currentScannerCallable.getRow()); + // allocate a boundedcompletion pool of some multiple of number of replicas. 
- // We want accommodate the "scan" RPC call and the "close" RPC call (we schedule "close" - // RPCs for unneeded replica scans using the same pool) + // We want to accommodate some RPCs for redundant replica scans (but are still in progress) BoundedCompletionService> cs = new BoundedCompletionService>(pool, rl.size() * 5); @@ -151,7 +151,7 @@ if (f != null) { Pair r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } return r == null ? null : r.getFirst(); //great we got a response } @@ -175,7 +175,7 @@ Future> f = cs.take(); Pair r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); } return r == null ? null : r.getFirst(); // great we got an answer } catch (ExecutionException e) { @@ -204,7 +204,7 @@ } private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, - AtomicBoolean done, BoundedCompletionService> cs) { + AtomicBoolean done, ExecutorService pool) { if (done.compareAndSet(false, true)) { if (currentScannerCallable != scanner) replicaSwitched.set(true); currentScannerCallable = scanner; @@ -226,7 +226,7 @@ // the table is closed (when the awaitTermination of the underlying pool is called) s.setClose(); RetryingRPC r = new RetryingRPC(s); - cs.submit(r); + pool.submit(r); } // now clear outstandingCallables since we scheduled a close for all the contained scanners outstandingCallables.clear();