From 4eb1834ba4e7f8828131229cb165d480544e5777 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 6 Aug 2013 16:55:38 -0700 Subject: [PATCH] Try to remove lock contention. --- .../apache/hadoop/hbase/regionserver/Leases.java | 155 ++++++++++---------- 1 file changed, 81 insertions(+), 74 deletions(-) diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java index 78ceb97..88d7d5e 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java @@ -25,7 +25,9 @@ import org.apache.hadoop.hbase.util.HasThread; import java.util.ConcurrentModificationException; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Delayed; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; @@ -54,10 +56,10 @@ import java.io.IOException; @InterfaceAudience.Private public class Leases extends HasThread { private static final Log LOG = LogFactory.getLog(Leases.class.getName()); - private final int leaseCheckFrequency; - private final DelayQueue leaseQueue = new DelayQueue(); - protected final Map leases = new HashMap(); - private volatile boolean stopRequested = false; + private final Map leases = new ConcurrentHashMap(); + + protected final int leaseCheckFrequency; + protected volatile boolean stopRequested = false; /** * Creates a lease monitor @@ -71,14 +73,23 @@ public class Leases extends HasThread { } /** - * @see java.lang.Thread#run() + * @see Thread#run() */ @Override public void run() { - while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) { - Lease lease = null; + long toWait = leaseCheckFrequency; + Lease nextLease = null; + long nextLeaseDelay = Long.MAX_VALUE; + + while (!stopRequested || (stopRequested && !leases.isEmpty()) ) { + try { - lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS); + if (nextLease != null) { + toWait = nextLease.getDelay(TimeUnit.MILLISECONDS); + } + + toWait = Math.min(leaseCheckFrequency, toWait); + Thread.sleep(toWait); } catch (InterruptedException e) { continue; } catch (ConcurrentModificationException e) { @@ -87,18 +98,28 @@ public class Leases extends HasThread { LOG.fatal("Unexpected exception killed leases thread", e); break; } - if (lease == null) { - continue; - } - // A lease expired. Run the expired code before removing from queue - // since its presence in queue is used to see if lease exists still. - if (lease.getListener() == null) { - LOG.error("lease listener is null for lease " + lease.getLeaseName()); - } else { - lease.getListener().leaseExpired(); - } - synchronized (leaseQueue) { - leases.remove(lease.getLeaseName()); + + nextLease = null; + nextLeaseDelay = Long.MAX_VALUE; + for (Iterator> it = leases.entrySet().iterator(); it.hasNext();) { + Map.Entry entry = it.next(); + Lease lease = entry.getValue(); + long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS); + if ( thisLeaseDelay > 0) { + if (nextLease == null || thisLeaseDelay < nextLeaseDelay) { + nextLease = lease; + nextLeaseDelay = thisLeaseDelay; + } + } else { + it.remove(); + // A lease expired. Run the expired code before removing from queue + // since its presence in queue is used to see if lease exists still. + if (lease.getListener() == null) { + LOG.error("lease listener is null for lease " + lease.getLeaseName()); + } else { + lease.getListener().leaseExpired(); + } + } } } close(); @@ -122,17 +143,13 @@ public class Leases extends HasThread { public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); this.stopRequested = true; - synchronized (leaseQueue) { - leaseQueue.clear(); - leases.clear(); - leaseQueue.notifyAll(); - } + leases.clear(); LOG.info(Thread.currentThread().getName() + " closed leases"); } /** * Obtain a lease. - * + * * @param leaseName name of the lease * @param leaseTimeoutPeriod length of the lease in milliseconds * @param listener listener that will process lease expirations @@ -153,61 +170,34 @@ public class Leases extends HasThread { return; } lease.resetExpirationTime(); - synchronized (leaseQueue) { - if (leases.containsKey(lease.getLeaseName())) { - throw new LeaseStillHeldException(lease.getLeaseName()); - } - leases.put(lease.getLeaseName(), lease); - leaseQueue.add(lease); - } - } - - /** - * Thrown if we are asked create a lease but lease on passed name already - * exists. - */ - @SuppressWarnings("serial") - public static class LeaseStillHeldException extends IOException { - private final String leaseName; - - /** - * @param name - */ - public LeaseStillHeldException(final String name) { - this.leaseName = name; - } - - /** @return name of lease */ - public String getName() { - return this.leaseName; + if (leases.containsKey(lease.getLeaseName())) { + throw new LeaseStillHeldException(lease.getLeaseName()); } + leases.put(lease.getLeaseName(), lease); } /** * Renew a lease * * @param leaseName name of lease - * @throws org.apache.hadoop.hbase.regionserver.LeaseException + * @throws LeaseException */ public void renewLease(final String leaseName) throws LeaseException { - synchronized (leaseQueue) { - Lease lease = leases.get(leaseName); - // We need to check to see if the remove is successful as the poll in the run() - // method could have completed between the get and the remove which will result - // in a corrupt leaseQueue. - if (lease == null || !leaseQueue.remove(lease)) { - throw new LeaseException("lease '" + leaseName + - "' does not exist or has already expired"); - } - lease.resetExpirationTime(); - leaseQueue.add(lease); + Lease lease = leases.get(leaseName); + // We need to check to see if the remove is successful as the poll in the run() + // method could have completed between the get and the remove which will result + // in a corrupt leaseQueue. + if (lease == null ) { + throw new LeaseException("lease '" + leaseName + + "' does not exist or has already expired"); } + lease.resetExpirationTime(); } /** * Client explicitly cancels a lease. * @param leaseName name of lease - * @throws LeaseException + * @throws org.apache.hadoop.hbase.regionserver.LeaseException */ public void cancelLease(final String leaseName) throws LeaseException { removeLease(leaseName); @@ -219,21 +209,38 @@ public class Leases extends HasThread { * Lease can be resinserted using {@link #addLease(Lease)} * * @param leaseName name of lease - * @throws LeaseException + * @throws org.apache.hadoop.hbase.regionserver.LeaseException * @return Removed lease */ Lease removeLease(final String leaseName) throws LeaseException { - Lease lease = null; - synchronized (leaseQueue) { - lease = leases.remove(leaseName); - if (lease == null) { - throw new LeaseException("lease '" + leaseName + "' does not exist"); - } - leaseQueue.remove(lease); + Lease lease = leases.remove(leaseName); + if (lease == null) { + throw new LeaseException("lease '" + leaseName + "' does not exist"); } return lease; } + /** + * Thrown if we are asked create a lease but lease on passed name already + * exists. + */ + @SuppressWarnings("serial") + public static class LeaseStillHeldException extends IOException { + private final String leaseName; + + /** + * @param name + */ + public LeaseStillHeldException(final String name) { + this.leaseName = name; + } + + /** @return name of lease */ + public String getName() { + return this.leaseName; + } + } + /** This class tracks a single Lease. */ static class Lease implements Delayed { private final String leaseName; -- 1.7.10.2 (Apple Git-33)