diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index ec42d6a..9bea27d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -101,6 +101,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private final LinkedList logs = new LinkedList(); private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker(); private final AtomicLong inactiveLogsMaxId = new AtomicLong(0); + private final ReentrantLock trackerLock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); private final Condition slotCond = lock.newCondition(); @@ -358,11 +359,14 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Update the store tracker - synchronized (storeTracker) { + trackerLock.lock(); + try { storeTracker.insert(proc, subprocs); if (logId == flushLogId) { checkAndTryRoll(); } + } finally { + trackerLock.unlock(); } } @@ -391,12 +395,15 @@ public class WALProcedureStore extends ProcedureStoreBase { // Update the store tracker boolean removeOldLogs = false; - synchronized (storeTracker) { + trackerLock.lock(); + try { storeTracker.update(proc); if (logId == flushLogId) { removeOldLogs = storeTracker.isUpdated(); checkAndTryRoll(); } + } finally { + trackerLock.unlock(); } if (removeOldLogs) { @@ -428,7 +435,8 @@ public class WALProcedureStore extends ProcedureStoreBase { } boolean removeOldLogs = false; - synchronized (storeTracker) { + trackerLock.lock(); + try { storeTracker.delete(procId); if (logId == flushLogId) { if (storeTracker.isEmpty() || storeTracker.isUpdated()) { @@ -437,6 +445,8 @@ public class WALProcedureStore extends ProcedureStoreBase { checkAndTryRoll(); } } + } finally { + trackerLock.unlock(); } if (removeOldLogs) { @@ -514,12 +524,12 @@ public class WALProcedureStore extends ProcedureStoreBase { } protected void periodicRoll() throws IOException { - long logId; - boolean removeOldLogs; - synchronized (storeTracker) { - logId = flushLogId; - removeOldLogs = storeTracker.isEmpty(); - } + if (!trackerLock.tryLock()) return; + + long logId = flushLogId; + boolean removeOldLogs = storeTracker.isEmpty(); + trackerLock.unlock(); + if (checkAndTryRoll() && removeOldLogs) { setInactiveLogsMaxId(logId); } @@ -733,8 +743,12 @@ public class WALProcedureStore extends ProcedureStoreBase { lock.lock(); try { closeStream(); - synchronized (storeTracker) { + + trackerLock.lock(); + try { storeTracker.resetUpdates(); + } finally { + trackerLock.unlock(); } stream = newStream; flushLogId = logId; @@ -754,10 +768,13 @@ public class WALProcedureStore extends ProcedureStoreBase { try { if (stream != null) { try { - synchronized (storeTracker) { + trackerLock.lock(); + try { ProcedureWALFile log = logs.getLast(); log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); ProcedureWALFormat.writeTrailer(stream, storeTracker); + } finally { + trackerLock.unlock(); } } catch (IOException e) { LOG.warn("Unable to write the trailer: " + e.getMessage()); @@ -794,11 +811,12 @@ public class WALProcedureStore extends ProcedureStoreBase { // Verify if the ProcId of the first oldest is still active. if not remove the file. while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); - synchronized (storeTracker) { - if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { - break; - } + if (!trackerLock.tryLock()) break; + if (storeTracker.isTracking(log.getMinProcId(), log.getMaxProcId())) { + trackerLock.unlock(); + break; } + trackerLock.unlock(); removeLogFile(log); } }