diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 6b6acf0..55d032a 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -71,6 +71,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { private int scannerTimeout; private Set outstandingCallables = new HashSet<>(); private boolean someRPCcancelled = false; //required for testing purposes only + private int regionReplication = 0; public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, @@ -142,21 +143,28 @@ class ScannerCallableWithReplicas implements RetryingCallable { //2. We should close the "losing" scanners (scanners other than the ones we hear back // from first) // - RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, - RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, - currentScannerCallable.getRow()); + // Since RegionReplication is a table attribute, it wont change as long as table is enabled, + // it just needs to be set once. + + if (regionReplication <= 0) { + RegionLocations rl = RpcRetryingCallerWithReadReplicas + .getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, + currentScannerCallable.getRow()); + regionReplication = rl.size(); + } + // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService> cs = new ResultBoundedCompletionService<>( RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool, - rl.size() * 5); + regionReplication * 5); AtomicBoolean done = new AtomicBoolean(false); replicaSwitched.set(false); // submit call for the primary replica. - addCallsForCurrentReplica(cs, rl); + addCallsForCurrentReplica(cs); int startIndex = 0; try { @@ -179,7 +187,7 @@ class ScannerCallableWithReplicas implements RetryingCallable { // If rl's size is 1 or scan's consitency is strong, it needs to throw // out the exception from the primary replica - if ((rl.size() == 1) || (scan.getConsistency() == Consistency.STRONG)) { + if ((regionReplication == 1) || (scan.getConsistency() == Consistency.STRONG)) { // Rethrow the first exception RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries); } @@ -192,13 +200,13 @@ class ScannerCallableWithReplicas implements RetryingCallable { } // submit call for the all of the secondaries at once - int endIndex = rl.size(); + int endIndex = regionReplication; if (scan.getConsistency() == Consistency.STRONG) { // When scan's consistency is strong, do not send to the secondaries endIndex = 1; } else { // TODO: this may be an overkill for large region replication - addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1); + addCallsForOtherReplicas(cs, 0, regionReplication - 1); } try { @@ -287,15 +295,14 @@ class ScannerCallableWithReplicas implements RetryingCallable { } private void addCallsForCurrentReplica( - ResultBoundedCompletionService> cs, RegionLocations rl) { + ResultBoundedCompletionService> cs) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); } private void addCallsForOtherReplicas( - ResultBoundedCompletionService> cs, RegionLocations rl, - int min, int max) { + ResultBoundedCompletionService> cs, int min, int max) { for (int id = min; id <= max; id++) { if (currentScannerCallable.id == id) {