diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AggressiveWALSwitchPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AggressiveWALSwitchPolicy.java new file mode 100644 index 0000000..da9339c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AggressiveWALSwitchPolicy.java @@ -0,0 +1,44 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; + +/** + * The aggressive WAL switching policy is to do the switch if one sync + * op took more than the threshold time. + */ +public class AggressiveWALSwitchPolicy implements WALSwitchPolicy { + private final long switchThreshold; + + private Configuration conf; + public AggressiveWALSwitchPolicy(Configuration conf) { + this.conf = conf; + this.switchThreshold = this.conf.getLong("hbase.regionserver.wal.switch.threshold", + DEFAULT_SWITCH_THRESHOLD); + + } + + @Override + public boolean eligibleForSwitch(long lastSyncTimeTaken) { + if (lastSyncTimeTaken > switchThreshold) return true; + return false; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index c479f16..d7f77a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -34,6 +34,7 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -41,7 +42,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -155,6 +159,7 @@ class FSHLog implements HLog, Syncable { */ private final RingBufferEventHandler ringBufferEventHandler; + private final int syncRunnerHandlerCount; /** * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. */ @@ -208,11 +213,40 @@ class FSHLog implements HLog, Syncable { * Current log file. 
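A note on the policy above: AggressiveWALSwitchPolicy trips after a single sync that exceeds "hbase.regionserver.wal.switch.threshold". Because the decision sits behind the WALSwitchPolicy interface added later in this patch, other strategies can be slotted in. Below is a hypothetical smoothed variant, shown only as a sketch of the contract; the class name, the "hbase.regionserver.wal.switch.smoothing" key and its 0.25 default are illustrative and not part of the patch.

package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.conf.Configuration;

/** Hypothetical policy: switch only when the smoothed sync latency crosses the threshold. */
public class SmoothedWALSwitchPolicy implements WALSwitchPolicy {
  private final long switchThreshold;
  private final float alpha;            // smoothing factor (illustrative)
  private double smoothedLatency = 0;   // exponentially weighted moving average, in ms

  public SmoothedWALSwitchPolicy(Configuration conf) {
    this.switchThreshold = conf.getLong("hbase.regionserver.wal.switch.threshold",
        DEFAULT_SWITCH_THRESHOLD);
    this.alpha = conf.getFloat("hbase.regionserver.wal.switch.smoothing", 0.25f);
  }

  @Override
  public synchronized boolean eligibleForSwitch(long lastSyncTimeTaken) {
    // Fold the latest observation into the running average before comparing.
    smoothedLatency = alpha * lastSyncTimeTaken + (1 - alpha) * smoothedLatency;
    return smoothedLatency > switchThreshold;
  }
}

With a policy like this a single latency spike no longer forces a switch; whether that trade-off is wanted is exactly the kind of question the pluggable interface leaves open.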
*/ volatile Writer writer; + /** + * If the wal switching feature is enabled or not + */ + private final boolean walSwitchEnabled; + /** + * Two writers to used for switching. If disabled, we just use one of them. + * Only one of them is active any given time. In case it slows dows, we switch to the other one. + */ + private Writer writer1; + private Writer writer2; + /** + * if the WAL is closing. + */ + private volatile boolean closing = false; + /** + * if the WAL switching is in progress. + */ + private volatile boolean switching = false; + /** + * The Sync ops monitoring thread. It monitors all the in-progress sync operations. + */ + private SyncLatencyWatcher syncLatencyWatcher; /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ private final DrainBarrier closeBarrier = new DrainBarrier(); /** + * ThreadPool to submit sync operation. The sync calls to the underlying file system are called + * using this thread pool. This enables us better control on the call (for e.g., we can interrupt + * the SyncRunner thread to unblock it from the current call). + */ + private ThreadPoolExecutor syncPool; + + /** * This lock makes sure only one log roll runs at a time. Should not be taken while any other * lock is held. We don't just use synchronized because that results in bogus and tedious * findbugs warning when it thinks synchronized controls writer thread safety. It is held when @@ -233,9 +267,6 @@ class FSHLog implements HLog, Syncable { */ private final boolean forMeta; - // The timestamp (in ms) when the log file was created. - private final AtomicLong filenum = new AtomicLong(-1); - // Number of transactions in the current Hlog. private final AtomicInteger numEntries = new AtomicInteger(0); @@ -445,6 +476,9 @@ class FSHLog implements HLog, Syncable { this.fullPathOldLogDir = new Path(rootDir, oldLogDir); this.forMeta = forMeta; this.conf = conf; + this.walSwitchEnabled = this.conf + .getBoolean("hbase.regionserver.wal.switch.enabled", false); + LOG.debug("walSwitchEnabled=" + walSwitchEnabled); // Register listeners. if (listeners != null) { @@ -492,7 +526,12 @@ class FSHLog implements HLog, Syncable { } // rollWriter sets this.hdfs_out if it can. - rollWriter(); + if (walSwitchEnabled) { + this.writer1 = rollWriterInternal(false, this.writer1, false); + this.writer2 = rollWriterInternal(false, this.writer2, true); + } else { + rollWriterInternal(false, this.writer, true); + } // handle the reflection necessary to call getNumCurrentReplicas() this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); @@ -505,6 +544,7 @@ class FSHLog implements HLog, Syncable { String hostingThreadName = Thread.currentThread().getName(); this.appendExecutor = Executors. newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append")); + this.syncRunnerHandlerCount = conf.getInt("hbase.regionserver.hlog.syncer.count", 5); // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will // be stuck and make no progress if the buffer is filled with appends only and there is no // sync. 
If no sync, then the handlers will be outstanding just waiting on sync completion @@ -517,14 +557,25 @@ class FSHLog implements HLog, Syncable { new Disruptor(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount, this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); this.ringBufferEventHandler = - new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5), - maxHandlersCount); + new RingBufferEventHandler(this.syncRunnerHandlerCount, maxHandlersCount); this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler()); this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler}); // Presize our map of SyncFutures by handler objects. this.syncFuturesByHandler = new ConcurrentHashMap(maxHandlersCount); // Starting up threads in constructor is a no no; Interface should have an init call. this.disruptor.start(); + this.syncPool = new ThreadPoolExecutor(this.syncRunnerHandlerCount, + this.syncRunnerHandlerCount, 60000, TimeUnit.MILLISECONDS, new SynchronousQueue()); + this.syncPool.prestartCoreThread(); + this.syncPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + if (walSwitchEnabled) initSyncLatencyWatcher(); + } + + // init the sync latency monitoring thread. + private void initSyncLatencyWatcher() { + this.syncLatencyWatcher = new SyncLatencyWatcher(); + this.syncLatencyWatcher.setDaemon(true); + this.syncLatencyWatcher.start(); } /** @@ -565,7 +616,10 @@ class FSHLog implements HLog, Syncable { @Override public long getFilenum() { - return this.filenum.get(); + if (this.writer != null) { + return this.writer.getCurrentFileNum().get(); + } + return 0; } /** @@ -585,18 +639,38 @@ class FSHLog implements HLog, Syncable { return rollWriter(false); } + /** + * @param w + * @return a new Path for a writer + * @throws IOException + */ + private Path getNewPathForWriter(Writer w) throws IOException { + long fileNum = EnvironmentEdgeManager.currentTimeMillis(); + w.getCurrentFileNum().set(fileNum); + Path newPath = computeFilename(fileNum); + while(fs.exists(newPath)) { + fileNum = w.getCurrentFileNum().incrementAndGet(); + newPath = computeFilename(fileNum); + } + return newPath; + } + private Path getNewPath() throws IOException { - this.filenum.set(System.currentTimeMillis()); - Path newPath = computeFilename(); + AtomicLong fileNum = new AtomicLong(System.currentTimeMillis()); + Path newPath = computeFilename(fileNum.get()); while (fs.exists(newPath)) { - this.filenum.incrementAndGet(); - newPath = computeFilename(); + fileNum.incrementAndGet(); + newPath = computeFilename(fileNum.get()); } return newPath; } + @VisibleForTesting Path getOldPath() { - long currentFilenum = this.filenum.get(); + long currentFilenum = 0l; + if (this.writer != null) { + currentFilenum = this.writer.getCurrentFileNum().get(); + } Path oldPath = null; if (currentFilenum > 0) { // ComputeFilename will take care of meta hlog filename @@ -649,44 +723,77 @@ class FSHLog implements HLog, Syncable { @Override public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + byte[][]regionsToFlush = null; + rollWriterLock.lock(); + try { + // replace the current write with the new writer + rollWriterInternal(force, this.writer, true); + // Can we delete any of the old log files? 
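The syncPool built above (core size equal to the SyncRunner count, SynchronousQueue hand-off, CallerRunsPolicy on rejection) exists so that a SyncRunner can push the real writer.sync() onto a pool thread and block on the returned Future: interrupting the SyncRunner then frees it immediately even if the filesystem call underneath is stuck. A self-contained sketch of that hand-off, with Thread.sleep standing in for the HDFS sync and all names illustrative:

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class InterruptibleSyncSketch {
  public static void main(String[] args) throws Exception {
    // Same pool shape as the patch: fixed size, direct hand-off, caller-runs on rejection.
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    final Callable<Void> slowSync = new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        Thread.sleep(10000);   // stands in for a writer.sync() stuck in the pipeline
        return null;
      }
    };

    final Thread waiter = new Thread(new Runnable() {
      @Override
      public void run() {
        Future<Void> f = pool.submit(slowSync);
        try {
          f.get();             // the "SyncRunner" blocks here, but stays interruptible
        } catch (InterruptedException ie) {
          System.out.println("waiter unblocked while the sync is still in flight");
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    });
    waiter.start();

    Thread.sleep(500);
    waiter.interrupt();        // what the watcher/close path does to the SyncRunners
    waiter.join();
    pool.shutdownNow();
  }
}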
+ if (getNumRolledLogFiles() > 0) { + cleanOldLogs(); + regionsToFlush = findRegionsToForceFlush(); + } + return regionsToFlush; + } finally { + rollWriterLock.unlock(); + } + } + + /** + * Internal method to roll a writer (rolling means closing the passed writer, and creating + * a new writer). + * @param force roll even if is no entries are present. + * @param writer the writer which is to be roll. + * @param replaceCurrentWriter whether to replace the "this.writer" ({@link FSHLog#writer} with + * the newly rolled writer. + * @return The new (rolled) writer + * @throws IOException + */ + private Writer rollWriterInternal(boolean force, Writer writer, boolean replaceCurrentWriter) + throws IOException { rollWriterLock.lock(); try { // Return if nothing to flush. - if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null; - byte [][] regionsToFlush = null; + if (!force && (writer != null && this.numEntries.get() <= 0)) return null; if (this.closed) { LOG.debug("HLog closed. Skipping rolling of writer"); - return regionsToFlush; + return null; } if (!closeBarrier.beginOp()) { LOG.debug("HLog closing. Skipping rolling of writer"); - return regionsToFlush; + return null; } + FSHLog.Writer nextWriter = null; try { - Path oldPath = getOldPath(); - Path newPath = getNewPath(); + Path oldPath = null; + Path newPath = null; + if (writer == null) { // oldPath would be null + newPath = getNewPath(); + } else { + oldPath = computeFilename(writer.getCurrentFileNum().get()); + newPath = getNewPathForWriter(writer); + } + LOG.debug("OLD path of the log: " + oldPath); + LOG.debug("NEW path of the log: " + newPath); // Any exception from here on is catastrophic, non-recoverable so we currently abort. - FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); + nextWriter = this.createWriterInstance(fs, newPath, conf); + // set the filenum of the writer. + nextWriter.getCurrentFileNum().set(getFileNumFromFileName(newPath)); FSDataOutputStream nextHdfsOut = null; if (nextWriter instanceof ProtobufLogWriter) { - nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream(); + nextHdfsOut = ((ProtobufLogWriter) nextWriter).getStream(); // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline. // If this fails, we just keep going.... it is an optimization, not the end of the world. preemptiveSync((ProtobufLogWriter)nextWriter); } tellListenersAboutPreLogRoll(oldPath, newPath); // NewPath could be equal to oldPath if replaceWriter fails. - newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut); + if (replaceCurrentWriter) replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut); tellListenersAboutPostLogRoll(oldPath, newPath); - // Can we delete any of the old log files? - if (getNumRolledLogFiles() > 0) { - cleanOldLogs(); - regionsToFlush = findRegionsToForceFlush(); - } } finally { closeBarrier.endOp(); } - return regionsToFlush; + return nextWriter; } finally { rollWriterLock.unlock(); } @@ -945,6 +1052,12 @@ class FSHLog implements HLog, Syncable { } } + protected Path computeFilename() { + long fileNum = 0; + if (this.writer != null) fileNum = this.writer.getCurrentFileNum().get(); + return computeFilename(fileNum); + } + /** * This is a convenience method that computes a new filename with a given * file-number. 
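Since each writer now carries its own file number, both rolling and switching depend on the WAL name round-tripping: computeFilename() produces "<prefix>.<filenum>" and getFileNumFromFileName() reads the number back. A toy, standalone restatement of that naming plus the bump-on-collision rule used by getNewPath()/getNewPathForWriter(); the HashSet stands in for fs.exists() and the prefix and class name are made up:

import java.util.HashSet;
import java.util.Set;

public class WalFileNameSketch {
  // Same shape as computeFilename(filenum): the log name is "<prefix>.<filenum>".
  static String computeName(String prefix, long filenum) {
    if (filenum < 0) throw new RuntimeException("hlog file number can't be < 0");
    return prefix + "." + filenum;
  }

  // The idea behind getFileNumFromFileName(): read the trailing number back out.
  static long fileNumFromName(String name) {
    return Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
  }

  public static void main(String[] args) {
    Set<String> existing = new HashSet<String>();    // stands in for fs.exists() checks
    String prefix = "host%2C60020%2C1381000000000";  // illustrative prefix only
    long filenum = System.currentTimeMillis();
    String name = computeName(prefix, filenum);
    // Same collision rule as getNewPath()/getNewPathForWriter(): bump until unused.
    while (existing.contains(name)) {
      name = computeName(prefix, ++filenum);
    }
    existing.add(name);
    System.out.println(name + " -> filenum " + fileNumFromName(name));
  }
}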
@@ -952,17 +1065,7 @@ class FSHLog implements HLog, Syncable { * @return Path */ protected Path computeFilename(final long filenum) { - this.filenum.set(filenum); - return computeFilename(); - } - - /** - * This is a convenience method that computes a new filename with a given - * using the current HLog file-number - * @return Path - */ - protected Path computeFilename() { - if (this.filenum.get() < 0) { + if (filenum < 0) { throw new RuntimeException("hlog file number can't be < 0"); } String child = logFilePrefix + "." + filenum; @@ -1041,8 +1144,8 @@ class FSHLog implements HLog, Syncable { // Shutdown the disruptor. Will stop after all entries have been processed. Make sure we have // stopped incoming appends before calling this else it will not shutdown. We are // conservative below waiting a long time and if not elapsed, then halting. + long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); if (this.disruptor != null) { - long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000); try { this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { @@ -1061,14 +1164,33 @@ class FSHLog implements HLog, Syncable { i.logCloseRequested(); } } - this.closed = true; + this.closing = true; + if (this.syncPool != null) { + this.syncPool.shutdown(); + try { + this.syncPool.awaitTermination(timeoutms, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // consider legit interrupt + Thread.currentThread().interrupt(); + } + } + // interrupt the SyncRunners + if (ringBufferEventHandler != null) { + for (SyncRunner s : ringBufferEventHandler.syncRunners) { + s.interrupt(); + } + } + if (LOG.isDebugEnabled()) { LOG.debug("Closing WAL writer in " + this.fullPathLogDir.toString()); } + if (this.writer1 != null) this.writer1.close(); + if (this.writer2 != null) this.writer2.close(); if (this.writer != null) { - this.writer.close(); + this.writer.close(); // a noop in case switching is enabled, its okay. this.writer = null; } + this.closed = true; } /** @@ -1164,6 +1286,37 @@ class FSHLog implements HLog, Syncable { } /** + * @return whether it is OK to wait for switching + */ + private boolean shouldWaitForSwitching() { + return walSwitchEnabled && !closing && switching; + } + + /** + * A Callable for making the Writer.sync() call to the under lying filesystem. + * Each SyncRunner thread holds a SyncCallable object and submit it to the + * {@link FSHLog#syncPool}. + */ + private class SyncCallable implements Callable { + Writer writer; + + private void reset() { + this.writer = null; + } + + private void setWriter(Writer writer) { + this.writer = writer; + } + + public Void call() throws IOException { + if (writer != null) writer.sync(); + return null; + } + } + + + + /** * Thread to runs the hdfs sync call. This call takes a while to complete. This is the longest * pole adding edits to the WAL and this must complete to be sure all edits persisted. We run * multiple threads sync'ng rather than one that just syncs in series so we have better @@ -1181,7 +1334,18 @@ class FSHLog implements HLog, Syncable { private class SyncRunner extends HasThread { private volatile long sequence; private final BlockingQueue syncFutures; - + /** + * List of Append ops it is syncing in this batch. + */ + private List appendOps; + /** + * List of SyncFuture(s) in this batch. RegionServer handlers are blocked on it. 
+ */ + private SyncFuture [] syncFuturesToSync; + /** + * The SyncCallable used for making the sync() call to the underlying file system. + */ + private SyncCallable syncCallable; /** * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, @@ -1198,11 +1362,15 @@ class FSHLog implements HLog, Syncable { // http://www.javacodegeeks.com/2010/09/java-best-practices-queue-battle-and.html // Could use other blockingqueues here or concurrent queues. this.syncFutures = new LinkedBlockingQueue(maxHandlersCount); + this.syncCallable = new SyncCallable(); } - void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) { + void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount, + List appendOps) { // Set sequence first because the add to the queue will wake the thread if sleeping. this.sequence = sequence; + this.appendOps = appendOps; + this.syncFuturesToSync = syncFutures; for (int i = 0; i < syncFutureCount; i++) this.syncFutures.add(syncFutures[i]); } @@ -1226,7 +1394,7 @@ class FSHLog implements HLog, Syncable { * @param t May be non-null if we are processing SyncFutures because an exception was thrown. * @return Count of SyncFutures we let go. */ - private int releaseSyncFutures(final long currentSequence, final Throwable t) { + private int releaseSyncFutures(final long currentSequence, final Throwable t) { int syncCount = 0; for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) { if (syncFuture.getRingBufferSequence() > currentSequence) break; @@ -1239,30 +1407,13 @@ class FSHLog implements HLog, Syncable { return syncCount; } - /** - * @param sequence The sequence we ran the filesystem sync against. - * @return Current highest synced sequence. - */ - private long updateHighestSyncedSequence(long sequence) { - long currentHighestSyncedSequence; - // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. - do { - currentHighestSyncedSequence = highestSyncedSequence.get(); - if (currentHighestSyncedSequence >= sequence) { - // Set the sync number to current highwater mark; might be able to let go more - // queued sync futures - sequence = currentHighestSyncedSequence; - break; - } - } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); - return sequence; - } - public void run() { long currentSequence; - while (!isInterrupted()) { + boolean gotExecption = false; + while (!closing) { int syncCount = 0; - SyncFuture takeSyncFuture; + SyncFuture takeSyncFuture = null; + Throwable t = null; try { while (true) { // We have to process what we 'take' from the queue @@ -1270,49 +1421,354 @@ class FSHLog implements HLog, Syncable { currentSequence = this.sequence; long syncFutureSequence = takeSyncFuture.getRingBufferSequence(); if (syncFutureSequence > currentSequence) { - throw new IllegalStateException("currentSequence=" + syncFutureSequence + - ", syncFutureSequence=" + syncFutureSequence); + throw new IllegalStateException("currentSequence=" + syncFutureSequence + + ", syncFutureSequence=" + syncFutureSequence); } // See if we can process any syncfutures BEFORE we go sync. long currentHighestSyncedSequence = highestSyncedSequence.get(); if (currentSequence < currentHighestSyncedSequence) { syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); - // Done with the 'take'. Go around again and do a new 'take'. + // Done with the 'take'. Go around again and do a new 'take'. 
continue; } break; } - // I got something. Lets run. Save off current sequence number in case it changes + // I got something. Lets run. Save off current sequence number in case it changes // while we run. long start = System.nanoTime(); - Throwable t = null; try { - writer.sync(); + // hook for monitoring the sync call. Add 'this' to the monitoring map, with + // current ms time. + syncOps.put(getThread(), EnvironmentEdgeManager.currentTimeMillis()); + this.syncCallable.setWriter(writer); + Future f = syncPool.submit(this.syncCallable); + WALFaultInjector.get().sync(); + f.get(); // wait till the sync op is done. + this.syncCallable.reset(); + // done syncing, free up Append ops. + this.appendOps = null; + this.syncFuturesToSync = null; currentSequence = updateHighestSyncedSequence(currentSequence); - } catch (IOException e) { - LOG.error("Error syncing, request close of hlog ", e); - t = e; - } catch (Exception e) { - LOG.warn("UNEXPECTED", e); - t = e; + } catch (InterruptedException ie) { + LOG.info("Interrupted while syncing, " + " switching=" + switching + ", closing=" + + closing + ", takeSyncFuture=" + takeSyncFuture.getRingBufferSequence()); + gotExecption = true; + } catch (ExecutionException ee) { + // this could be thrown due to any error underneath + // check whether we want to make a switch + t = ee; + gotExecption = true; } finally { - // First release what we 'took' from the queue. + // Remove the thread from the monitoring structure + syncOps.remove(getThread()); + // check if we are suppose to wait for switching? + if (gotExecption && shouldWaitForSwitching()) { + try { + t = null; // reset it as we are switching + gotExecption = false; + waitForSwitching(); + } catch (IOException ioe) { + t = ioe; // this is the case when rolling the old writer fails, pass it above + } + } + // Release what we 'took' from the queue. syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); // Can we release other syncs? syncCount += releaseSyncFutures(currentSequence, t); - if (t != null) { - requestLogRoll(); - } else checkLogRoll(); + postSync(System.nanoTime() - start, syncCount); } - postSync(System.nanoTime() - start, syncCount); } catch (InterruptedException e) { - // Presume legit interrupt. + // it means the blocking queue was empty, and we were interrupted while on take(). + LOG.info("Interrupted while waiting to get sync ops, " + " switching=" + switching + + ", closing=" + closing); + gotExecption = true; + } catch (Throwable th) { + LOG.warn("UNEXPECTED, continuing", th); + t = th; + } finally { + if (gotExecption && shouldWaitForSwitching()) { + try { + gotExecption = false; + waitForSwitching(); + } catch (IOException ioe) { + LOG.warn("IOE while waiting for switch, continuing", ioe); + t = ioe; // this is the case when rolling the old writer fails, pass it above + } + } + if (t != null) { + requestLogRoll(); + } else checkLogRoll(); + } + } + } + + /** + * wait till the SyncRunner is done with the current WAL switching iteration. 
+ * @throws IOException + */ + private void waitForSwitching() throws IOException { + try { + attainSafePointForSwitching(); + waitForSwitchingToComplete(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for sync to complete...", e); + // legit interrupt + Thread.currentThread().interrupt(); + } + } + + + private void attainSafePointForSwitching() throws InterruptedException { + if (syncLatencyWatcher == null) return; + syncLatencyWatcher.switchLatch.safePointAttained(); + } + + private void waitForSwitchingToComplete() throws InterruptedException { + if (syncLatencyWatcher == null) return; + syncLatencyWatcher.switchLatch.waitForSafePoint(); + + } + + private List getAppendOps() { + return appendOps; + } + + private SyncFuture[] getSyncFuturesToSync() { + return syncFuturesToSync; + } + } + + Map syncOps = new ConcurrentHashMap(); + + /** + * @param sequence The sequence we ran the filesystem sync against. + * @return Current highest synced sequence. + */ + private long updateHighestSyncedSequence(long sequence) { + long currentHighestSyncedSequence; + // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. + do { + currentHighestSyncedSequence = highestSyncedSequence.get(); + if (currentHighestSyncedSequence >= sequence) { + // Set the sync number to current highwater mark; might be able to let go more + // queued sync futures + sequence = currentHighestSyncedSequence; + break; + } + } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); + return sequence; + } + + /** + * The sync latency watcher thread. It monitors the current sync operation, and consult with the + * the Switching Policy ({@link WALSwitchPolicy}. If the policy suggests to do a switch, + * it kicks off the switching process. + */ + private class SyncLatencyWatcher extends HasThread { + // The latch to do thread co-ordination b/w SyncRunners and us. Before doing the real switching + // SyncRunners need to be unblocked off the previous sync call (which kick started the switch + // process). This is achieved by interrupting them, and ensuring they are waiting for the + // switch to complete. + private SafePointZigZagLatch switchLatch; + // Switch policy, which tells whether it is a right time to do the switch or not. + private WALSwitchPolicy switchPolicy; + + public SyncLatencyWatcher() { + super("SyncLatencyWatcher"); + this.switchPolicy = new AggressiveWALSwitchPolicy(conf);// TODO Make it configurable + } + + @Override + public void run() { + while (!isInterrupted()) { + try { + if (!switching && eligibleForSwitching()) { + try { + switchWriter(); + } catch (IOException e) { + LOG.error("Got IOE while switching.... ", e); + if (e instanceof FailedLogCloseException) { + // This is a fatal error in normal scenario, i.e., our log rolling fail. + // Here, we kill this thread. + throw new RuntimeException(e); + } + } + } + Thread.sleep(100);// start monitoring after every 100ms. + } catch (InterruptedException ie) { + // legal interrupt Thread.currentThread().interrupt(); - } catch (Throwable t) { - LOG.warn("UNEXPECTED, continuing", t); } } } + + /** + * Consult the switching policy, and returns whether it is time for a switch. + * @return true in case we want to switch Wrtier, false otherwise. 
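updateHighestSyncedSequence(), hoisted out of SyncRunner above so that switchWriter() can call it as well, is the standard compare-and-set "only move the mark forward" idiom. A standalone restatement with a toy two-thread driver (class and method names are illustrative):

import java.util.concurrent.atomic.AtomicLong;

public class MonotonicHighWaterMark {
  private final AtomicLong highest = new AtomicLong(0);

  /** Same shape as updateHighestSyncedSequence(): never let the mark go backwards. */
  long advance(long candidate) {
    long current;
    do {
      current = highest.get();
      if (current >= candidate) {
        return current;          // someone else already published a higher sync point
      }
    } while (!highest.compareAndSet(current, candidate));
    return candidate;
  }

  public static void main(String[] args) throws InterruptedException {
    final MonotonicHighWaterMark mark = new MonotonicHighWaterMark();
    Thread evens = new Thread(new Runnable() {
      public void run() { for (long i = 0; i < 100000; i += 2) mark.advance(i); }
    });
    Thread odds = new Thread(new Runnable() {
      public void run() { for (long i = 1; i < 100000; i += 2) mark.advance(i); }
    });
    evens.start(); odds.start();
    evens.join(); odds.join();
    System.out.println("high water mark = " + mark.highest.get()); // always 99999
  }
}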
*/ + private boolean eligibleForSwitching() { + Long startTime = EnvironmentEdgeManager.currentTimeMillis(); + for (Map.Entry e : syncOps.entrySet()) { + if (this.switchPolicy.eligibleForSwitch(startTime - e.getValue())) { + LOG.debug("Switching: Sync op time=" + (startTime - e.getValue()) + "ms"); + return true; + } + } + return false; + } + + /** + * The WAL Switch Steps: + * NOTE Don't switch if there is an ongoing log roll. Most likely, this could be a redundant + * step. + *
    + *
  • Set {@link FSHLog#switching} to true. This blocks the + * {@link RingBufferEventHandler#onEvent(RingBufferTruck, long, boolean)} + *
  • Interrupt the {@link SyncRunner} threads to unblock them from their current sync call. + * Wait till they reach a safepoint. + *
  • Grab their Append lists (i.e., whatever they were trying to sync). Consolidate and + * sort them. + *
  • Get the max SyncFuture object, and note its sequenceId. We ought to unblock all + * handlers that are waiting for a sequence <= max_syncedSequenceId, after switching. + *
  • Take the "reserved" writer, and append-sync these "in-flight" edits + *
  • Set this.writer to this reserved writer + *
  • Tell SyncRunners that switch is done, and let them take new writes (complete the latch) + *
  • Set {@link FSHLog#switching} to false + *
  • Roll the old writer + *
+ *

+ * Error Handling: We are safe as long as we have not switched "this.writer". Once we are past + * that, ensure we roll the old writer. + * @throws IOException + * @throws InterruptedException + */ + private void switchWriter() throws IOException, InterruptedException { + if (rollWriterLock.tryLock()) { // don't switch if rolling in progress... + try { + // i) set switching to true + switching = true; + this.switchLatch = new SafePointZigZagLatch(syncRunnerHandlerCount, 1); + // block the RingBufferHandler. This is done by setting the FSHLog#switching true + // TODO Could we use a latch, same as used for SyncRunners (later)? + // ii) interrupt the SyncRunner, and make them wait till switch is done. + for (SyncRunner s : ringBufferEventHandler.syncRunners) { + s.interrupt(); + } + // and, wait for all sync runners to reach the safe point. + this.switchLatch.waitForSafePoint(); + Writer oldWriter = writer; + Map highestRegionSeqIdsForCurWAL = null; + + try { + // All SyncRunners are now waiting for the switching to complete. + // the syncers would wait on the switchLatch, till we are done. + // The Append lists are non-overlapping, and are sorted by their id. + List inFlightAppends = setInflightAppends(); + long maxSeqSyncId = getMaxSyncSeqId(); + + Writer reservedWriter = getReservedWriter(); + // Append all the inflight edits to the reserved writer. + for (RingBufferTruck r : inFlightAppends) { + reservedWriter.append(r.getFSWALEntryPayload()); + } + reservedWriter.sync(); + // update the completeSyncSequenceId. + updateHighestSyncedSequence(maxSeqSyncId); + // switch the writer + writer = reservedWriter; + hdfs_out = ((ProtobufLogWriter) writer).getStream(); + // get the region:sequenceId map. Use it book keeping the while rolling the current WAL + // (soon be old) file. We don't reset it as the same entries are used by new writer. + highestRegionSeqIdsForCurWAL = new HashMap(highestRegionSequenceIds); + } finally { + // the new writer is ready to take on new writes. Unblock the SyncRunners. This would + // also unblock handlers (SyncRunners would do this leg work when they compare + // their sequenceId with highestSyncedSequenceId. + switchLatch.releaseSafePoint(); + } + Path oldPath = getOldPath(); + // Rolling is done in a separate try-finally block. Failing while rolling is FATAL for + // this thread. + try { + oldWriter = rollWriterInternal(true, oldWriter, false); + if (oldPath != null) byWalRegionSequenceIds.put(oldPath, highestRegionSeqIdsForCurWAL); + // Set the appropriate writer1, writer2 instances + if (writer.getCurrentFileNum().get() == writer1.getCurrentFileNum().get()) + writer2 = oldWriter; + else writer1 = oldWriter; + } catch (IOException e) { + LOG.error("Failed close of WAL " + oldPath, e); + throw new FailedLogCloseException("Failed close of WAL " + oldPath, e); + } + } finally { + switching = false; + rollWriterLock.unlock(); + } + } else { + LOG.info("Couldn't do the switching rollLock was not available"); + } + } + + /** + * @return the max SyncSequenceId among all SyncFutures in this batch. + */ + private long getMaxSyncSeqId() { + long maxSeqSyncId = 0; + for (SyncRunner s : ringBufferEventHandler.syncRunners) { + // Get a non null SyncFuture from the array of SyncFutures. 
+ if (s.getSyncFuturesToSync() == null) continue; + for (int i = s.getSyncFuturesToSync().length - 1; i >= 0; i--) { + if (s.getSyncFuturesToSync()[i] != null + && maxSeqSyncId < s.getSyncFuturesToSync()[i].getRingBufferSequence()) { + maxSeqSyncId = s.getSyncFuturesToSync()[i].getRingBufferSequence(); + // LOG.warn("switch: Got the max sync future to sync: " + maxSeqSyncId); + } + } + } + return maxSeqSyncId; + } + /** + * @return the sorted list of all the Append ops (RingBufferTruck) in this batch + */ + private List setInflightAppends() { + List inFlightAppends = null; + int sizeOfAppendOps = 0; + Map> appendsMap = new TreeMap>(); + for (SyncRunner s : ringBufferEventHandler.syncRunners) { + if (s.getAppendOps() != null && s.getAppendOps().size() > 0) { + sizeOfAppendOps += s.getAppendOps().size(); + appendsMap.put(s.getAppendOps().get(0).getFSWALEntryPayload().getSequence(), + s.getAppendOps()); + } + } + // Combine all the appendOps + inFlightAppends = new ArrayList(sizeOfAppendOps); + for (Map.Entry> e : appendsMap.entrySet()) { + inFlightAppends.addAll(e.getValue()); + } + return inFlightAppends; + } + + /** + * The writers keep changing as FSHLog calls the rollWriter. The fileNumber of any writer is + * unique (there can't be two files with exact absolute path). We use this fileNumber for + * comparing between two writers. + * @return the "other" writer (i.e., which is not the current active writer) + */ + private Writer getReservedWriter() { + if (checkIfWritersHasSameFilenum(writer, writer1)) return writer2; + return writer1; + } + + /** + * @param w1 + * @param w2 + * @return true if the writers has the same file num (i.e., point to same file on hdfs). + */ + private boolean checkIfWritersHasSameFilenum(Writer w1, Writer w2) { + if (w1.getCurrentFileNum().get()== w2.getCurrentFileNum().get()) return true; + return false; + } } /** @@ -1387,6 +1843,7 @@ class FSHLog implements HLog, Syncable { long sequence = this.disruptor.getRingBuffer().next(); SyncFuture syncFuture = getSyncFuture(sequence); try { + LOG.debug("Creating a RBT with sequence " + sequence); RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); truck.loadPayload(syncFuture); } finally { @@ -1684,7 +2141,28 @@ class FSHLog implements HLog, Syncable { } return syncFuture; } - + + SafePointZigZagLatch() {} + + /** + * @param attainCount: Number of threads which would call attainSafePoint + * @param releaseCount: Number of threads which would call releaseSafePoint. + */ + SafePointZigZagLatch(int attainCount, int releaseCount) { + this.safePointAttainedLatch = new CountDownLatch(attainCount); + this.safePointReleasedLatch = new CountDownLatch(releaseCount); + } + + /** + * For Thread A to call it while it waits for Thread B to come to a safe place. + * @throws InterruptedException + */ + void waitForSafePoint() throws InterruptedException { + while (true) { + if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break; + } + } + /** * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} @@ -1752,7 +2230,12 @@ class FSHLog implements HLog, Syncable { * Which syncrunner to use next. */ private int syncRunnerIndex; - + /** + * used for passing the Append RBTs to the Syncer thread. The later uses it to re-append to + * a new Writer in case it is switching. 
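The SafePointZigZagLatch extensions just above (an attain-count/release-count constructor plus waitForSafePoint()) are what let switchWriter() and the SyncRunner threads meet: every runner signals that it has parked, the watcher waits for all of them, swaps the writer, and then releases them in one shot. A bare-bones, standalone rendering of that two-latch handshake using plain CountDownLatches (generic names, not the patch's classes):

import java.util.concurrent.CountDownLatch;

public class SafePointHandshakeSketch {
  public static void main(String[] args) throws InterruptedException {
    final int workers = 3;                                       // stands in for the SyncRunner count
    final CountDownLatch attained = new CountDownLatch(workers); // the "safePointAttainedLatch"
    final CountDownLatch released = new CountDownLatch(1);       // the "safePointReleasedLatch"

    for (int i = 0; i < workers; i++) {
      final int id = i;
      new Thread(new Runnable() {
        public void run() {
          try {
            attained.countDown();   // worker parks at the safe point...
            released.await();       // ...and waits for the coordinator to finish the switch
            System.out.println("worker " + id + " resumed");
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }).start();
    }

    attained.await();               // coordinator: every worker is parked
    System.out.println("safe point reached, swapping the writer...");
    released.countDown();           // coordinator: switch done, let everyone go
  }
}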
+ */ + private List entries = new ArrayList(); + RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) { this.syncFutures = new SyncFuture[maxHandlersCount]; this.syncRunners = new SyncRunner[syncRunnerCount]; @@ -1773,10 +2256,17 @@ class FSHLog implements HLog, Syncable { // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll // add appends to dfsclient as they come in. Batching appends doesn't give any significant // benefit on measurement. Handler sync calls we will batch up. + // collection to store in-flight RBTs // TODO: Trace only working for appends, not for syncs. TraceScope scope = truck.getSpanPayload() != null? Trace.continueSpan(truck.getSpanPayload()): null; + + // wait here till we are done switching. + //TODO: use latch (or wait-notify) later. + while (shouldWaitForSwitching()) { + Thread.sleep(1); + } try { if (truck.getSyncFuturePayload() != null) { this.syncFutures[this.syncFuturesCount++] = truck.getSyncFuturePayload(); @@ -1784,6 +2274,7 @@ class FSHLog implements HLog, Syncable { if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true; } else if (truck.getFSWALEntryPayload() != null) { try { + this.entries.add(truck); append(truck.getFSWALEntryPayload()); } catch (Exception e) { // If append fails, presume any pending syncs will fail too; let all waiting handlers @@ -1815,9 +2306,13 @@ class FSHLog implements HLog, Syncable { // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the // syncRunner. int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length; - this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); + this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount, + this.entries); attainSafePoint(sequence); this.syncFuturesCount = 0; + // TODO: Get the index early on, and set its arraylist of Appends directly (later). + // this would avoid re-creating the below list. + this.entries = new ArrayList(); } catch (Throwable t) { LOG.error("UNEXPECTED!!!", t); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index ce29783..646be97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -122,6 +122,13 @@ public interface HLog { */ // TODO: Why a trailer on the log? void setWALTrailer(WALTrailer walTrailer); + + /** + * A writer is instantiated on a per file basis. The filename is computed using current + * timestamp + * @return the timestamp associated with the current filename. 
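The onEvent() change above parks the ring buffer handler in a sleep(1) loop while a switch is in progress, and the patch's own TODO already suggests replacing the poll with a latch or wait/notify. One way that gate could look, shown purely as a sketch of the TODO (the class is illustrative, not part of the patch):

/** Sketch of a wait/notify gate for the producer side, instead of the sleep(1) poll. */
public class SwitchGate {
  private boolean switching = false;

  /** Called by the watcher just before it starts a switch. */
  public synchronized void close() {
    switching = true;
  }

  /** Called by the watcher once the new writer is in place. */
  public synchronized void open() {
    switching = false;
    notifyAll();               // wake any producer parked in awaitOpen()
  }

  /** Called by the ring buffer handler before appending; returns once no switch is running. */
  public synchronized void awaitOpen() throws InterruptedException {
    while (switching) {
      wait();
    }
  }
}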
+ */ + AtomicLong getCurrentFileNum(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 682b954..37bf6fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,6 +49,7 @@ public class ProtobufLogWriter extends WriterBase { // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger // than this size, it is written/read respectively, with a WARN message in the log. private int trailerWarnSize; + private AtomicLong fileNum = new AtomicLong(-1); // TODO compare it with FSHLog#filenum public ProtobufLogWriter() { super(); @@ -181,4 +183,10 @@ public class ProtobufLogWriter extends WriterBase { public void setWALTrailer(WALTrailer walTrailer) { this.trailer = walTrailer; } + + @Override + public AtomicLong getCurrentFileNum() { + return this.fileNum; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALFaultInjector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALFaultInjector.java new file mode 100644 index 0000000..3d40459 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALFaultInjector.java @@ -0,0 +1,42 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Private +@VisibleForTesting +public class WALFaultInjector { + + public static WALFaultInjector instance = new WALFaultInjector(); + + public static WALFaultInjector get() { + return instance; + } + + public void sync() {} + + public void sync(long sleepInterval) throws InterruptedException { + if (sleepInterval > 0) { + Thread.sleep(sleepInterval); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSwitchPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSwitchPolicy.java new file mode 100644 index 0000000..83909a5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSwitchPolicy.java @@ -0,0 +1,41 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Defines the switching strategy. Depending upon implementation, it takes the input parameters + * (such as last N sync time, last switched time, etc), and decides whether the WAL should switch + * or not. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface WALSwitchPolicy { + final static long DEFAULT_SWITCH_THRESHOLD = 2000; // 2 sec + + /** + * Based on last sync op duration, it decides whether WAL should do a switch to another writer. + * @param lastSyncTimeTaken + * @return true if WAL is eligible for switching. 
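TestHLog below exercises this hook by replacing WALFaultInjector.instance with a Mockito mock whose sync() sleeps past the switch threshold. A plain subclass works just as well for the same purpose; the class below is hypothetical, not part of the patch:

package org.apache.hadoop.hbase.regionserver.wal;

/** Hypothetical test helper that stalls the sync path to provoke a WAL switch. */
public class SlowSyncFaultInjector extends WALFaultInjector {
  private final long delayMs;

  public SlowSyncFaultInjector(long delayMs) {
    this.delayMs = delayMs;
  }

  @Override
  public void sync() {
    try {
      Thread.sleep(delayMs);   // simulates a stuck datanode pipeline
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

A test would save the old WALFaultInjector.instance, assign new SlowSyncFaultInjector(2 * WALSwitchPolicy.DEFAULT_SWITCH_THRESHOLD) before creating the HLog, and restore the saved instance in a finally block.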
+ */ + boolean eligibleForSwitch(long lastSyncTimeTaken); +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java index c6d8068..474dd7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java @@ -183,7 +183,6 @@ public class TestHLogRecordReader { Thread.sleep(1); // make sure 2nd log gets a later timestamp long secondTs = System.currentTimeMillis(); log.rollWriter(); - edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 206034e..de5be34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -219,4 +220,9 @@ public class SequenceFileLogWriter extends WriterBase { @Override public void setWALTrailer(WALTrailer walTrailer) { } + @Override + public AtomicLong getCurrentFileNum() { + // TODO Auto-generated method stub + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 57dc333..c45a0cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -60,6 +60,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** JUnit test case for HLog */ @Category(LargeTests.class) @@ -502,8 +505,8 @@ public class TestHLog { log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId); log.startCacheFlush(info.getEncodedNameAsBytes()); log.completeCacheFlush(info.getEncodedNameAsBytes()); - log.close(); Path filename = ((FSHLog) log).computeFilename(); + log.close(); log = null; // Now open a reader on the log and assert append worked. reader = HLogFactory.createReader(fs, filename, conf); @@ -560,8 +563,8 @@ public class TestHLog { log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId); log.startCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes()); - log.close(); Path filename = ((FSHLog) log).computeFilename(); + log.close(); log = null; // Now open a reader on the log and assert append worked. reader = HLogFactory.createReader(fs, filename, conf); @@ -1176,6 +1179,74 @@ public class TestHLog { } /** + * Methods to test the switch functionality. 
+ * @throws IOException + */ + @Test + public void testSwitchOccurred() throws Exception { + LOG.debug("testSwitchOccurred"); + Configuration conf1 = HBaseConfiguration.create(conf); + conf1.setBoolean("hbase.regionserver.wal.switch.enabled", true); + conf1.setInt("hbase.regionserver.hlog.syncer.count", 1); + // Create a mock for WALFaultInjector. + WALFaultInjector.instance = Mockito.mock(WALFaultInjector.class); + WALFaultInjector injector = WALFaultInjector.get(); + Mockito.doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + LOG.debug("Sleeping while syncing..."); + Thread.sleep(2*WALSwitchPolicy.DEFAULT_SWITCH_THRESHOLD); + return null; + } + }).when(injector).sync(); + + TableName table1 = TableName.valueOf("t1"); + TableName table2 = TableName.valueOf("t2"); + HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), conf1); + try { + assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); + HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); + HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); + // ensure that we don't split the regions. + hri1.setSplit(false); + hri2.setSplit(false); + // variables to mock region sequenceIds. + final AtomicLong sequenceId1 = new AtomicLong(1); + // start with the testing logic: insert a waledit, and roll writer + int curNumLogFiles = hlog.getNumLogFiles(); + + addEdits(hlog, hri1, table1, 1, sequenceId1); + // invoke sync before calling roll... + hlog.sync(); + addEdits(hlog, hri1, table1, 1, sequenceId1); + assertTrue("Old log count is same as current log count", + hlog.getNumLogFiles() > curNumLogFiles); + curNumLogFiles = hlog.getNumLogFiles(); + hlog.sync(); + // ensure empty sync also works + hlog.sync(); + assertTrue("Old log count is same as current log count", + hlog.getNumLogFiles() > curNumLogFiles); + LOG.warn("Before calling the roll writer.. switch"); + // test concurrent log roll. + hlog.rollWriter(); + // assert that the wal is rolled + // add edits in the second wal file, and roll writer. + addEdits(hlog, hri1, table1, 1, sequenceId1); + hlog.sync(); + hlog.rollWriter(); + + } finally { + if (hlog != null) hlog.close(); + Mockito.reset(injector); + } + + } + + /** * helper method to simulate region flush for a WAL. * @param hlog * @param regionEncodedName