commit 0e06ce18c1146a57b34d1c786b61e681cf8ef351
Author: Todd Lipcon
Date:   Mon Apr 16 22:24:57 2012 -0700

    hlog fix

    Track WAL sync state by log sequence number instead of a separate
    transaction id:

    * Replace the unflushedEntries counter and the volatile
      syncedTillHere field with a SyncInfo monitor; append() and sync()
      now work directly on the edit's seqnum.
    * Move pending-write flushing into LogSyncer.flushWritesTo(), which
      serializes flushers on a flushLock, asserts that seqnums are
      appended and flushed in order, and pushes a failed batch back onto
      pendingWrites so a retry sees those edits first.
    * completeCacheFlush() assigns the completion edit its own seqnum
      and embeds the flushed seqid in the COMPLETE_CACHE_FLUSH marker
      value; the tests now match the marker by prefix.
    * cleanupCurrentWriter() syncs unconditionally and tolerates any
      Exception on close, not just IOException.
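    For reference, the wait/notify protocol between appenders and the
    sync thread, as a standalone sketch (SyncInfoDemo and its main() are
    illustrative stand-ins, not the patched class itself):

        // An appender obtains a seqnum, queues its edit, and blocks in
        // waitForSync(seqnum); the sync thread flushes pending writes,
        // calls Writer.sync(), then wakes all waiters via notifySynced().
        public class SyncInfoDemo {
            private long syncedTillHere = 0;

            synchronized void notifySynced(long txid) {
                if (txid > syncedTillHere) {
                    syncedTillHere = txid;
                }
                notifyAll();           // wake every waiting appender
            }

            synchronized void waitForSync(long txid)
                throws InterruptedException {
                while (syncedTillHere < txid) {
                    wait();            // blocked until notifySynced(txid)
                }
            }

            public static void main(String[] args) throws Exception {
                final SyncInfoDemo info = new SyncInfoDemo();
                new Thread(new Runnable() {
                    public void run() { info.notifySynced(1); }
                }).start();
                info.waitForSync(1);   // returns once seqnum 1 is durable
                System.out.println("seqnum 1 synced");
            }
        }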
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index dc8eab4..3eee3ca 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Charsets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -139,8 +140,10 @@ public class HLog implements Syncable {
   private final long optionalFlushInterval;
   private final long blocksize;
   private final String prefix;
-  private final AtomicLong unflushedEntries = new AtomicLong(0);
-  private volatile long syncedTillHere = 0;
+
+  /** tracking information about what has been synced */
+  private SyncInfo syncInfo = new SyncInfo();
+
   private long lastDeferredTxid;
   private final Path oldLogDir;
   private volatile boolean logRollRunning;
@@ -834,17 +837,11 @@ public class HLog implements Syncable {
     try {
       // Wait till all current transactions are written to the hlog.
       // No new transactions can occur because we have the updatelock.
-      if (this.unflushedEntries.get() != this.syncedTillHere) {
-        LOG.debug("cleanupCurrentWriter " +
-          " waiting for transactions to get synced " +
-          " total " + this.unflushedEntries.get() +
-          " synced till here " + syncedTillHere);
-        sync();
-      }
+      sync();
       this.writer.close();
       this.writer = null;
       closeErrorCount.set(0);
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.error("Failed close of HLog writer", e);
       int errors = closeErrorCount.incrementAndGet();
       if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
@@ -1007,7 +1004,7 @@ public class HLog implements Syncable {
    * @param logEdit
    * @param logKey
    * @param doSync shall we sync after writing the transaction
-   * @return The txid of this transaction
+   * @return The seqnum of this transaction
    * @throws IOException
    */
   public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
@@ -1016,9 +1013,9 @@ public class HLog implements Syncable {
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long txid = 0;
+    long seqNum;
     synchronized (updateLock) {
-      long seqNum = obtainSeqNum();
+      seqNum = obtainSeqNum();
       logKey.setLogSeqNum(seqNum);
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
@@ -1028,10 +1025,9 @@ public class HLog implements Syncable {
       this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
         Long.valueOf(seqNum));
       doWrite(regionInfo, logKey, logEdit, htd);
-      txid = this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = txid;
+        lastDeferredTxid = seqNum;
       }
     }
 
@@ -1041,9 +1037,9 @@
         (regionInfo.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(txid);
+      this.sync(seqNum);
     }
-    return txid;
+    return seqNum;
   }
 
   /**
@@ -1091,13 +1087,13 @@
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits,
     UUID clusterId, final long now, HTableDescriptor htd, boolean doSync)
     throws IOException {
-    if (edits.isEmpty()) return this.unflushedEntries.get();;
+    if (edits.isEmpty()) return this.logSeqNum.get();
     if (this.closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long txid = 0;
+    long seqNum;
     synchronized (this.updateLock) {
-      long seqNum = obtainSeqNum();
+      seqNum = obtainSeqNum();
      // The 'lastSeqWritten' map holds the sequence number of the oldest
      // write for each region (i.e. the first edit added to the particular
      // memstore). . When the cache is flushed, the entry for the
@@ -1110,9 +1106,8 @@
       HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
       doWrite(info, logKey, edits, htd);
       this.numEntries.incrementAndGet();
-      txid = this.unflushedEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
-        lastDeferredTxid = txid;
+        lastDeferredTxid = seqNum;
       }
     }
     // Sync if catalog region, and if not then check if that table supports
@@ -1121,9 +1116,9 @@
         (info.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
-      this.sync(txid);
+      this.sync(seqNum);
     }
-    return txid;
+    return seqNum;
   }
 
   /**
@@ -1181,6 +1176,9 @@
     // goal is to increase the batchsize for writing-to-hdfs as well as
     // sync-to-hdfs, so that we can get better system throughput.
     private List pendingWrites = new LinkedList();
+    long lastSeqAppended = -1;
+    long lastSeqFlushed = -1;
+    private Object flushLock = new Object();
 
     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
     }
@@ -1194,7 +1192,7 @@
       while(!this.isInterrupted() && !closeLogSyncer) {
         try {
-          if (unflushedEntries.get() <= syncedTillHere) {
+          if (syncInfo.getLastSyncedTxId() >= logSeqNum.get()) {
             Thread.sleep(this.optionalFlushInterval);
           }
           sync();
@@ -1214,24 +1212,45 @@
     // our own queue rather than writing it to the HDFS output stream because
     // HDFSOutputStream.writeChunk is not lightweight at all.
     synchronized void append(Entry e) throws IOException {
+      long seq = e.getKey().getLogSeqNum();
+      assert seq > lastSeqAppended;
+      lastSeqAppended = seq;
       pendingWrites.add(e);
     }
 
     // Returns all currently pending writes. New writes
     // will accumulate in a new list.
-    synchronized List getPendingWrites() {
-      List save = this.pendingWrites;
-      this.pendingWrites = new LinkedList();
-      return save;
-    }
+    long flushWritesTo(Writer writer) throws IOException {
+      synchronized (flushLock) {
+        List pending;
 
-    // writes out pending entries to the HLog
-    void hlogFlush(Writer writer, List pending) throws IOException {
-      if (pending == null) return;
+        synchronized (this) {
+          pending = pendingWrites;
+          pendingWrites = new LinkedList();
+        }
 
-      // write out all accumulated Entries to hdfs.
-      for (Entry e : pending) {
-        writer.append(e);
+        boolean success = false;
+        try {
+          int numFlushed = 0;
+          for (Entry e : pending) {
+            writer.append(e);
+            long seq = e.getKey().getLogSeqNum();
+            assert seq > lastSeqFlushed;
+            lastSeqFlushed = seq;
+            numFlushed++;
+          }
+          syncBatchSize.addAndGet(numFlushed);
+          success = true;
+        } finally {
+          if (!success) {
+            // push back our batch into the pending list
+            synchronized (this) {
+              pending.addAll(pendingWrites);
+              pendingWrites = pending;
+            }
+          }
+        }
+        return lastSeqFlushed;
       }
     }
 
@@ -1240,9 +1259,31 @@
     }
   }
 
+  private static class SyncInfo {
+    private long syncedTillHere = 0;
+
+    synchronized long getLastSyncedTxId() {
+      return syncedTillHere;
+    }
+
+    synchronized void notifySynced(long txid) {
+      if (txid > syncedTillHere) {
+        syncedTillHere = txid;
+      }
+      notifyAll();
+    }
+
+    synchronized void waitForSync(long txid) throws InterruptedException {
+      while (syncedTillHere < txid) {
+        wait();
+      }
+    }
+  }
+
+  // sync all known transactions
   private void syncer() throws IOException {
-    syncer(this.unflushedEntries.get()); // sync all pending items
+    syncer(logSeqNum.get()); // sync all pending
   }
 
   // sync all transactions upto the specified txid
   private void syncer(long txid) throws IOException {
@@ -1254,34 +1295,24 @@
     }
     // if the transaction that we are interested in is already
     // synced, then return immediately.
-    if (txid <= this.syncedTillHere) {
+    if (syncInfo.getLastSyncedTxId() >= txid) {
       return;
     }
     try {
-      long doneUpto = this.unflushedEntries.get();
       long now = System.currentTimeMillis();
-      // Done in parallel for all writer threads, thanks to HDFS-895
-      List pending = logSyncerThread.getPendingWrites();
+      long flushedSeqId;
       try {
-        // First flush all the pending writes to HDFS. Then
-        // issue the sync to HDFS. If sync is successful, then update
-        // syncedTillHere to indicate that transactions till this
-        // number has been successfully synced.
-        logSyncerThread.hlogFlush(tempWriter, pending);
-        pending = null;
+        flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
         tempWriter.sync();
-        syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
-        this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
       } catch(IOException io) {
         synchronized (this.updateLock) {
           // HBASE-4387, HBASE-5623, retry with updateLock held
           tempWriter = this.writer;
-          logSyncerThread.hlogFlush(tempWriter, pending);
+          flushedSeqId = logSyncerThread.flushWritesTo(tempWriter);
           tempWriter.sync();
-          syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
-          this.syncedTillHere = doneUpto;
         }
       }
+      syncInfo.notifySynced(flushedSeqId);
       // We try to not acquire the updateLock just to update statistics.
       // Make these statistics as AtomicLong.
       syncTime.inc(System.currentTimeMillis() - now);
@@ -1538,14 +1569,15 @@
     if (this.closed) {
       return;
     }
-    long txid = 0;
+    long seqNumOfCompletionEdit;
     synchronized (updateLock) {
+      seqNumOfCompletionEdit = obtainSeqNum();
       long now = System.currentTimeMillis();
-      WALEdit edit = completeCacheFlushLogEdit();
-      HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
+
+      WALEdit edit = completeCacheFlushLogEdit(logSeqId);
+      HLogKey key = makeKey(encodedRegionName, tableName, seqNumOfCompletionEdit,
         System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
       logSyncerThread.append(new Entry(key, edit));
-      txid = this.unflushedEntries.incrementAndGet();
       writeTime.inc(System.currentTimeMillis() - now);
       long len = 0;
       for (KeyValue kv : edit.getKeyValues()) {
@@ -1555,7 +1587,7 @@
       this.numEntries.incrementAndGet();
     }
     // sync txn to file system
-    this.sync(txid);
+    this.sync(seqNumOfCompletionEdit);
 
   } finally {
     // updateLock not needed for removing snapshot's entry
@@ -1566,9 +1598,17 @@
     }
   }
 
-  private WALEdit completeCacheFlushLogEdit() {
+  private WALEdit completeCacheFlushLogEdit(long seqIdOfFlush) {
+    // The data is not actually used here - we just need to write
+    // something to the log to make sure we're still the owner of the
+    // pipeline.
+    byte[] data = Bytes.add(
+        COMPLETE_CACHE_FLUSH,
+        ":".getBytes(Charsets.UTF_8),
+        Bytes.toBytes(String.valueOf(seqIdOfFlush)));
+
     KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
-      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
+      System.currentTimeMillis(), data);
     WALEdit e = new WALEdit();
     e.add(kv);
     return e;
@@ -1844,7 +1884,7 @@
 
   /** Provide access to currently deferred sequence num for tests */
   boolean hasDeferredEntries() {
-    return lastDeferredTxid > syncedTillHere;
+    return lastDeferredTxid > syncInfo.getLastSyncedTxId();
   }
 
   /**
@@ -1880,4 +1920,4 @@
       System.exit(-1);
     }
   }
-}
\ No newline at end of file
+}
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 2937765..8317aa7 100644
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -533,8 +533,9 @@ public class TestHLog {
       KeyValue kv = val.getKeyValues().get(0);
       assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
       assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
-      assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
-        val.getKeyValues().get(0).getValue()));
+      assertTrue(Bytes.startsWith(
+          val.getKeyValues().get(0).getValue(),
+          HLog.COMPLETE_CACHE_FLUSH));
       System.out.println(key + " " + val);
     }
   } finally {
@@ -601,8 +602,9 @@ public class TestHLog {
       assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
       assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
       assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
-      assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
-        val.getValue()));
+      assertTrue(Bytes.startsWith(
+          val.getValue(),
+          HLog.COMPLETE_CACHE_FLUSH));
       System.out.println(entry.getKey() + " " + val);
     }
   } finally {
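
Note on the retry path in syncer() above: the catch block re-runs
flushWritesTo() against the replacement writer, which is only safe
because a failed batch is pushed back in front of anything queued since,
so the retry replays edits in seqnum order. A self-contained sketch of
that push-back pattern (PushbackDemo and its Writer interface are
stand-ins, not the HBase types):

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.List;

    public class PushbackDemo {
        interface Writer { void append(long seqNum) throws IOException; }

        private final Object flushLock = new Object();
        private List<Long> pendingWrites = new LinkedList<Long>();

        synchronized void append(long seqNum) {
            pendingWrites.add(seqNum);
        }

        long flushWritesTo(Writer writer) throws IOException {
            synchronized (flushLock) {         // one flusher at a time
                List<Long> pending;
                synchronized (this) {          // claim the current batch
                    pending = pendingWrites;
                    pendingWrites = new LinkedList<Long>();
                }
                boolean success = false;
                long last = -1;
                try {
                    for (long seq : pending) {
                        writer.append(seq);
                        last = seq;
                    }
                    success = true;
                } finally {
                    if (!success) {            // re-queue ahead of newer edits
                        synchronized (this) {
                            pending.addAll(pendingWrites);
                            pendingWrites = pending;
                        }
                    }
                }
                return last;
            }
        }

        public static void main(String[] args) throws IOException {
            final PushbackDemo log = new PushbackDemo();
            log.append(1);
            log.append(2);
            try {
                log.flushWritesTo(new Writer() {
                    public void append(long seq) throws IOException {
                        throw new IOException("simulated pipeline failure");
                    }
                });
            } catch (IOException expected) {
                // batch [1, 2] was pushed back onto pendingWrites
            }
            log.append(3);
            long last = log.flushWritesTo(new Writer() {
                public void append(long seq) {
                    System.out.println("append " + seq);
                }
            });
            System.out.println("flushed through " + last);  // 1, 2, then 3
        }
    }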
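Note on the marker change the tests exercise: the completion edit's
value is now COMPLETE_CACHE_FLUSH + ":" + <flushed seqid>, so the tests
match by prefix rather than exact compare. In plain JDK terms (the
constant's byte value here is a stand-in for HLog's, for illustration):

    import java.nio.charset.Charset;
    import java.util.Arrays;

    public class MarkerDemo {
        static final Charset UTF8 = Charset.forName("UTF-8");
        // stand-in for HLog.COMPLETE_CACHE_FLUSH
        static final byte[] COMPLETE_CACHE_FLUSH =
            "HBASE::CACHEFLUSH".getBytes(UTF8);

        // same check Bytes.startsWith() performs in the updated tests
        static boolean startsWith(byte[] bytes, byte[] prefix) {
            return bytes.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(bytes, prefix.length), prefix);
        }

        public static void main(String[] args) {
            long seqIdOfFlush = 42;
            byte[] data = (new String(COMPLETE_CACHE_FLUSH, UTF8)
                + ":" + seqIdOfFlush).getBytes(UTF8);
            System.out.println(startsWith(data, COMPLETE_CACHE_FLUSH)); // true
        }
    }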