diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 2e2dbdf..2f34960 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1409,7 +1409,7 @@ public class ProcedureExecutor { final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (procedure == null) continue; - activeExecutorCount.incrementAndGet(); + store.setRunningProcedureCount(activeExecutorCount.incrementAndGet()); executionStartTime.set(EnvironmentEdgeManager.currentTime()); try { if (isTraceEnabled) { @@ -1417,7 +1417,7 @@ public class ProcedureExecutor { } executeProcedure(procedure); } finally { - activeExecutorCount.decrementAndGet(); + store.setRunningProcedureCount(activeExecutorCount.decrementAndGet()); lastUpdate = EnvironmentEdgeManager.currentTime(); executionStartTime.set(Long.MAX_VALUE); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index 82ef8f0..a457e42 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -52,6 +52,11 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override + public void setRunningProcedureCount(final int count) { + // no-op + } + + @Override public void load(final ProcedureLoader loader) throws IOException { loader.setMaxProcId(0); } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 7df5226..2a72414 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -151,6 +151,12 @@ public interface ProcedureStore { int getNumThreads(); /** + * Set the number of procedure running. + * This can be used, for example, by the store to know how long to wait before a sync. + */ + void setRunningProcedureCount(int count); + + /** * Acquire the lease for the procedure store. */ void recoverLease() throws IOException; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 54f375b..4ab224c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -128,7 +128,9 @@ public class WALProcedureStore extends ProcedureStoreBase { private LinkedTransferQueue slotsCache = null; private Set corruptedLogs = null; private FSDataOutputStream stream = null; + private int runningProcCount = 1; private long flushLogId = 0; + private int syncMaxSlot = 1; private int slotIndex = 0; private Thread syncThread; private ByteSlot[] slots; @@ -189,6 +191,8 @@ public class WALProcedureStore extends ProcedureStoreBase { // Init buffer slots loading.set(true); + runningProcCount = numSlots; + syncMaxSlot = numSlots; slots = new ByteSlot[numSlots]; slotsCache = new LinkedTransferQueue(); while (slotsCache.size() < numSlots) { @@ -276,6 +280,12 @@ public class WALProcedureStore extends ProcedureStoreBase { return slots == null ? 0 : slots.length; } + @Override + public void setRunningProcedureCount(final int count) { + LOG.debug("set running procedure count=" + count + " slots=" + slots.length); + this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length; + } + public ProcedureStoreTracker getStoreTracker() { return storeTracker; } @@ -558,7 +568,7 @@ public class WALProcedureStore extends ProcedureStoreBase { throw new RuntimeException("sync aborted", syncException.get()); } else if (inSync.get()) { syncCond.await(); - } else if (slotIndex == slots.length) { + } else if (slotIndex >= syncMaxSlot) { slotCond.signal(); syncCond.await(); } else { @@ -577,7 +587,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Notify that the slots are full - if (slotIndex == slots.length) { + if (slotIndex == syncMaxSlot) { waitCond.signal(); slotCond.signal(); } @@ -658,8 +668,10 @@ public class WALProcedureStore extends ProcedureStoreBase { } } // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing + syncMaxSlot = runningProcCount; + assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot; final long syncWaitSt = System.currentTimeMillis(); - if (slotIndex != slots.length) { + if (slotIndex != syncMaxSlot) { slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); } @@ -667,7 +679,7 @@ public class WALProcedureStore extends ProcedureStoreBase { final long syncWaitMs = currentTs - syncWaitSt; final float rollSec = getMillisFromLastRoll() / 1000.0f; final float syncedPerSec = totalSyncedToStore / rollSec; - if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { + if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) { LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", StringUtils.humanTimeDiff(syncWaitMs), slotIndex, StringUtils.humanSize(totalSyncedToStore), @@ -746,29 +758,33 @@ public class WALProcedureStore extends ProcedureStoreBase { return totalSynced; } - protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) - throws IOException { + protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots, + final int offset, final int count) throws IOException { long totalSynced = 0; for (int i = 0; i < count; ++i) { - ByteSlot data = slots[offset + i]; + final ByteSlot data = slots[offset + i]; data.writeTo(stream); totalSynced += data.size(); } - if (useHsync) { - stream.hsync(); - } else { - stream.hflush(); - } + syncStream(stream); sendPostSyncSignal(); if (LOG.isTraceEnabled()) { - LOG.trace("Sync slots=" + count + '/' + slots.length + + LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + ", flushed=" + StringUtils.humanSize(totalSynced)); } return totalSynced; } + protected void syncStream(final FSDataOutputStream stream) throws IOException { + if (useHsync) { + stream.hsync(); + } else { + stream.hflush(); + } + } + private boolean rollWriterWithRetries() { for (int i = 0; i < rollRetries && isRunning(); ++i) { if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java index 363574b..641ac8e 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java @@ -149,7 +149,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { // Start worker threads. long start = System.currentTimeMillis(); for (int i = 0; i < numThreads; i++) { - futures[i] = executor.submit(this.new Worker(start)); + futures[i] = executor.submit(new Worker(start)); } boolean failure = false; try { @@ -197,8 +197,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { * If procedure store fails to roll log file (throws IOException), all threads quit, and at * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}. */ - class Worker implements Callable { - final long start; + private final class Worker implements Callable { + private final long start; public Worker(long start) { this.start = start; @@ -243,7 +243,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { } } - public class NoSyncWalProcedureStore extends WALProcedureStore { + private class NoSyncWalProcedureStore extends WALProcedureStore { public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir) { super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { @@ -255,13 +255,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { } @Override - protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) - throws IOException { - long totalSynced = 0; - for (int i = 0; i < count; ++i) { - totalSynced += slots[offset + i].size(); - } - return totalSynced; + protected void syncStream(FSDataOutputStream stream) { + // no-op } }