diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 2af14b8..9cbbea5 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -122,7 +123,10 @@ class FSHLog implements HLog, Syncable { private final long blocksize; private final String prefix; private final AtomicLong unflushedEntries = new AtomicLong(0); - private volatile long syncedTillHere = 0; + /** + * pointer for upto what txid the edits have been synced (or tried to be synced). + */ + private volatile AtomicLong syncedTillHere = new AtomicLong(0); private long lastDeferredTxid; private final Path oldLogDir; private volatile boolean logRollRunning; @@ -256,6 +260,27 @@ class FSHLog implements HLog, Syncable { new ConcurrentSkipListMap>(LOG_NAME_COMPARATOR); /** + * Syncer threads which does the sync to hdfs. A syncer thread is responsible for emptying the + * buffer (guaranteeing in-order emptying and flushing it to underlying stream), and syncing to + * HDFS. A sync call is the most expensive call and here FSHLog is creating a dedicated set of + * syncers. All client threads (i.e., Regionserver handlers, LogSyncer thread) notifies the + * syncers for the pending writes that are to be synced; and one of the syncer thread does the + * sync it to HDFS and notifies them. + */ + private Syncer[] syncers; + + /** + * In case the flush/sync fails, we maintain what txid is failed. + */ + private final AtomicLong failedTxid = new AtomicLong(0); + /** + * On successful sync, it records the txid. It is useful to determine when throwing error to + * the callers when a sync has failed. // TODO make it more robust. + */ + private final AtomicLong successFulTxid = new AtomicLong(0); + private IOException failedIoe = null; + + /** * Constructor. * * @param fs filesystem handle @@ -397,6 +422,7 @@ class FSHLog implements HLog, Syncable { throw new IOException("Unable to mkdir " + this.oldLogDir); } } + //initSyncers(); // rollWriter sets this.hdfs_out if it can. rollWriter(); @@ -415,6 +441,16 @@ class FSHLog implements HLog, Syncable { coprocessorHost = new WALCoprocessorHost(this, conf); this.metrics = new MetricsWAL(); + initSyncers(); + } + + private void initSyncers() { + this.syncers = new Syncer[3];// TODO make it configurable. + for (int i = 0; i < 3; i ++) syncers[i] = new Syncer(); + for (Syncer syncer : this.syncers) { + LOG.warn("STARTING SYNCER: " + syncer.getName()); + syncer.start(); + } } /** @@ -482,6 +518,8 @@ class FSHLog implements HLog, Syncable { @Override public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + LOG.debug("rollWriter: rolling of writer"); + synchronized (rollWriterLock) { // Return if nothing to flush. if (!force && this.writer != null && this.numEntries.get() <= 0) { @@ -716,12 +754,13 @@ class FSHLog implements HLog, Syncable { try { // Wait till all current transactions are written to the hlog. // No new transactions can occur because we have the updatelock. - if (this.unflushedEntries.get() != this.syncedTillHere) { + if (this.unflushedEntries.get() != this.syncedTillHere.get()) { LOG.debug("cleanupCurrentWriter " + " waiting for transactions to get synced " + " total " + this.unflushedEntries.get() + " synced till here " + syncedTillHere); - sync(); + //sync(); + doHDFSSync(unflushedEntries.get()); } this.writer.close(); this.writer = null; @@ -866,6 +905,19 @@ class FSHLog implements HLog, Syncable { Thread.currentThread().interrupt(); } } + // close all syncer threads. + try { + LOG.warn("Closing Syncer threads: "); + for (Syncer syncer : this.syncers) { + syncer.setClose(true); + syncer.interrupt(); + syncer.join(); + } + } catch (InterruptedException ie) { + LOG.error("Exception while waiting for syncer threads to close"); + Thread.currentThread().interrupt(); + } + try { // Prevent all further flushing and rolling. closeBarrier.stopAndDrainOps(); @@ -998,6 +1050,7 @@ class FSHLog implements HLog, Syncable { now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce); } + /** * This class is responsible to hold the HLog's appended Entry list * and to sync them according to a configurable interval. @@ -1032,7 +1085,7 @@ class FSHLog implements HLog, Syncable { while(!this.isInterrupted() && !closeLogSyncer.get()) { try { - if (unflushedEntries.get() <= syncedTillHere) { + if (unflushedEntries.get() <= syncedTillHere.get()) { synchronized (closeLogSyncer) { closeLogSyncer.wait(this.optionalFlushInterval); } @@ -1092,13 +1145,81 @@ class FSHLog implements HLog, Syncable { syncer(this.unflushedEntries.get()); // sync all pending items } - // sync all transactions upto the specified txid - private void syncer(long txid) throws IOException { - // if the transaction that we are interested in is already - // synced, then return immediately. - if (txid <= this.syncedTillHere) { - return; + /** + * The syncer thread which does the syncing. + *

+ * It runs in a loop unless interrupted, and every iteration target sync point to sync: + * sync to that point, and notifies all other threads (usually handler threads), which are + * waiting on {@link FSHLog#syncedTillHere}. + */ + private class Syncer extends HasThread { + private long targetSyncPoint = 0; + private Object syncLock = new Object(); + private volatile boolean isSyncing = false; + private volatile boolean close = false; + + private Syncer() { + super("Syncer - " + System.nanoTime()); + } + + public void setTargetSyncPoint(long targetSyncPoint) { + synchronized (this.syncLock) { + if (targetSyncPoint < this.targetSyncPoint) return; + this.targetSyncPoint = targetSyncPoint; + this.syncLock.notifyAll(); + } + } + public void setClose(boolean close) { + this.close = close; + } + + // do the real syncing logic. + @Override + public void run() { + try { + while (!this.isInterrupted() && !this.close) { + synchronized (this.syncLock) { + while (this.targetSyncPoint <= syncedTillHere.get()) { + this.syncLock.wait(); + } + } + // do the real syncing. + try { + this.isSyncing = true; + doHDFSSync(this.targetSyncPoint); + } catch (IOException e) { + LOG.error("Error while syncing, requesting close of hlog (targetSyncPoint) " + + targetSyncPoint, e); + failedTxid.set(targetSyncPoint); + failedIoe = e; + requestLogRoll(); + } + this.targetSyncPoint = syncedTillHere.longValue(); // no need of lock here; + // as the later is always increasing. + this.isSyncing = false; + // this block is needed in case the above sync call returns early. + // TODO: re-consider it. + synchronized(syncedTillHere) { + syncedTillHere.notifyAll(); + } + } + } catch (InterruptedException ie) { + LOG.debug("Interrupted while syncing... " + this.getName()); + LOG.debug(ie); + } finally { + LOG.warn("Completing Syncer thread: " + this.getName()); + } + } + } + + /** + * Does the sync using the existing logic (takes the updateLock for writer, + * takes the flush lock and flushes the buffer to the stream and then invoke sync api). + * @param txid + * @throws IOException + */ + private void doHDFSSync(long txid) throws IOException { Writer tempWriter; synchronized (this.updateLock) { if (this.closed) return; @@ -1108,73 +1229,97 @@ class FSHLog implements HLog, Syncable { // See HBASE-4387, HBASE-5623, HBASE-7329. tempWriter = this.writer; } - try { - long doneUpto; - long now = EnvironmentEdgeManager.currentTimeMillis(); - // First flush all the pending writes to HDFS. Then - // issue the sync to HDFS. If sync is successful, then update - // syncedTillHere to indicate that transactions till this - // number has been successfully synced. - IOException ioe = null; - List pending = null; - synchronized (flushLock) { - if (txid <= this.syncedTillHere) { - return; - } - doneUpto = this.unflushedEntries.get(); - pending = logSyncer.getPendingWrites(); - try { - logSyncer.hlogFlush(tempWriter, pending); - } catch(IOException io) { - ioe = io; - LOG.error("syncer encountered error, will retry. txid=" + txid, ioe); - } - } - if (ioe != null && pending != null) { - synchronized (this.updateLock) { - synchronized (flushLock) { - // HBASE-4387, HBASE-5623, retry with updateLock held - tempWriter = this.writer; - logSyncer.hlogFlush(tempWriter, pending); - } - } - } - // another thread might have sync'ed avoid double-sync'ing - if (txid <= this.syncedTillHere) { + // empty the writebuffer holding the flushlock. + IOException ioe = null; + List pending = null; + long doneUpto; + long now = EnvironmentEdgeManager.currentTimeMillis(); +// LOG.warn(this.hashCode() + " : doHDFSSync, trying to sync: " + txid +", syncedTillHere.get = " + +// this.syncedTillHere.get() + ", failedTxid: "+ this.failedTxid.get()); + + synchronized (flushLock) { + if (txid <= this.syncedTillHere.get()) { return; } + doneUpto = this.unflushedEntries.get(); + pending = logSyncer.getPendingWrites(); try { - if (tempWriter != null) tempWriter.sync(); - } catch(IOException ex) { - synchronized (this.updateLock) { + logSyncer.hlogFlush(tempWriter, pending); + } catch(IOException io) { + ioe = io; + LOG.error("syncer encountered error, will retry. txid=" + txid, ioe); + } + } + if (ioe != null && pending != null) { + synchronized (this.updateLock) { + synchronized (flushLock) { // HBASE-4387, HBASE-5623, retry with updateLock held - // TODO: we don't actually need to do it for concurrent close - what is the point - // of syncing new unrelated writer? Keep behavior for now. tempWriter = this.writer; - if (tempWriter != null) tempWriter.sync(); + logSyncer.hlogFlush(tempWriter, pending); + } + } + } + // another thread might have sync'ed avoid double-sync'ing + if (txid <= this.syncedTillHere.get()) { + return; + } + try { + if (tempWriter != null) tempWriter.sync(); + } catch(IOException ex) { + synchronized (this.updateLock) { + // HBASE-4387, HBASE-5623, retry with updateLock held + // TODO: we don't actually need to do it for concurrent close - what is the point + // of syncing new unrelated writer? Keep behavior for now. + tempWriter = this.writer; + if (tempWriter != null) tempWriter.sync(); + } + } + + synchronized (this.syncedTillHere) { + this.syncedTillHere.set(Math.max(this.syncedTillHere.get(), doneUpto)); + this.successFulTxid.set(Math.max(syncedTillHere.longValue(), successFulTxid.get())); + } + + this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now); + // TODO: preserving the old behavior for now, but this check is strange. It's not + // protected by any locks here, so for all we know rolling locks might start + // as soon as we enter the "if". Is this best-effort optimization check? + if (!this.logRollRunning) { + checkLowReplication(); + try { + if (tempWriter.getLength() > this.logrollsize) { + requestLogRoll(); } + } catch (IOException x) { + LOG.debug("Log roll failed and will be retried. (This is not an error)"); } - this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); - - this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now); - // TODO: preserving the old behavior for now, but this check is strange. It's not - // protected by any locks here, so for all we know rolling locks might start - // as soon as we enter the "if". Is this best-effort optimization check? - if (!this.logRollRunning) { - checkLowReplication(); + } + } + // sync all transactions upto the specified txid + private void syncer(long txid) throws IOException { + // block this thread and wait for the Syncer notification. + if (this.syncedTillHere.get() > txid) return; + synchronized (this.syncedTillHere) { + getASyncer().setTargetSyncPoint(txid); + while (this.syncedTillHere.get() < txid) { try { - if (tempWriter.getLength() > this.logrollsize) { - requestLogRoll(); - } - } catch (IOException x) { - LOG.debug("Log roll failed and will be retried. (This is not an error)"); + this.syncedTillHere.wait(); + if (txid <= failedTxid.get() && txid > this.successFulTxid.get()) + throw failedIoe; + } catch (InterruptedException ie) { + throw new InterruptedIOException( + "Got exception while waiting for sync to finish for txid: " + txid); } } - } catch (IOException e) { - LOG.fatal("Could not sync. Requesting roll of hlog", e); - requestLogRoll(); - throw e; + // sync is done; return. + } + } + private Syncer getASyncer() { + for (int i = 0; i < syncers.length; i++) { + if (!syncers[i].isSyncing) return syncers[i]; } + return syncers[2];// TODO RANDOMIZE IT. The HDFS sync api is not async; so there is no point + // in having concurrent calls but it is good to get max throughput. } private void checkLowReplication() { @@ -1442,7 +1587,7 @@ class FSHLog implements HLog, Syncable { /** Provide access to currently deferred sequence num for tests */ boolean hasDeferredEntries() { - return lastDeferredTxid > syncedTillHere; + return lastDeferredTxid > syncedTillHere.get(); } @Override diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 64acde6..b26a17b 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -120,7 +120,11 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd, region.getSequenceId(), true, nonce, nonce); } else { - hlog.append(hri, hri.getTable(), walEdit, now, htd, region.getSequenceId()); + // this is how almost all users of HLog use it (all but compaction calls). + long txid = hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd, + region.getSequenceId(), true, nonce, nonce); + hlog.sync(txid); + } } long totalTime = (System.currentTimeMillis() - startTime);