Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (revision 603502) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java (working copy) @@ -19,11 +19,13 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; +import java.util.HashMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; /** * Leases @@ -45,10 +47,10 @@ protected final int leasePeriod; protected final int leaseCheckFrequency; private final Thread leaseMonitorThread; - protected final Map leases = - new HashMap(); - protected final TreeSet sortedLeases = new TreeSet(); - protected AtomicBoolean stop = new AtomicBoolean(false); + protected volatile HashMap leases = + new HashMap(); + protected volatile TreeSet sortedLeases = new TreeSet(); + protected volatile AtomicBoolean stop = new AtomicBoolean(false); /** * Creates a lease @@ -74,7 +76,7 @@ * @param name Set name on the lease checking daemon thread. */ public void setName(final String name) { - this.leaseMonitorThread.setName(name); + leaseMonitorThread.setName(name); } /** @@ -85,13 +87,13 @@ * allocation of new leases. */ public void closeAfterLeasesExpire() { - synchronized(this.leases) { - while (this.leases.size() > 0) { + synchronized (leases) { + while (leases.size() > 0) { LOG.info(Thread.currentThread().getName() + " " + Integer.toString(leases.size()) + " lease(s) " + "outstanding. Waiting for them to expire."); try { - this.leases.wait(this.leaseCheckFrequency); + leases.wait(leaseCheckFrequency); } catch (InterruptedException e) { // continue } @@ -107,17 +109,17 @@ */ public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); - this.stop.set(true); - while (this.leaseMonitorThread.isAlive()) { + stop.set(true); + while (leaseMonitorThread.isAlive()) { try { - this.leaseMonitorThread.interrupt(); - this.leaseMonitorThread.join(); + leaseMonitorThread.interrupt(); + leaseMonitorThread.join(); } catch (InterruptedException iex) { // Ignore } } - synchronized(leases) { - synchronized(sortedLeases) { + synchronized (leases) { + synchronized (sortedLeases) { leases.clear(); sortedLeases.clear(); } @@ -130,81 +132,64 @@ /** * Obtain a lease * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName lease identifier * @param listener listener that will process lease expirations */ - public void createLease(final long holderId, final long resourceId, + public void createLease(final String leaseName, final LeaseListener listener) { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - Lease lease = new Lease(holderId, resourceId, listener); - name = lease.getLeaseName(); - if(leases.get(name) != null) { + synchronized (leases) { + synchronized (sortedLeases) { + if (leases.get(leaseName) != null) { throw new AssertionError("Impossible state for createLease(): " + - "Lease " + name + " is still held."); + "Lease " + leaseName + " is still held."); } - leases.put(name, lease); + Lease lease = new Lease(leaseName, listener); + leases.put(leaseName, lease); sortedLeases.add(lease); } } -// if (LOG.isDebugEnabled()) { -// LOG.debug("Created lease " + name); -// } } - /* A client renews a lease... */ /** * Renew a lease * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName name of lease * @throws IOException */ - public void renewLease(final long holderId, final long resourceId) - throws IOException { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - name = createLeaseName(holderId, resourceId); - Lease lease = leases.get(name); + public void renewLease(final String leaseName) throws IOException { + synchronized (leases) { + synchronized (sortedLeases) { + Lease lease = leases.get(leaseName); if (lease == null) { // It's possible that someone tries to renew the lease, but // it just expired a moment ago. So fail. throw new IOException("Cannot renew lease that is not held: " + - name); + leaseName); } sortedLeases.remove(lease); lease.renew(); sortedLeases.add(lease); } } -// if (LOG.isDebugEnabled()) { -// LOG.debug("Renewed lease " + name); -// } } /** * Client explicitly cancels a lease. * - * @param holderId id of lease holder - * @param resourceId id of resource being leased + * @param leaseName name of lease */ - public void cancelLease(final long holderId, final long resourceId) { - LeaseName name = null; - synchronized(leases) { - synchronized(sortedLeases) { - name = createLeaseName(holderId, resourceId); - Lease lease = leases.get(name); + public void cancelLease(final String leaseName) { + synchronized (leases) { + synchronized (sortedLeases) { + Lease lease = leases.remove(leaseName); if (lease == null) { // It's possible that someone tries to renew the lease, but // it just expired a moment ago. So just skip it. return; } sortedLeases.remove(lease); - leases.remove(name); } + leases.notifyAll(); } } @@ -224,13 +209,13 @@ /** {@inheritDoc} */ @Override protected void chore() { - synchronized(leases) { - synchronized(sortedLeases) { + synchronized (leases) { + synchronized (sortedLeases) { Lease top; - while((sortedLeases.size() > 0) + while ((sortedLeases.size() > 0) && ((top = sortedLeases.first()) != null)) { - if(top.shouldExpire()) { - leases.remove(top.getLeaseName()); + if (top.shouldExpire()) { + leases.remove(top.name); sortedLeases.remove(top); top.expired(); } else { @@ -238,109 +223,34 @@ } } } + leases.notifyAll(); } } } - /* - * A Lease name. - * More lightweight than String or Text. - */ - @SuppressWarnings("unchecked") - class LeaseName implements Comparable { - private final long holderId; - private final long resourceId; - - LeaseName(final long hid, final long rid) { - this.holderId = hid; - this.resourceId = rid; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object obj) { - LeaseName other = (LeaseName)obj; - return this.holderId == other.holderId && - this.resourceId == other.resourceId; - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - // Copy OR'ing from javadoc for Long#hashCode. - int result = (int)(this.holderId ^ (this.holderId >>> 32)); - result ^= (int)(this.resourceId ^ (this.resourceId >>> 32)); - return result; - } - - /** {@inheritDoc} */ - @Override - public String toString() { - return Long.toString(this.holderId) + "/" + - Long.toString(this.resourceId); - } - - /** {@inheritDoc} */ - public int compareTo(Object obj) { - LeaseName other = (LeaseName)obj; - if (this.holderId < other.holderId) { - return -1; - } - if (this.holderId > other.holderId) { - return 1; - } - // holderIds are equal - if (this.resourceId < other.resourceId) { - return -1; - } - if (this.resourceId > other.resourceId) { - return 1; - } - // Objects are equal - return 0; - } - } - - /** Create a lease id out of the holder and resource ids. */ - protected LeaseName createLeaseName(final long hid, final long rid) { - return new LeaseName(hid, rid); - } - /** This class tracks a single Lease. */ @SuppressWarnings("unchecked") private class Lease implements Comparable { - final long holderId; - final long resourceId; + final String name; final LeaseListener listener; - long lastUpdate; - private LeaseName leaseId; + volatile long lastUpdate; - Lease(final long holderId, final long resourceId, - final LeaseListener listener) { - this.holderId = holderId; - this.resourceId = resourceId; + Lease(final String leaseName, final LeaseListener listener) { + this.name = leaseName; this.listener = listener; renew(); } - synchronized LeaseName getLeaseName() { - if (this.leaseId == null) { - this.leaseId = createLeaseName(holderId, resourceId); - } - return this.leaseId; - } - boolean shouldExpire() { return (System.currentTimeMillis() - lastUpdate > leasePeriod); } void renew() { - this.lastUpdate = System.currentTimeMillis(); + lastUpdate = System.currentTimeMillis(); } void expired() { - LOG.info(Thread.currentThread().getName() + " lease expired " + - getLeaseName()); + LOG.info(Thread.currentThread().getName() + " lease expired " + name); listener.leaseExpired(); } @@ -353,24 +263,24 @@ /** {@inheritDoc} */ @Override public int hashCode() { - int result = this.getLeaseName().hashCode(); - result ^= Long.valueOf(this.lastUpdate).hashCode(); + int result = name.hashCode(); + result ^= Long.valueOf(lastUpdate).hashCode(); return result; } - ////////////////////////////////////////////////////////////////////////////// + // // Comparable - ////////////////////////////////////////////////////////////////////////////// + // /** {@inheritDoc} */ public int compareTo(Object o) { Lease other = (Lease) o; - if(this.lastUpdate < other.lastUpdate) { + if (lastUpdate < other.lastUpdate) { return -1; - } else if(this.lastUpdate > other.lastUpdate) { + } else if (lastUpdate > other.lastUpdate) { return 1; } else { - return this.getLeaseName().compareTo(other.getLeaseName()); + return name.compareTo(other.name); } } } Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 603502) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy) @@ -1377,7 +1377,7 @@ if (s == null) { throw new UnknownScannerException("Name: " + scannerName); } - this.leases.renewLease(scannerId, scannerId); + this.leases.renewLease(scannerName); // Collect values to be returned here MapWritable values = new MapWritable(); @@ -1441,7 +1441,7 @@ scanners.put(scannerName, s); } this.leases. - createLease(scannerId, scannerId, new ScannerListener(scannerName)); + createLease(scannerName, new ScannerListener(scannerName)); return scannerId; } catch (IOException e) { LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", @@ -1465,7 +1465,7 @@ throw new UnknownScannerException(scannerName); } s.close(); - this.leases.cancelLease(scannerId, scannerId); + this.leases.cancelLease(scannerName); } catch (IOException e) { checkFileSystem(); throw e; Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java =================================================================== --- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (revision 603502) +++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (working copy) @@ -1269,8 +1269,7 @@ loadToServers.put(load, servers); if (!closed.get()) { - long serverLabel = getServerLabel(s); - serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s)); + serverLeases.createLease(s, new ServerExpirer(s)); } return createConfigurationSubset(); @@ -1290,15 +1289,10 @@ return mw; } - private long getServerLabel(final String s) { - return s.hashCode(); - } - /** {@inheritDoc} */ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { String serverName = serverInfo.getServerAddress().toString().trim(); - long serverLabel = getServerLabel(serverName); if (msgs.length > 0) { if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { synchronized (serversToServerInfo) { @@ -1311,7 +1305,7 @@ ": MSG_REPORT_EXITING -- cancelling lease"); } - if (cancelLease(serverName, serverLabel)) { + if (cancelLease(serverName)) { // Only process the exit message if the server still has a lease. // Otherwise we could end up processing the server exit twice. LOG.info("Region server " + serverName + @@ -1395,7 +1389,7 @@ } synchronized (serversToServerInfo) { - cancelLease(serverName, serverLabel); + cancelLease(serverName); serversToServerInfo.notifyAll(); } return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; @@ -1406,7 +1400,7 @@ // This will always succeed; otherwise, the fetch of serversToServerInfo // would have failed above. - serverLeases.renewLease(serverLabel, serverLabel); + serverLeases.renewLease(serverName); // Refresh the info object and the load information @@ -1443,7 +1437,7 @@ } /** Cancel a server's lease and update its load information */ - private boolean cancelLease(final String serverName, final long serverLabel) { + private boolean cancelLease(final String serverName) { boolean leaseCancelled = false; HServerInfo info = serversToServerInfo.remove(serverName); if (info != null) { @@ -1454,7 +1448,7 @@ unassignRootRegion(); } LOG.info("Cancelling lease for " + serverName); - serverLeases.cancelLease(serverLabel, serverLabel); + serverLeases.cancelLease(serverName); leaseCancelled = true; // update load information