Index: src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java (revision 1125593) +++ src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java (working copy) @@ -140,16 +140,24 @@ */ public void createLease(String leaseName, final LeaseListener listener) throws LeaseStillHeldException { - if (stopRequested) { + addLease(new Lease(leaseName, listener)); + } + + /** + * Inserts lease. Resets expiration before insertion. + * @param lease + * @throws LeaseStillHeldException + */ + public void addLease(final Lease lease) throws LeaseStillHeldException { + if (this.stopRequested) { return; } - Lease lease = new Lease(leaseName, listener, - System.currentTimeMillis() + leasePeriod); + lease.setExpirationTime(System.currentTimeMillis() + this.leasePeriod); synchronized (leaseQueue) { - if (leases.containsKey(leaseName)) { - throw new LeaseStillHeldException(leaseName); + if (leases.containsKey(lease.getLeaseName())) { + throw new LeaseStillHeldException(lease.getLeaseName()); } - leases.put(leaseName, lease); + leases.put(lease.getLeaseName(), lease); leaseQueue.add(lease); } } @@ -189,7 +197,7 @@ // in a corrupt leaseQueue. if (lease == null || !leaseQueue.remove(lease)) { throw new LeaseException("lease '" + leaseName + - "' does not exist or has already expired"); + "' does not exist or has already expired"); } lease.setExpirationTime(System.currentTimeMillis() + leasePeriod); leaseQueue.add(lease); @@ -198,26 +206,44 @@ /** * Client explicitly cancels a lease. + * @param leaseName name of lease + * @throws LeaseException + */ + public void cancelLease(final String leaseName) throws LeaseException { + removeLease(leaseName); + } + + /** + * Remove named lease. + * Lease is removed from the list of leases and removed from the delay queue. + * Lease can be resinserted using {@link #addLease(Lease)} * * @param leaseName name of lease * @throws LeaseException + * @return Removed lease */ - public void cancelLease(final String leaseName) throws LeaseException { + Lease removeLease(final String leaseName) throws LeaseException { + Lease lease = null; synchronized (leaseQueue) { - Lease lease = leases.remove(leaseName); + lease = leases.remove(leaseName); if (lease == null) { throw new LeaseException("lease '" + leaseName + "' does not exist"); } leaseQueue.remove(lease); } + return lease; } /** This class tracks a single Lease. */ - private static class Lease implements Delayed { + static class Lease implements Delayed { private final String leaseName; private final LeaseListener listener; private long expirationTime; + Lease(final String leaseName, LeaseListener listener) { + this(leaseName, listener, 0); + } + Lease(final String leaseName, LeaseListener listener, long expirationTime) { this.leaseName = leaseName; this.listener = listener; @@ -269,14 +295,5 @@ public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } - - /** - * Get the expiration time for that lease - * @return expiration time - */ - public long getExpirationTime() { - return this.expirationTime; - } - } -} +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1125593) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -1911,26 +1911,27 @@ } public Result[] next(final long scannerId, int nbRows) throws IOException { + String scannerName = String.valueOf(scannerId); + InternalScanner s = this.scanners.get(scannerName); + if (s == null) throw new UnknownScannerException("Name: " + scannerName); try { - String scannerName = String.valueOf(scannerId); - InternalScanner s = this.scanners.get(scannerName); - if (s == null) { - throw new UnknownScannerException("Name: " + scannerName); - } + checkOpen(); + } catch (IOException e) { + // If checkOpen failed, server not running or filesystem gone, + // cancel this lease; filesystem is gone or we're closing or something. try { - checkOpen(); - } catch (IOException e) { - // If checkOpen failed, server not running or filesystem gone, - // cancel this lease; filesystem is gone or we're closing or something. - try { - this.leases.cancelLease(scannerName); - } catch (LeaseException le) { - LOG.info("Server shutting down and client tried to access missing scanner " + - scannerName); - } - throw e; + this.leases.cancelLease(scannerName); + } catch (LeaseException le) { + LOG.info("Server shutting down and client tried to access missing scanner " + + scannerName); } - this.leases.renewLease(scannerName); + throw e; + } + Leases.Lease lease = null; + try { + // Remove lease while its being processed in server; protects against case + // where processing of request takes > lease expiration time. + lease = this.leases.removeLease(scannerName); List results = new ArrayList(nbRows); long currentScanResultSize = 0; List values = new ArrayList(); @@ -1992,10 +1993,15 @@ : results.toArray(new Result[0]); } catch (Throwable t) { if (t instanceof NotServingRegionException) { - String scannerName = String.valueOf(scannerId); this.scanners.remove(scannerName); } throw convertThrowableToIOE(cleanup(t)); + } finally { + // We're done. On way out readd the above removed lease. Adding resets + // expiration time on lease. + if (this.scanners.containsKey(scannerName)) { + if (lease != null) this.leases.addLease(lease); + } } }