diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 152f131..f109ab6 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -223,17 +223,18 @@ public class ProcedureStoreTracker { int oldSize = updated.length; newBitmap = new long[oldSize + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = 0; + } System.arraycopy(updated, 0, newBitmap, offset, oldSize); updated = newBitmap; newBitmap = new long[deleted.length + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = WORD_MASK; + } System.arraycopy(deleted, 0, newBitmap, offset, oldSize); deleted = newBitmap; - - for (int i = 0; i < delta; ++i) { - updated[oldSize + i] = 0; - deleted[oldSize + i] = WORD_MASK; - } } public void merge(final BitSetNode rightNode) { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 09d2f7a..0bda0d1 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; import java.io.FileNotFoundException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.Arrays; @@ -69,6 +70,12 @@ public class WALProcedureStore implements ProcedureStore { private static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec"; private static final int DEFAULT_SYNC_WAIT_MSEC = 100; + private static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync"; + private static final boolean DEFAULT_USE_HSYNC = true; + + private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold"; + private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M + private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList(); @@ -86,14 +93,18 @@ public class WALProcedureStore implements ProcedureStore { private final Path logDir; private AtomicBoolean inSync = new AtomicBoolean(false); - private ArrayBlockingQueue slotsCache = null; + private LinkedTransferQueue slotsCache = null; private Set corruptedLogs = null; + private AtomicLong totalSynced = new AtomicLong(0); private FSDataOutputStream stream = null; - private long totalSynced = 0; + private long lastRollTs = 0; private long flushLogId = 0; private int slotIndex = 0; private Thread syncThread; private ByteSlot[] slots; + + private long rollThreshold; + private boolean useHsync; private int syncWaitMsec; public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir, @@ -112,13 +123,15 @@ public class WALProcedureStore implements ProcedureStore { // Init buffer slots slots = new ByteSlot[numSlots]; - slotsCache = new ArrayBlockingQueue(numSlots, true); - while (slotsCache.remainingCapacity() > 0) { + slotsCache = new LinkedTransferQueue(); + while (slotsCache.size() < numSlots) { slotsCache.offer(new ByteSlot()); } // Tunings + rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, DEFAULT_ROLL_THRESHOLD); syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, DEFAULT_SYNC_WAIT_MSEC); + useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); // Init sync thread syncThread = new Thread("WALProcedureStoreSyncThread") { @@ -297,9 +310,7 @@ public class WALProcedureStore implements ProcedureStore { // Update the store tracker synchronized (storeTracker) { - if (logId == flushLogId) { - storeTracker.insert(proc, subprocs); - } + storeTracker.insert(proc, subprocs); } } @@ -329,8 +340,8 @@ public class WALProcedureStore implements ProcedureStore { // Update the store tracker boolean removeOldLogs = false; synchronized (storeTracker) { + storeTracker.update(proc); if (logId == flushLogId) { - storeTracker.update(proc); removeOldLogs = storeTracker.isUpdated(); } } @@ -365,9 +376,9 @@ public class WALProcedureStore implements ProcedureStore { boolean removeOldLogs = false; synchronized (storeTracker) { + storeTracker.delete(procId); if (logId == flushLogId) { - storeTracker.delete(procId); - if (storeTracker.isEmpty()) { + if (storeTracker.isEmpty() && totalSynced.get() > rollThreshold) { removeOldLogs = rollWriterOrDie(logId + 1); } } @@ -416,8 +427,10 @@ public class WALProcedureStore implements ProcedureStore { // Notify that the slots are full if (slotIndex == slots.length) { + waitCond.signal(); slotCond.signal(); } + syncCond.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -436,7 +449,10 @@ public class WALProcedureStore implements ProcedureStore { // Wait until new data is available if (slotIndex == 0) { if (LOG.isTraceEnabled()) { - LOG.trace("Waiting for data. flushed=" + StringUtils.humanSize(totalSynced)); + float rollTsSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; + LOG.trace(String.format("Waiting for data. flushed=%s (%s/sec)", + StringUtils.humanSize(totalSynced.get()), + StringUtils.humanSize(totalSynced.get() / rollTsSec))); } waitCond.await(); if (slotIndex == 0) { @@ -446,10 +462,22 @@ public class WALProcedureStore implements ProcedureStore { } // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing - slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); + long syncWaitSt = System.currentTimeMillis(); + if (slotIndex != slots.length) { + slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); + } + long syncWaitMs = System.currentTimeMillis() - syncWaitSt; + if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { + float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; + LOG.trace("sync wait " + StringUtils.humanTimeDiff(syncWaitMs) + + " slotIndex=" + slotIndex + + " totalSynced=" + StringUtils.humanSize(totalSynced.get()) + + " " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec"); + } + inSync.set(true); - totalSynced += syncSlots(); + totalSynced.addAndGet(syncSlots()); slotIndex = 0; inSync.set(false); syncCond.signalAll(); @@ -487,7 +515,13 @@ public class WALProcedureStore implements ProcedureStore { data.writeTo(stream); totalSynced += data.size(); } - stream.hsync(); + + if (useHsync) { + stream.hsync(); + } else { + stream.hflush(); + } + if (LOG.isTraceEnabled()) { LOG.trace("Sync slots=" + count + '/' + slots.length + " flushed=" + StringUtils.humanSize(totalSynced)); @@ -541,7 +575,8 @@ public class WALProcedureStore implements ProcedureStore { } stream = newStream; flushLogId = logId; - totalSynced = 0; + totalSynced.set(0); + lastRollTs = System.currentTimeMillis(); logs.add(new ProcedureWALFile(fs, newLogFile, header, startPos)); } finally { lock.unlock(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java index 97134c2..a2c4713 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/StringUtils.java @@ -27,6 +27,10 @@ public final class StringUtils { private StringUtils() {} public static String humanTimeDiff(long timeDiff) { + if (timeDiff < 1000) { + return String.format("%dmsec", timeDiff); + } + StringBuilder buf = new StringBuilder(); long hours = timeDiff / (60*60*1000); long rem = (timeDiff % (60*60*1000));