Description
Bug in ScannerCallableWithReplicas.addCallsForOtherReplicas method
Code in ScannerCallableWithReplicas.addCallsForOtherReplicas:

private int addCallsForOtherReplicas(
    BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
    int min, int max) {
  if (scan.getConsistency() == Consistency.STRONG) {
    return 0; // not scheduling on other replicas for strong consistency
  }
  for (int id = min; id <= max; id++) {
    if (currentScannerCallable.getHRegionInfo().getReplicaId() == id) {
      continue; // this was already scheduled earlier
    }
    ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
    if (this.lastResult != null) {
      s.getScan().setStartRow(this.lastResult.getRow());
    }
    outstandingCallables.add(s);
    RetryingRPC retryingOnReplica = new RetryingRPC(s);
    cs.submit(retryingOnReplica);
  }
  // bug? should be "max - min", because the "continue" above always happens once
  return max - min + 1;
}
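Because the returned count is one larger than the number of calls actually submitted, completed < submitted can remain true even after every submitted call has finished, so the following code blocks indefinitely.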
Code in ScannerCallableWithReplicas.call:

// submitted becomes larger than the number of calls actually submitted
submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
try {
  // this loop is affected
  while (completed < submitted) {
    try {
      Future<Pair<Result[], ScannerCallable>> f = cs.take();
      Pair<Result[], ScannerCallable> r = f.get();
      if (r != null && r.getSecond() != null) {
        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
      }
      return r == null ? null : r.getFirst(); // great we got an answer
    } catch (ExecutionException e) {
      // if not cancel or interrupt, wait until all RPC's are done
      // one of the tasks failed. Save the exception for later.
      if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
      exceptions.add(e);
      completed++;
    }
  }
} catch (CancellationException e) {
  throw new InterruptedIOException(e.getMessage());
} catch (InterruptedException e) {
  throw new InterruptedIOException(e.getMessage());
} finally {
  // We get there because we were interrupted or because one or more of the
  // calls succeeded or failed. In all case, we stop all our tasks.
  cs.cancelAll(true);
}
If the calls to all replica region servers fail with ExecutionException, completed never reaches submitted and the loop blocks forever in cs.take().
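
The miscount can be reproduced outside HBase with a small sketch. It is not the HBase code: the class ReplicaCountSketch and its methods are hypothetical, java.util.concurrent.ExecutorCompletionService stands in for BoundedCompletionService, and it assumes, as the "already scheduled earlier" comment suggests, that the current replica's call was submitted and counted before addCallsForOtherReplicas runs. A poll with a timeout replaces cs.take() so the hang shows up as a false return value instead of blocking.

```java
import java.util.concurrent.*;

// Hypothetical stand-alone sketch; none of these names exist in HBase.
class ReplicaCountSketch {

    // Mirrors the loop shape of addCallsForOtherReplicas: the replica that was
    // already scheduled is skipped. Every submitted task fails on purpose.
    static int submitToOtherReplicas(CompletionService<String> cs, int min, int max,
                                     int currentReplicaId, boolean countActual) {
        int actual = 0;
        for (int id = min; id <= max; id++) {
            if (id == currentReplicaId) {
                continue; // already scheduled earlier
            }
            final int replica = id;
            cs.submit(() -> {
                throw new IllegalStateException("replica " + replica + " failed");
            });
            actual++;
        }
        // countActual == false reproduces the reported return value.
        return countActual ? actual : max - min + 1;
    }

    // Returns true if the wait loop finishes, false if a take() would block forever.
    static boolean drains(boolean countActual) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            int submitted = 0;
            int completed = 0;
            // Assumed: the current replica (id 0) is submitted and counted first.
            cs.submit(() -> {
                throw new IllegalStateException("replica 0 failed");
            });
            submitted++;
            submitted += submitToOtherReplicas(cs, 0, 2, 0, countActual);
            while (completed < submitted) {
                Future<String> f = cs.poll(500, TimeUnit.MILLISECONDS);
                if (f == null) {
                    return false; // nothing left to complete: take() would hang here
                }
                try {
                    f.get();
                    return true; // a replica answered
                } catch (ExecutionException e) {
                    completed++; // a replica failed, keep waiting for the others
                }
            }
            return true; // every call failed, but the loop terminated
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("reported count drains: " + drains(false)); // false
        System.out.println("actual count drains:   " + drains(true));  // true
    }
}
```

With three replicas, the reported return value makes submitted 4 while only 3 calls exist, so the loop waits for a fourth completion that never arrives. Returning the number of calls actually submitted (equivalently max - min when exactly one replica is skipped) lets the loop end after the third failure.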