diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index b29bb71..f689f0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -117,15 +117,19 @@ class FSHLog implements HLog, Syncable {
   // Listeners that are called on WAL events.
   private List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
 
-  private final long optionalFlushInterval;
   private final long blocksize;
   private final String prefix;
   private final AtomicLong unflushedEntries = new AtomicLong(0);
-  private volatile long syncedTillHere = 0;
+  private final AtomicLong syncedTillHere = new AtomicLong(0);
   private long lastDeferredTxid;
   private final Path oldLogDir;
   private volatile boolean logRollRunning;
 
+  // all writes pending on the AsyncWriter/AsyncSyncer threads with
+  // txid <= failedTxid will fail by throwing asyncIOE
+  private final AtomicLong failedTxid = new AtomicLong(0);
+  private IOException asyncIOE = null;
+
   private WALCoprocessorHost coprocessorHost;
 
   private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
@@ -208,7 +212,7 @@ class FSHLog implements HLog, Syncable {
   // during an update
   // locked during appends
   private final Object updateLock = new Object();
-  private final Object flushLock = new Object();
+  private final Object bufferLock = new Object();
 
   private final boolean enabled;
 
@@ -219,10 +223,17 @@ class FSHLog implements HLog, Syncable {
    */
   private final int maxLogs;
 
-  /**
-   * Thread that handles optional sync'ing
-   */
-  private final LogSyncer logSyncer;
+  // List of pending writes to the HLog. These correspond to transactions
+  // that have not yet returned to the client. We keep them cached here
+  // instead of writing them to HDFS piecemeal, because the HDFS write
+  // method is pretty heavyweight as far as locking is concerned. The
+  // goal is to increase the batch size for writing-to-hdfs as well as
+  // sync-to-hdfs, so that we can get better system throughput.
+  private List<Entry> pendingWrites = new LinkedList<Entry>();
+
+  private final AsyncWriter asyncWriter;
+  private final AsyncSyncer asyncSyncer;
+  private final AsyncNotifier asyncNotifier;
 
   /** Number of log close errors tolerated before we abort */
   private final int closeErrorsTolerated;
@@ -335,8 +346,6 @@ class FSHLog implements HLog, Syncable {
     // Roll at 95% of block size.
     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
     this.logrollsize = (long)(this.blocksize * multi);
-    this.optionalFlushInterval =
-      conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
 
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
     this.minTolerableReplication = conf.getInt(
@@ -348,13 +357,11 @@ class FSHLog implements HLog, Syncable {
     this.closeErrorsTolerated = conf.getInt(
         "hbase.regionserver.logroll.errors.tolerated", 0);
 
-    this.logSyncer = new LogSyncer(this.optionalFlushInterval);
     LOG.info("WAL/HLog configuration: blocksize=" +
       StringUtils.byteDesc(this.blocksize) + ", rollsize=" +
       StringUtils.byteDesc(this.logrollsize) +
-      ", enabled=" + this.enabled +
-      ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
+      ", enabled=" + this.enabled);
 
     // If prefix is null||empty then just name it hlog
     this.prefix = prefix == null || prefix.isEmpty() ?
        "hlog" : URLEncoder.encode(prefix, "UTF8");
@@ -378,15 +385,15 @@ class FSHLog implements HLog, Syncable {
     // handle the reflection necessary to call getNumCurrentReplicas()
     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
 
-    // When optionalFlushInterval is set as 0, don't start a thread for deferred log sync.
-    if (this.optionalFlushInterval > 0) {
-      Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
-          + ".logSyncer");
-    } else {
-      LOG.info("hbase.regionserver.optionallogflushinterval is set as "
-          + this.optionalFlushInterval + ". Deferred log syncing won't work. "
-          + "Any Mutation, marked to be deferred synced, will be flushed immediately.");
-    }
+    asyncWriter = new AsyncWriter("AsyncHLogWriter");
+    asyncWriter.start();
+
+    asyncSyncer = new AsyncSyncer("AsyncHLogFlusher");
+    asyncSyncer.start();
+
+    asyncNotifier = new AsyncNotifier("AsyncHLogNotifier");
+    asyncNotifier.start();
+
     coprocessorHost = new WALCoprocessorHost(this, conf);
 
     this.metrics = new MetricsWAL();
@@ -657,11 +664,11 @@ class FSHLog implements HLog, Syncable {
       try {
         // Wait till all current transactions are written to the hlog.
         // No new transactions can occur because we have the updatelock.
-        if (this.unflushedEntries.get() != this.syncedTillHere) {
+        if (this.unflushedEntries.get() != this.syncedTillHere.get()) {
           LOG.debug("cleanupCurrentWriter " +
             " waiting for transactions to get synced " +
             " total " + this.unflushedEntries.get() +
-            " synced till here " + syncedTillHere);
+            " synced till here " + this.syncedTillHere.get());
           sync();
         }
         this.writer.close();
@@ -780,17 +787,28 @@ class FSHLog implements HLog, Syncable {
     if (this.closed) {
       return;
     }
-    // When optionalFlushInterval is 0, the logSyncer is not started as a Thread.
-    if (this.optionalFlushInterval > 0) {
-      try {
-        logSyncer.close();
-        // Make sure we synced everything
-        logSyncer.join(this.optionalFlushInterval * 2);
-      } catch (InterruptedException e) {
-        LOG.error("Exception while waiting for syncer thread to die", e);
-        Thread.currentThread().interrupt();
-      }
+
+    try {
+      asyncNotifier.interrupt();
+      asyncNotifier.join();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for AsyncNotifier thread to die", e);
+    }
+
+    try {
+      asyncSyncer.interrupt();
+      asyncSyncer.join();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for AsyncSyncer thread to die", e);
+    }
+
+    try {
+      asyncWriter.interrupt();
+      asyncWriter.join();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for AsyncWriter thread to die", e);
    }
+
    try {
      // Prevent all further flushing and rolling.
      closeBarrier.stopAndDrainOps();
@@ -893,9 +911,12 @@ class FSHLog implements HLog, Syncable {
       byte [] encodedRegionName = info.getEncodedNameAsBytes();
       if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
       HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds);
-      doWrite(info, logKey, edits, htd);
+      synchronized (bufferLock) {
+        doWrite(info, logKey, edits, htd);
+        txid = this.unflushedEntries.incrementAndGet();
+      }
+      this.asyncWriter.setPendingTxid(txid);
       this.numEntries.incrementAndGet();
-      txid = this.unflushedEntries.incrementAndGet();
       if (htd.isDeferredLogFlush()) {
         lastDeferredTxid = txid;
       }
@@ -921,95 +942,225 @@ class FSHLog implements HLog, Syncable {
     return append(info, tableName, edits, clusterIds, now, htd, false, true);
   }
 
-  /**
-   * This class is responsible to hold the HLog's appended Entry list
-   * and to sync them according to a configurable interval.
-   *
-   * Deferred log flushing works first by piggy backing on this process by
-   * simply not sync'ing the appended Entry. It can also be sync'd by other
-   * non-deferred log flushed entries outside of this thread.
-   */
-  class LogSyncer extends HasThread {
-
-    private final long optionalFlushInterval;
+  // thread to write locally buffered writes to HDFS
+  private class AsyncWriter extends HasThread {
+    private long pendingTxid = 0;
+    private long lastWrittenTxid = 0;
+    private Object writeLock = new Object();
 
-    private final AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
+    public AsyncWriter(String name) {
+      super(name);
+    }
 
-    // List of pending writes to the HLog. There corresponds to transactions
-    // that have not yet returned to the client. We keep them cached here
-    // instead of writing them to HDFS piecemeal, because the HDFS write
-    // method is pretty heavyweight as far as locking is concerned. The
-    // goal is to increase the batchsize for writing-to-hdfs as well as
-    // sync-to-hdfs, so that we can get better system throughput.
-    private List<Entry> pendingWrites = new LinkedList<Entry>();
+    // wake up the AsyncWriter thread (called by a (write) handler thread)
+    // to write the buffered writes to HDFS
+    public void setPendingTxid(long txid) {
+      synchronized (this.writeLock) {
+        if (txid <= this.pendingTxid)
+          return;
 
-    LogSyncer(long optionalFlushInterval) {
-      this.optionalFlushInterval = optionalFlushInterval;
+        this.pendingTxid = txid;
+        this.writeLock.notify();
+      }
     }
 
-    @Override
     public void run() {
       try {
-        // awaiting with a timeout doesn't always
-        // throw exceptions on interrupt
-        while(!this.isInterrupted() && !closeLogSyncer.get()) {
+        while (!this.isInterrupted()) {
+          // 1. wait until there are new writes in the local buffer
+          synchronized (this.writeLock) {
+            while (this.pendingTxid <= this.lastWrittenTxid) {
+              this.writeLock.wait();
+            }
+            this.lastWrittenTxid = this.pendingTxid;
+          }
+
+          // 2. get all buffered writes and update the 'real' pendingTxid,
+          // since newer writes may have entered the buffer while AsyncWriter
+          // was waking up and acquiring the lock
+          // NOTE! can't hold 'updateLock' here since rollWriter will pend
+          // on 'sync()' with 'updateLock', but 'sync()' will wait for the
+          // AsyncWriter/AsyncSyncer/AsyncNotifier series. Without updateLock
+          // this can lead to more pendingWrites than pendingTxid covers, but that is not a problem
+          List<Entry> pendingWrites = null;
+          synchronized (bufferLock) {
+            this.pendingTxid = unflushedEntries.get();
+            pendingWrites = getPendingWrites();
+          }
+
+          assert this.pendingTxid >= this.lastWrittenTxid :
+            "pendingTxid " + this.pendingTxid + " less than lastWrittenTxid " +
+            this.lastWrittenTxid + " when wake-up!";
+
+          // new edits may enter the write buffer after this thread is notified by
+          // setPendingTxid(), so get the real lastWrittenTxid
+          if (this.lastWrittenTxid < this.pendingTxid) {
+            this.lastWrittenTxid = this.pendingTxid;
+          }
+          // 3. write all buffered writes to HDFS (append, without sync)
           try {
-            if (unflushedEntries.get() <= syncedTillHere) {
-              synchronized (closeLogSyncer) {
-                closeLogSyncer.wait(this.optionalFlushInterval);
-              }
+            for (Entry e : pendingWrites) {
+              writer.append(e);
+            }
+          } catch(IOException e) {
+            LOG.fatal("Error while AsyncWriter write, request close of hlog ", e);
+            requestLogRoll();
+
+            asyncIOE = e;
+            failedTxid.set(this.lastWrittenTxid);
+          }
+
+          // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
+          asyncSyncer.setWrittenTxid(this.lastWrittenTxid);
+        }
+      } catch (InterruptedException e) {
+        LOG.debug(getName() + " interrupted while waiting for " +
+            "newer writes added to local buffer");
+      } finally {
+        LOG.info(getName() + " exiting");
+      }
+    }
+  }
+
+  // thread to request HDFS to sync the WALEdits written by AsyncWriter
+  // to make those WALEdits durable on the HDFS side
+  private class AsyncSyncer extends HasThread {
+    private long writtenTxid = 0;
+    private long lastFlushedTxid = 0;
+    private Object flushLock = new Object();
+
+    public AsyncSyncer(String name) {
+      super(name);
+    }
+
+    // wake up the AsyncSyncer thread (called by the AsyncWriter thread)
+    // to sync (flush) in HDFS the writes written by AsyncWriter
+    public void setWrittenTxid(long txid) {
+      synchronized (this.flushLock) {
+        if (txid <= this.writtenTxid)
+          return;
+
+        this.writtenTxid = txid;
+        this.flushLock.notify();
+      }
+    }
+
+    public void run() {
+      try {
+        while (!this.isInterrupted()) {
+          // 1. wait until AsyncWriter has written data to HDFS and
+          // called setWrittenTxid to wake us up
+          synchronized (this.flushLock) {
+            while (this.writtenTxid <= this.lastFlushedTxid) {
+              this.flushLock.wait();
             }
-            // Calling sync since we waited or had unflushed entries.
-            // Entries appended but not sync'd are taken care of here AKA
-            // deferred log flush
-            sync();
+            this.lastFlushedTxid = this.writtenTxid;
+          }
+
+          // 2. do 'sync' to HDFS to provide durability
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          try {
+            writer.sync();
           } catch (IOException e) {
-            LOG.error("Error while syncing, requesting close of hlog ", e);
+            LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
             requestLogRoll();
-            Threads.sleep(this.optionalFlushInterval);
+
+            asyncIOE = e;
+            failedTxid.set(this.lastFlushedTxid);
+          }
+          metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
+
+          // 3. wake up AsyncNotifier to notify (wake up) all pending 'put'
+          // handler threads on 'sync()'
+          asyncNotifier.setFlushedTxid(this.lastFlushedTxid);
+
+          // 4. check and do logRoll if needed
+          if (!logRollRunning) {
+            checkLowReplication();
+            try {
+              if (writer.getLength() > logrollsize) {
+                requestLogRoll();
+              }
+            } catch (IOException e) {
+              LOG.warn("writer.getLength() failed, this failure won't block here");
+            }
+          }
         }
       } catch (InterruptedException e) {
-        LOG.debug(getName() + " interrupted while waiting for sync requests");
+        LOG.debug(getName() + " interrupted while waiting for " +
+            "notification from AsyncWriter thread");
       } finally {
         LOG.info(getName() + " exiting");
       }
     }
+  }
 
-    // appends new writes to the pendingWrites. It is better to keep it in
-    // our own queue rather than writing it to the HDFS output stream because
-    // HDFSOutputStream.writeChunk is not lightweight at all.
-    synchronized void append(Entry e) throws IOException {
-      pendingWrites.add(e);
-    }
+  // thread to notify all write handler threads that are pending on
+  // the durability (sync) of their written WALEdits
+  private class AsyncNotifier extends HasThread {
+    private long flushedTxid = 0;
+    private long lastNotifiedTxid = 0;
+    private Object notifyLock = new Object();
 
-    // Returns all currently pending writes. New writes
-    // will accumulate in a new list.
-    synchronized List<Entry> getPendingWrites() {
-      List<Entry> save = this.pendingWrites;
-      this.pendingWrites = new LinkedList<Entry>();
-      return save;
+    public AsyncNotifier(String name) {
+      super(name);
    }
 
-    // writes out pending entries to the HLog
-    void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
-      if (pending == null) return;
+    public void setFlushedTxid(long txid) {
+      if (txid <= this.flushedTxid) {
+        return;
+      }
 
-      // write out all accumulated Entries to hdfs.
-      for (Entry e : pending) {
-        writer.append(e);
+      synchronized (this.notifyLock) {
+        this.flushedTxid = txid;
+        this.notifyLock.notify();
      }
    }
 
-    void close() {
-      synchronized (closeLogSyncer) {
-        closeLogSyncer.set(true);
-        closeLogSyncer.notifyAll();
+    public void run() {
+      try {
+        while (!this.isInterrupted()) {
+          synchronized (this.notifyLock) {
+            while (this.flushedTxid <= this.lastNotifiedTxid) {
+              this.notifyLock.wait();
+            }
+            this.lastNotifiedTxid = this.flushedTxid;
+          }
+
+          // notify (wake up) all pending (write) handler threads
+          // (or the logroller thread, which may also pend on sync())
+          synchronized (syncedTillHere) {
+            syncedTillHere.set(this.lastNotifiedTxid);
+            syncedTillHere.notifyAll();
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.debug(getName() + " interrupted while waiting for " +
+            "notification from AsyncSyncer thread");
+      } finally {
+        LOG.info(getName() + " exiting");
      }
    }
  }
 
+  // appends new writes to the pendingWrites. It is better to keep it in
+  // our own queue rather than writing it to the HDFS output stream because
+  // HDFSOutputStream.writeChunk is not lightweight at all.
+  // it's the caller's responsibility to hold updateLock before calling this method
+  void addPendingWrite(Entry e) throws IOException {
+    this.pendingWrites.add(e);
+  }
+
+  // Returns all currently pending writes. New writes
+  // will accumulate in a new list.
+  // it's the caller's responsibility to hold bufferLock before calling this method
+  List<Entry> getPendingWrites() {
+    List<Entry> save = this.pendingWrites;
+    this.pendingWrites = new LinkedList<Entry>();
+    return save;
+  }
+
   // sync all known transactions
   private void syncer() throws IOException {
     syncer(this.unflushedEntries.get()); // sync all pending items
@@ -1017,86 +1168,20 @@ class FSHLog implements HLog, Syncable {
 
   // sync all transactions upto the specified txid
   private void syncer(long txid) throws IOException {
-    // if the transaction that we are interested in is already
-    // synced, then return immediately.
-    if (txid <= this.syncedTillHere) {
-      return;
-    }
-    Writer tempWriter;
-    synchronized (this.updateLock) {
-      if (this.closed) return;
-      // Guaranteed non-null.
-      // Note that parallel sync can close tempWriter.
-      // The current method of dealing with this is to catch exceptions.
-      // See HBASE-4387, HBASE-5623, HBASE-7329.
-      tempWriter = this.writer;
-    }
-    try {
-      long doneUpto;
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      // First flush all the pending writes to HDFS. Then
-      // issue the sync to HDFS. If sync is successful, then update
-      // syncedTillHere to indicate that transactions till this
-      // number has been successfully synced.
-      IOException ioe = null;
-      List<Entry> pending = null;
-      synchronized (flushLock) {
-        if (txid <= this.syncedTillHere) {
-          return;
-        }
-        doneUpto = this.unflushedEntries.get();
-        pending = logSyncer.getPendingWrites();
-        try {
-          logSyncer.hlogFlush(tempWriter, pending);
-        } catch(IOException io) {
-          ioe = io;
-          LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
-        }
-      }
-      if (ioe != null && pending != null) {
-        synchronized (this.updateLock) {
-          synchronized (flushLock) {
-            // HBASE-4387, HBASE-5623, retry with updateLock held
-            tempWriter = this.writer;
-            logSyncer.hlogFlush(tempWriter, pending);
-          }
-        }
-      }
-      // another thread might have sync'ed avoid double-sync'ing
-      if (txid <= this.syncedTillHere) {
-        return;
-      }
-      try {
-        if (tempWriter != null) tempWriter.sync();
-      } catch(IOException ex) {
-        synchronized (this.updateLock) {
-          // HBASE-4387, HBASE-5623, retry with updateLock held
-          // TODO: we don't actually need to do it for concurrent close - what is the point
-          // of syncing new unrelated writer? Keep behavior for now.
-          tempWriter = this.writer;
-          if (tempWriter != null) tempWriter.sync();
-        }
-      }
-      this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
-
-      this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
-      // TODO: preserving the old behavior for now, but this check is strange. It's not
-      // protected by any locks here, so for all we know rolling locks might start
-      // as soon as we enter the "if". Is this best-effort optimization check?
-      if (!this.logRollRunning) {
-        checkLowReplication();
+    synchronized (this.syncedTillHere) {
+      while (this.syncedTillHere.get() < txid) {
        try {
-          if (tempWriter.getLength() > this.logrollsize) {
-            requestLogRoll();
+          this.syncedTillHere.wait();
+
+          if (txid <= this.failedTxid.get()) {
+            assert asyncIOE != null :
+              "current txid is among (under) failed txids, but asyncIOE is null!";
+            throw asyncIOE;
          }
-        } catch (IOException x) {
-          LOG.debug("Log roll failed and will be retried. (This is not an error)");
+        } catch (InterruptedException e) {
+          LOG.debug("interrupted while waiting for notification from AsyncNotifier");
        }
      }
-    } catch (IOException e) {
-      LOG.fatal("Could not sync. Requesting roll of hlog", e);
-      requestLogRoll();
-      throw e;
    }
  }
 
@@ -1222,7 +1307,7 @@ class FSHLog implements HLog, Syncable {
         logKey.setScopes(null);
       }
       // write to our buffer for the Hlog file.
-      logSyncer.append(new FSHLog.Entry(logKey, logEdit));
+      addPendingWrite(new HLog.Entry(logKey, logEdit));
     }
     long took = EnvironmentEdgeManager.currentTimeMillis() - now;
     coprocessorHost.postWALWrite(info, logKey, logEdit);
@@ -1368,7 +1453,7 @@ class FSHLog implements HLog, Syncable {
 
   /** Provide access to currently deferred sequence num for tests */
   boolean hasDeferredEntries() {
-    return lastDeferredTxid > syncedTillHere;
+    return this.lastDeferredTxid > this.syncedTillHere.get();
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index b2cc947..5ddf4ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -82,25 +82,27 @@ public class TestDurability {
     HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true);
 
     region.put(newPut(null));
-    verifyHLogCount(wal, 1);
 
-    // a put through the deferred table does not write to the wal immdiately
+    // a put through the deferred table does not write to the wal immediately,
+    // but may already have been synced by the underlying AsyncWriter +
+    // AsyncSyncer threads
     deferredRegion.put(newPut(null));
-    verifyHLogCount(wal, 1);
     // but will after we sync the wal
     wal.sync();
     verifyHLogCount(wal, 2);
 
     // a put through a deferred table will be sync with the put sync'ed put
     deferredRegion.put(newPut(null));
-    verifyHLogCount(wal, 2);
+    wal.sync();
+    verifyHLogCount(wal, 3);
     region.put(newPut(null));
     verifyHLogCount(wal, 4);
 
     // a put through a deferred table will be sync with the put sync'ed put
     deferredRegion.put(newPut(Durability.USE_DEFAULT));
-    verifyHLogCount(wal, 4);
+    wal.sync();
+    verifyHLogCount(wal, 5);
     region.put(newPut(Durability.USE_DEFAULT));
     verifyHLogCount(wal, 6);
 
@@ -114,7 +116,6 @@ public class TestDurability {
     // async overrides sync table default
     region.put(newPut(Durability.ASYNC_WAL));
     deferredRegion.put(newPut(Durability.ASYNC_WAL));
-    verifyHLogCount(wal, 6);
     wal.sync();
     verifyHLogCount(wal, 8);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
index e0ccc37..f39f6af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
@@ -151,10 +151,8 @@ public class TestLogRollAbort {
     dfsCluster.restartDataNodes();
     LOG.info("Restarted datanodes");
 
-    assertTrue("Should have an outstanding WAL edit", ((FSHLog) log).hasDeferredEntries());
     try {
       log.rollWriter(true);
-      fail("Log roll should have triggered FailedLogCloseException");
     } catch (FailedLogCloseException flce) {
       assertTrue("Should have deferred flush log edits outstanding",
         ((FSHLog) log).hasDeferredEntries());
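
Reviewer note (not part of the patch): the three new threads form a hand-off pipeline. Handler threads buffer edits under bufferLock and bump pendingTxid for AsyncWriter; AsyncWriter appends the buffered entries and passes lastWrittenTxid to AsyncSyncer; AsyncSyncer calls writer.sync() and passes lastFlushedTxid to AsyncNotifier; AsyncNotifier publishes syncedTillHere so callers blocked in syncer(long txid) can return (or rethrow asyncIOE if txid <= failedTxid). The sketch below is a minimal, self-contained illustration of that per-stage wait/notify hand-off; the names TxidHandoffSketch, Stage, advance and awaitNewWork are invented for the example and do not appear in FSHLog.

    // Illustrative only: one producer publishes a monotonically increasing txid,
    // one consumer waits for it to move past what it has already processed.
    public class TxidHandoffSketch {

      static final class Stage {
        private long published = 0;  // highest txid handed to this stage
        private long processed = 0;  // highest txid this stage has finished

        // producer side: analogous to setPendingTxid()/setWrittenTxid()/setFlushedTxid()
        synchronized void advance(long txid) {
          if (txid <= published) {
            return;  // stale notification, nothing new to do
          }
          published = txid;
          notify();  // a single consumer thread waits, so notify() is enough
        }

        // consumer side: block until new work is published, then claim it
        synchronized long awaitNewWork() throws InterruptedException {
          while (published <= processed) {
            wait();
          }
          processed = published;
          return processed;
        }
      }

      public static void main(String[] args) throws Exception {
        Stage writeStage = new Stage();

        Thread consumer = new Thread(() -> {
          try {
            while (!Thread.currentThread().isInterrupted()) {
              long txid = writeStage.awaitNewWork();
              // the real AsyncWriter would append entries here and hand txid to the next stage
              System.out.println("processed up to txid " + txid);
            }
          } catch (InterruptedException ie) {
            // exit quietly, mirroring how the patch's threads leave run() on interrupt
          }
        }, "consumer");
        consumer.start();

        for (long txid = 1; txid <= 3; txid++) {
          writeStage.advance(txid);  // what a handler thread does after buffering an edit
          Thread.sleep(10);
        }

        consumer.interrupt();
        consumer.join();
      }
    }

The same single-consumer pattern is why each setXxxTxid() in the patch can use notify() on its private lock, while AsyncNotifier uses notifyAll() on syncedTillHere, since many handler threads may be blocked in syncer(long txid), each of which rechecks failedTxid/asyncIOE after every wake-up.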