diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 7fabbe8..2d7b047 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -39,6 +39,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -137,8 +138,10 @@ public class HLog implements Syncable { private final long optionalFlushInterval; private final long blocksize; private final String prefix; - private final AtomicLong unflushedEntries = new AtomicLong(0); - private volatile long syncedTillHere = 0; + + // The highest transaction id (log sequence number) synced to the filesystem; tracked by SyncInfo. + private SyncInfo syncInfo; + private long lastDeferredTxid; private final Path oldLogDir; private volatile boolean logRollRunning; @@ -833,13 +836,7 @@ public class HLog implements Syncable { try { // Wait till all current transactions are written to the hlog. // No new transactions can occur because we have the updatelock. 
- if (this.unflushedEntries.get() != this.syncedTillHere) { - LOG.debug("cleanupCurrentWriter " + - " waiting for transactions to get synced " + - " total " + this.unflushedEntries.get() + - " synced till here " + syncedTillHere); - sync(); - } + waitForSeqNumToSync(this.logSeqNum.get()); this.writer.close(); this.writer = null; closeErrorCount.set(0); @@ -1006,7 +1003,7 @@ public class HLog implements Syncable { * @param logEdit * @param logKey * @param doSync shall we sync after writing the transaction - * @return The txid of this transaction + * @return The seqnum of this transaction * @throws IOException */ public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit, @@ -1015,9 +1012,9 @@ public class HLog implements Syncable { if (this.closed) { throw new IOException("Cannot append; log is closed"); } - long txid = 0; + long seqNum; synchronized (updateLock) { - long seqNum = obtainSeqNum(); + seqNum = obtainSeqNum(); logKey.setLogSeqNum(seqNum); // The 'lastSeqWritten' map holds the sequence number of the oldest // write for each region (i.e. 
the first edit added to the particular @@ -1027,10 +1024,9 @@ public class HLog implements Syncable { this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), Long.valueOf(seqNum)); doWrite(regionInfo, logKey, logEdit, htd); - txid = this.unflushedEntries.incrementAndGet(); this.numEntries.incrementAndGet(); if (htd.isDeferredLogFlush()) { - lastDeferredTxid = txid; + lastDeferredTxid = seqNum; } } @@ -1040,9 +1036,9 @@ public class HLog implements Syncable { (regionInfo.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system - this.sync(txid); + this.sync(seqNum); } - return txid; + return seqNum; } /** @@ -1090,11 +1086,10 @@ public class HLog implements Syncable { private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd, boolean doSync) throws IOException { - if (edits.isEmpty()) return this.unflushedEntries.get();; + if (edits.isEmpty()) return this.logSeqNum.get(); if (this.closed) { throw new IOException("Cannot append; log is closed"); } - long txid = 0; synchronized (this.updateLock) { long seqNum = obtainSeqNum(); // The 'lastSeqWritten' map holds the sequence number of the oldest @@ -1109,9 +1104,8 @@ public class HLog implements Syncable { HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId); doWrite(info, logKey, edits, htd); this.numEntries.incrementAndGet(); - txid = this.unflushedEntries.incrementAndGet(); if (htd.isDeferredLogFlush()) { - lastDeferredTxid = txid; + lastDeferredTxid = seqNum; } } // Sync if catalog region, and if not then check if that table supports @@ -1120,9 +1114,9 @@ public class HLog implements Syncable { (info.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system - this.sync(txid); + this.sync(seqNum); } - return txid; + return seqNum; } /** @@ -1180,6 +1174,8 @@ public class HLog implements Syncable { // goal is to increase the batchsize for writing-to-hdfs as well as // 
sync-to-hdfs, so that we can get better system throughput. private List pendingWrites = new LinkedList(); + long lastSeqAppended = -1; + private Object flushLock = new Object(); LogSyncer(long optionalFlushInterval) { this.optionalFlushInterval = optionalFlushInterval; @@ -1193,7 +1189,7 @@ public class HLog implements Syncable { while(!this.isInterrupted() && !closeLogSyncer) { try { - if (unflushedEntries.get() <= syncedTillHere) { + if (syncInfo.getLastSyncedTxId() >= logSeqNum.get()) { Thread.sleep(this.optionalFlushInterval); } sync(); @@ -1213,25 +1209,45 @@ public class HLog implements Syncable { // our own queue rather than writing it to the HDFS output stream because // HDFSOutputStream.writeChunk is not lightweight at all. synchronized void append(Entry e) throws IOException { + long seq = e.getKey().getLogSeqNum(); + assert seq > lastSeqAppended; + lastSeqAppended = seq; pendingWrites.add(e); } // Returns all currently pending writes. New writes // will accumulate in a new list. - synchronized List getPendingWrites() { - List save = this.pendingWrites; - this.pendingWrites = new LinkedList(); - return save; - } + long flushWritesTo(Writer writer) { + long lastFlushedSeq = 0; + + synchronized (flushLock) { + List pending; - // writes out pending entries to the HLog - void hlogFlush(Writer writer, List pending) throws IOException { - if (pending == null) return; + synchronized (this) { + pending = pendingWrites; + pendingWrites = new LinkedList(); + } - // write out all accumulated Entries to hdfs. 
- for (Entry e : pending) { - writer.append(e); + boolean success = false; + try { + for (Entry e : pending) { + writer.append(e); + long seq = e.getKey().getLogSeqNum(); + assert seq > lastFlushedSeq; + lastFlushedSeq = seq; + } + success = true; + } finally { + if (!success) { + // push back our batch into the pending list + synchronized (this) { + pending.addAll(pendingWrites); + pendingWrites = pending; + } + } + } } + return lastFlushedSeq; } void close(){ @@ -1239,9 +1255,31 @@ public class HLog implements Syncable { } } + private static class SyncInfo { + private long syncedTillHere = 0; + + synchronized long getLastSyncedTxId() { + return syncedTillHere; + } + + synchronized void notifySynced(long txid) { + if (txid > syncedTillHere) { + syncedTillHere = txid; + } + notifyAll(); + } + + synchronized void waitForSync(long txid) { + while (syncedTillHere < txid) { + try { wait(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } + } + } + } + + // sync all known transactions private void syncer() throws IOException { - syncer(this.unflushedEntries.get()); // sync all pending items + syncer(logSeqNum.get()); // sync all pending } // sync all transactions upto the specified txid @@ -1253,34 +1291,25 @@ public class HLog implements Syncable { } // if the transaction that we are interested in is already // synced, then return immediately. - if (txid <= this.syncedTillHere) { + if (syncInfo.getLastSyncedTxId() >= txid) { return; } try { - long doneUpto = this.unflushedEntries.get(); long now = System.currentTimeMillis(); - // Done in parallel for all writer threads, thanks to HDFS-895 - List pending = logSyncerThread.getPendingWrites(); + long flushedSeqId; try { - // First flush all the pending writes to HDFS. Then - // issue the sync to HDFS. If sync is successful, then update - // syncedTillHere to indicate that transactions till this - // number has been successfully synced. 
- logSyncerThread.hlogFlush(tempWriter, pending); - pending = null; + flushedSeqId = logSyncerThread.flushWritesTo(tempWriter); tempWriter.sync(); - syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); - this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); } catch(IOException io) { synchronized (this.updateLock) { // HBASE-4387, HBASE-5623, retry with updateLock held tempWriter = this.writer; - logSyncerThread.hlogFlush(tempWriter, pending); + flushedSeqId = logSyncerThread.flushWritesTo(tempWriter); tempWriter.sync(); - syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); - this.syncedTillHere = doneUpto; } } + // TODO: restore metric syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); + syncInfo.notifySynced(flushedSeqId); // We try to not acquire the updateLock just to update statistics. // Make these statistics as AtomicLong. syncTime.inc(System.currentTimeMillis() - now); @@ -1537,14 +1566,17 @@ public class HLog implements Syncable { if (this.closed) { return; } - long txid = 0; + long seqNumOfCompletionEdit; synchronized (updateLock) { + seqNumOfCompletionEdit = obtainSeqNum(); long now = System.currentTimeMillis(); + + // TODO: this is broken - we need to encode logSeqId in the content + // of the edit, not in the _key_ of the edit! 
WALEdit edit = completeCacheFlushLogEdit(); - HLogKey key = makeKey(encodedRegionName, tableName, logSeqId, + HLogKey key = makeKey(encodedRegionName, tableName, seqNumOfCompletionEdit, System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); logSyncerThread.append(new Entry(key, edit)); - txid = this.unflushedEntries.incrementAndGet(); writeTime.inc(System.currentTimeMillis() - now); long len = 0; for (KeyValue kv : edit.getKeyValues()) { @@ -1554,7 +1586,7 @@ public class HLog implements Syncable { this.numEntries.incrementAndGet(); } // sync txn to file system - this.sync(txid); + this.sync(seqNumOfCompletionEdit); } finally { // updateLock not needed for removing snapshot's entry @@ -1879,4 +1911,4 @@ public class HLog implements Syncable { System.exit(-1); } } -} \ No newline at end of file +}