Index: src/java/org/apache/hadoop/hbase/Leases.java =================================================================== --- src/java/org/apache/hadoop/hbase/Leases.java (revision 929708) +++ src/java/org/apache/hadoop/hbase/Leases.java (working copy) @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; /** * Leases @@ -90,6 +91,11 @@ if (lease == null) { continue; } + if (lease.getCurrentUsage() > 0) { + lease.setExpirationTime(System.currentTimeMillis() + leasePeriod); + leaseQueue.add(lease); + continue; + } // A lease expired. Run the expired code before removing from queue // since its presence in queue is used to see if lease exists still. if (lease.getListener() == null) { @@ -175,12 +181,44 @@ } /** - * Renew a lease + * Renew a lease and increment its usage counter + * Always call decrementLeaseUsage after you are done with a call * * @param leaseName name of lease * @throws LeaseException */ + public void incrementLeaseUsage(final String leaseName) throws LeaseException { + this.changeLeaseUsage(leaseName, 1); + } + + /** + * Decrease the usage of a lease so that it can be garbage collected + * @param leaseName Name of the lease + * @throws LeaseException If the lease doesn't exist + */ + public void decrementLeaseUsage(final String leaseName) throws LeaseException { + this.changeLeaseUsage(leaseName, -1); + } + + /** + * Renew the lease, for long running processes use increment/decrement + * @param leaseName Name of the lease + * @throws LeaseException If the lease doesn't exist + */ public void renewLease(final String leaseName) throws LeaseException { + this.changeLeaseUsage(leaseName, 0); + } + + /** + * Add the value to the usage of the lease and if it's not negative, + * renew the lease. + * @param leaseName Name of the lease + * @param add The amount to add to the lease, set it to 0 to not change the + * usage counter and still renew the lease. + * @throws LeaseException If the lease doesn't exist + */ + protected void changeLeaseUsage(final String leaseName, final int add) + throws LeaseException { synchronized (leaseQueue) { Lease lease = leases.get(leaseName); // We need to check to see if the remove is successful as the poll in the run() @@ -190,7 +228,10 @@ throw new LeaseException("lease '" + leaseName + "' does not exist or has already expired"); } - lease.setExpirationTime(System.currentTimeMillis() + leasePeriod); + lease.addUsage(add); + if (add > -1) { + lease.setExpirationTime(System.currentTimeMillis() + leasePeriod); + } leaseQueue.add(lease); } } @@ -215,6 +256,7 @@ private static class Lease implements Delayed { private final String leaseName; private final LeaseListener listener; + private final AtomicInteger currentUsage = new AtomicInteger(); private long expirationTime; Lease(final String leaseName, LeaseListener listener, long expirationTime) { @@ -276,5 +318,20 @@ public long getExpirationTime() { return this.expirationTime; } + + /** + * Add the value to the usage counter. + */ + public void addUsage(int add) { + this.currentUsage.addAndGet(add); + } + + /** + * Get the number of current users of this lease + * @return current number of users + */ + public int getCurrentUsage() { + return currentUsage.get(); + } } } Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 929708) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -1885,8 +1885,8 @@ } public Result [] next(final long scannerId, int nbRows) throws IOException { + String scannerName = String.valueOf(scannerId); try { - String scannerName = String.valueOf(scannerId); InternalScanner s = this.scanners.get(scannerName); if (s == null) { throw new UnknownScannerException("Name: " + scannerName); @@ -1899,7 +1899,7 @@ this.leases.cancelLease(scannerName); throw e; } - this.leases.renewLease(scannerName); + this.leases.incrementLeaseUsage(scannerName); List results = new ArrayList(); long currentScanResultSize = 0; for (int i = 0; i < nbRows && currentScanResultSize < maxScannerResultSize; i++) { @@ -1927,10 +1927,11 @@ null: results.toArray(new Result[0]); } catch (Throwable t) { if (t instanceof NotServingRegionException) { - String scannerName = String.valueOf(scannerId); this.scanners.remove(scannerName); } throw convertThrowableToIOE(cleanup(t)); + } finally { + this.leases.decrementLeaseUsage(scannerName); } }