Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1327371) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -230,6 +230,7 @@ // during an update // locked during appends private final Object updateLock = new Object(); + private final Object flushLock = new Object(); private final boolean enabled; @@ -297,7 +298,6 @@ private static Metric writeSize = new Metric(); // For measuring latency of syncs private static Metric syncTime = new Metric(); - private static AtomicLong syncBatchSize = new AtomicLong(); //For measuring slow HLog appends private static AtomicLong slowHLogAppendCount = new AtomicLong(); private static Metric slowHLogAppendTime = new Metric(); @@ -314,10 +314,6 @@ return syncTime.get(); } - public static long getSyncBatchSize() { - return syncBatchSize.getAndSet(0); - } - public static long getSlowAppendCount() { return slowHLogAppendCount.get(); } @@ -1258,32 +1254,43 @@ return; } try { - long doneUpto = this.unflushedEntries.get(); + long doneUpto; long now = System.currentTimeMillis(); - // Done in parallel for all writer threads, thanks to HDFS-895 - List pending = logSyncerThread.getPendingWrites(); + // First flush all the pending writes to HDFS. Then + // issue the sync to HDFS. If sync is successful, then update + // syncedTillHere to indicate that transactions till this + // number has been successfully synced. + synchronized (flushLock) { + if (txid <= this.syncedTillHere) { + return; + } + doneUpto = this.unflushedEntries.get(); + List pending = logSyncerThread.getPendingWrites(); + try { + logSyncerThread.hlogFlush(tempWriter, pending); + } catch(IOException io) { + synchronized (this.updateLock) { + // HBASE-4387, HBASE-5623, retry with updateLock held + tempWriter = this.writer; + logSyncerThread.hlogFlush(tempWriter, pending); + } + } + } + // another thread might have sync'ed avoid double-sync'ing + if (txid <= this.syncedTillHere) { + return; + } try { - // First flush all the pending writes to HDFS. Then - // issue the sync to HDFS. If sync is successful, then update - // syncedTillHere to indicate that transactions till this - // number has been successfully synced. - logSyncerThread.hlogFlush(tempWriter, pending); - pending = null; tempWriter.sync(); - syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); - this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); } catch(IOException io) { synchronized (this.updateLock) { // HBASE-4387, HBASE-5623, retry with updateLock held tempWriter = this.writer; - logSyncerThread.hlogFlush(tempWriter, pending); tempWriter.sync(); - syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); - this.syncedTillHere = doneUpto; } } - // We try to not acquire the updateLock just to update statistics. - // Make these statistics as AtomicLong. + this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); + syncTime.inc(System.currentTimeMillis() - now); if (!this.logRollRunning) { checkLowReplication();