From c614c0e239312480eb059fdaecae592e677d2b61 Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Thu, 17 Jul 2014 13:19:23 -0500 Subject: [PATCH] HBASE-11537 Clean up usage on ConcurrentMap Remove synchronization on ConcurrentMap and use interface methods instead. Remove dead store to Future and unreachable null check. --- .../hbase/procedure/ProcedureCoordinator.java | 67 +++++++++++----------- .../hadoop/hbase/procedure/ProcedureMember.java | 34 +++++------ 2 files changed, 46 insertions(+), 55 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index fe7318b..fd9606a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -145,48 +144,48 @@ public class ProcedureCoordinator { String procName = proc.getName(); // make sure we aren't already running a procedure of that name - synchronized (procedures) { - Procedure oldProc = procedures.get(procName); - if (oldProc != null) { - // procedures are always eventually completed on both successful and failed execution - try { - if (!oldProc.isCompleted()) { - LOG.warn("Procedure " + procName + " currently running. Rejecting new request"); - return false; - } - else { - LOG.debug("Procedure " + procName + Procedure oldProc = procedures.get(procName); + if (oldProc != null) { + // procedures are always eventually completed on both successful and failed execution + try { + if (!oldProc.isCompleted()) { + LOG.warn("Procedure " + procName + " currently running. Rejecting new request"); + return false; + } else { + LOG.debug("Procedure " + procName + " was in running list but was completed. Accepting new attempt."); - procedures.remove(procName); + if (!procedures.remove(procName, oldProc)) { + LOG.warn("Procedure " + procName + + " has been resubmitted by another thread. Rejecting this request."); + return false; } - } catch (ForeignException e) { - LOG.debug("Procedure " + procName + } + } catch (ForeignException e) { + LOG.debug("Procedure " + procName + " was in running list but has exception. Accepting new attempt."); - procedures.remove(procName); + if (!procedures.remove(procName, oldProc)) { + LOG.warn("Procedure " + procName + + " has been resubmitted by another thread. Rejecting this request."); + return false; } } } // kick off the procedure's execution in a separate thread - Future f = null; try { - synchronized (procedures) { - this.procedures.put(procName, proc); - f = this.pool.submit(proc); + if (this.procedures.putIfAbsent(procName, proc) == null) { + this.pool.submit(proc); + return true; + } else { + LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt."); + return false; } - return true; } catch (RejectedExecutionException e) { - LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error and " + - "cancelling operation.", e); + LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error.", e); // Remove the procedure from the list since is not started - this.procedures.remove(procName); + this.procedures.remove(procName, proc); // the thread pool is full and we can't run the procedure proc.receive(new ForeignException(procName, e)); - - // cancel procedure proactively - if (f != null) { - f.cancel(true); - } } return false; } @@ -217,13 +216,11 @@ public class ProcedureCoordinator { */ public void abortProcedure(String procName, ForeignException reason) { // if we know about the Procedure, notify it - synchronized(procedures) { - Procedure proc = procedures.get(procName); - if (proc == null) { - return; - } - proc.receive(reason); + Procedure proc = procedures.get(procName); + if (proc == null) { + return; } + proc.receive(reason); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java index 0559e12..89760f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.Collection; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -139,41 +138,36 @@ public class ProcedureMember implements Closeable { } // make sure we aren't already running an subprocedure of that name - Subprocedure rsub; - synchronized (subprocs) { - rsub = subprocs.get(procName); - } + Subprocedure rsub = subprocs.get(procName); if (rsub != null) { if (!rsub.isComplete()) { LOG.error("Subproc '" + procName + "' is already running. Bailing out"); return false; } LOG.warn("A completed old subproc " + procName + " is still present, removing"); - subprocs.remove(procName); + if (!subprocs.remove(procName, rsub)) { + LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out"); + return false; + } } LOG.debug("Submitting new Subprocedure:" + procName); // kick off the subprocedure - Future future = null; try { - synchronized (subprocs) { - subprocs.put(procName, subproc); + if (subprocs.putIfAbsent(procName, subproc) == null) { + this.pool.submit(subproc); + return true; + } else { + LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out"); + return false; } - future = this.pool.submit(subproc); - return true; } catch (RejectedExecutionException e) { - synchronized (subprocs) { - subprocs.remove(procName); - } + subprocs.remove(procName, subproc); + // the thread pool is full and we can't run the subprocedure String msg = "Subprocedure pool is full!"; subproc.cancel(msg, e.getCause()); - - // cancel all subprocedures proactively - if (future != null) { - future.cancel(true); - } } LOG.error("Failed to start subprocedure '" + procName + "'"); @@ -182,7 +176,7 @@ public class ProcedureMember implements Closeable { /** * Notification that procedure coordinator has reached the global barrier - * @param procName name of the subprocedure that should start running the the in-barrier phase + * @param procName name of the subprocedure that should start running the in-barrier phase */ public void receivedReachedGlobalBarrier(String procName) { Subprocedure subproc = subprocs.get(procName); -- 1.8.3.2