diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java index 0e0e46f..17423cb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java @@ -28,6 +28,7 @@ public abstract class ProcedureStoreBase implements ProcedureStore { private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); + private final AtomicBoolean selfAbort = new AtomicBoolean(false); private final AtomicBoolean running = new AtomicBoolean(false); /** @@ -56,6 +57,10 @@ public abstract class ProcedureStoreBase implements ProcedureStore { return listeners.remove(listener); } + protected boolean isSelfAborting() { + return selfAbort.get(); + } + protected void sendPostSyncSignal() { if (!this.listeners.isEmpty()) { for (ProcedureStoreListener listener : this.listeners) { @@ -65,6 +70,7 @@ public abstract class ProcedureStoreBase implements ProcedureStore { } protected void sendAbortProcessSignal() { + selfAbort.set(true); if (!this.listeners.isEmpty()) { for (ProcedureStoreListener listener : this.listeners) { listener.abortProcess(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 36cf7af..2d86a43 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -122,6 +122,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private final AtomicBoolean inSync = new AtomicBoolean(false); private final AtomicLong totalSynced = new AtomicLong(0); private final AtomicLong lastRollTs = new AtomicLong(0); + private final AtomicLong syncId = new AtomicLong(0); private LinkedTransferQueue slotsCache = null; private Set corruptedLogs = null; @@ -226,15 +227,15 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void stop(boolean abort) { + public void stop(final boolean abort) { if (!setRunning(false)) { return; } - LOG.info("Stopping the WAL Procedure Store"); + LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort + + (isSelfAborting() ? " (self aborting)" : "")); sendStopSignal(); - - if (!abort) { + if (!isSelfAborting()) { try { while (syncThread.isAlive()) { sendStopSignal(); @@ -525,6 +526,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } + final long pushSyncId = syncId.get(); updateStoreTracker(type, procId, subProcIds); slots[slotIndex++] = slot; logId = flushLogId; @@ -540,7 +542,9 @@ public class WALProcedureStore extends ProcedureStoreBase { slotCond.signal(); } - syncCond.await(); + while (pushSyncId == syncId.get() && isRunning()) { + syncCond.await(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); sendAbortProcessSignal(); @@ -642,6 +646,7 @@ public class WALProcedureStore extends ProcedureStoreBase { totalSyncedToStore = totalSynced.addAndGet(slotSize); slotIndex = 0; inSync.set(false); + syncId.incrementAndGet(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); sendAbortProcessSignal(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 670642f..9bdcf76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -374,4 +374,9 @@ public interface MasterServices extends Server { * @return load balancer */ public LoadBalancer getLoadBalancer(); + + /** + * @return True if this master is stopping. + */ + boolean isStopping(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 213f80c..e90813c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -61,10 +61,15 @@ public class MasterProcedureEnv { @Override public boolean progress() { LOG.debug("Recover Procedure Store log lease: " + path); - return master.isActiveMaster(); + return isRunning(); } }); } + + private boolean isRunning() { + return master.isActiveMaster() && !master.isStopped() && + !master.isStopping() && !master.isAborted(); + } } @InterfaceAudience.Private diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 56a8522..87fb169 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -180,6 +180,11 @@ public class MockNoopMasterServices implements MasterServices, Server { } @Override + public boolean isStopping() { + return stopped; + } + + @Override public boolean isStopped() { return stopped; }