diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index dc8eab4..bf7f422 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -139,8 +139,10 @@ public class HLog implements Syncable { private final long optionalFlushInterval; private final long blocksize; private final String prefix; + private final AtomicLong unflushedEntries = new AtomicLong(0); private volatile long syncedTillHere = 0; + private long lastDeferredTxid; private final Path oldLogDir; private volatile boolean logRollRunning; @@ -297,7 +299,6 @@ public class HLog implements Syncable { private static Metric writeSize = new Metric(); // For measuring latency of syncs private static Metric syncTime = new Metric(); - private static AtomicLong syncBatchSize = new AtomicLong(); //For measuring slow HLog appends private static AtomicLong slowHLogAppendCount = new AtomicLong(); private static Metric slowHLogAppendTime = new Metric(); @@ -314,14 +315,10 @@ public class HLog implements Syncable { return syncTime.get(); } - public static long getSyncBatchSize() { - return syncBatchSize.getAndSet(0); - } - public static long getSlowAppendCount() { return slowHLogAppendCount.get(); } - + public static Metric getSlowAppendTime() { return slowHLogAppendTime.get(); } @@ -1169,17 +1166,20 @@ public class HLog implements Syncable { * it happens. */ class LogSyncer extends HasThread { - private final long optionalFlushInterval; - + + /** + * Set this flag to shut down this thread. Set it by calling {@link #close()} + */ private boolean closeLogSyncer = false; - // List of pending writes to the HLog. There corresponds to transactions + // List of pending writes to the HLog. These correspond to transactions // that have not yet returned to the client. 
We keep them cached here // instead of writing them to HDFS piecemeal, because the HDFS write // method is pretty heavyweight as far as locking is concerned. The // goal is to increase the batchsize for writing-to-hdfs as well as // sync-to-hdfs, so that we can get better system throughput. + // TODO: Could these copies cause us OOME if we back up on writes to HDFS? private List pendingWrites = new LinkedList(); LogSyncer(long optionalFlushInterval) { @@ -1192,7 +1192,6 @@ public class HLog implements Syncable { // awaiting with a timeout doesn't always // throw exceptions on interrupt while(!this.isInterrupted() && !closeLogSyncer) { - try { if (unflushedEntries.get() <= syncedTillHere) { Thread.sleep(this.optionalFlushInterval); @@ -1217,24 +1216,31 @@ public class HLog implements Syncable { pendingWrites.add(e); } + synchronized long doAppend(final Writer writer) throws IOException { + long upto = unflushedEntries.get(); + doAppend(writer, getPendingWrites()); + return upto; + } + // Returns all currently pending writes. New writes // will accumulate in a new list. - synchronized List getPendingWrites() { + private synchronized List getPendingWrites() { List save = this.pendingWrites; this.pendingWrites = new LinkedList(); return save; } - // writes out pending entries to the HLog - void hlogFlush(Writer writer, List pending) throws IOException { - if (pending == null) return; - - // write out all accumulated Entries to hdfs. + // Writes out pending entries to the HLog + private List doAppend(Writer writer, List pending) throws IOException { + if (pending == null) return pending; + // Write out all accumulated Entries to hdfs. + // TODO: Pass a List or array of Entries rather than append one at a time. 
for (Entry e : pending) { writer.append(e); } + return pending; } - + void close(){ closeLogSyncer = true; } @@ -1254,34 +1260,13 @@ public class HLog implements Syncable { } // if the transaction that we are interested in is already // synced, then return immediately. - if (txid <= this.syncedTillHere) { - return; - } + if (txid <= this.syncedTillHere) return; try { - long doneUpto = this.unflushedEntries.get(); long now = System.currentTimeMillis(); - // Done in parallel for all writer threads, thanks to HDFS-895 - List pending = logSyncerThread.getPendingWrites(); - try { - // First flush all the pending writes to HDFS. Then - // issue the sync to HDFS. If sync is successful, then update - // syncedTillHere to indicate that transactions till this - // number has been successfully synced. - logSyncerThread.hlogFlush(tempWriter, pending); - pending = null; - tempWriter.sync(); - syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); - this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); - } catch(IOException io) { - synchronized (this.updateLock) { - // HBASE-4387, HBASE-5623, retry with updateLock held - tempWriter = this.writer; - logSyncerThread.hlogFlush(tempWriter, pending); - tempWriter.sync(); - syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); - this.syncedTillHere = doneUpto; - } - } + long upto = this.logSyncerThread.doAppend(tempWriter); + // Could we oversync? Call sync more than we need to? + tempWriter.sync(); + this.syncedTillHere = Math.max(this.syncedTillHere, upto); // We try to not acquire the updateLock just to update statistics. // Make these statistics as AtomicLong. syncTime.inc(System.currentTimeMillis() - now);