diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java index 1c59f65..25c0f3f 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java @@ -57,6 +57,11 @@ public interface MetricsWALSource extends BaseSource { String SLOW_APPEND_COUNT_DESC = "Number of appends that were slow."; String SYNC_TIME = "syncTime"; String SYNC_TIME_DESC = "The time it took to sync the HLog to HDFS."; + String APPEND_COUNT_IN_WAL_SWITCH = "inflightAppendCountInWALSwitch"; + String APPEND_COUNT_IN_WAL_SWITCH_DESC = "Inflight Appends added to new WAL writer during" + + " Switching"; + String WAL_SWITCH_COUNT = "walSwitchCount"; + String WAL_SWITCH_COUNT_DESC = "Number of WAL switches"; /** * Add the append size. @@ -83,4 +88,15 @@ public interface MetricsWALSource extends BaseSource { */ void incrementSyncTime(long time); + /** + * Increment the number of inflight appends that got added while WAL switching + * @param inflightAppendsCount + */ + void incrementInflightAppendCountInWALSwitch(int inflightAppendsCount); + + /** + * Increment the WALSwitch count + */ + void incrementWALSwitchCount(); + } diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java index 8ebfe05..45cf5a5 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java @@ -37,6 +37,9 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo private final MetricHistogram syncTimeHisto; private final MutableCounterLong appendCount; private final MutableCounterLong slowAppendCount; + private final MutableCounterLong appendCountInWALSwitch; + private final MutableCounterLong walSwitchCount; + public MetricsWALSourceImpl() { this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); @@ -54,6 +57,10 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo appendCount = this.getMetricsRegistry().newCounter(APPEND_COUNT, APPEND_COUNT_DESC, 0l); slowAppendCount = this.getMetricsRegistry().newCounter(SLOW_APPEND_COUNT, SLOW_APPEND_COUNT_DESC, 0l); syncTimeHisto = this.getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC); + appendCountInWALSwitch = this.getMetricsRegistry().newCounter(APPEND_COUNT_IN_WAL_SWITCH, + APPEND_COUNT_IN_WAL_SWITCH_DESC, 0l); + walSwitchCount = this.getMetricsRegistry().newCounter(WAL_SWITCH_COUNT, WAL_SWITCH_COUNT_DESC, + 0l); } @Override @@ -80,4 +87,14 @@ public class MetricsWALSourceImpl extends BaseSourceImpl implements MetricsWALSo public void incrementSyncTime(long time) { syncTimeHisto.add(time); } + + @Override + public void incrementInflightAppendCountInWALSwitch(int inflightAppendsCount) { + appendCountInWALSwitch.incr(inflightAppendsCount); + } + + @Override + public void incrementWALSwitchCount() { + walSwitchCount.incr(); + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AggressiveWALSwitchPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AggressiveWALSwitchPolicy.java new file mode 100644 index 0000000..adac70f --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AggressiveWALSwitchPolicy.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +@InterfaceAudience.Private +class AggressiveWALSwitchPolicy implements WALSwitchPolicy { + + /** + * These are set in the init method. + */ + private Configuration conf; + + /** + * configurable WAL switch threshold. + */ + private long walSwitchThreshold; + + public AggressiveWALSwitchPolicy() { + } + + @Override + public void init(Configuration conf) { + this.conf = conf; + this.walSwitchThreshold = this.conf.getLong("hbase.regionserver.wal.switch.threshold", + AGGRESSIVE_DEFAULT_WAL_SWITCH_THRESHOLD); + + } + + @Override + public long getWalSwitchThreshold() { + return walSwitchThreshold; + } + + @Override + public boolean makeAWALSwitch(long lastSyncTimeTaken) { + return (lastSyncTimeTaken > walSwitchThreshold); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 6525dc2..d44f8a4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -31,7 +31,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -209,6 +211,12 @@ class FSHLog implements HLog, Syncable { */ volatile Writer writer; + /** + * A reserved writer to be used when the current active writer straggles and WAL Switch + * is enabled + */ + private volatile Writer reservedWriter; + /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ private final DrainBarrier closeBarrier = new DrainBarrier(); @@ -332,6 +340,19 @@ class FSHLog implements HLog, Syncable { new ConcurrentSkipListMap>(LOG_NAME_COMPARATOR); /** + * contains the "in-flight" sync ops, i.e., SyncFuture(s) which are being synced by the current + * writer. + */ + Map inflightSyncOps = null; + + private final boolean walSwitchingEnabled; + + /** + * WALSwitcher instance to provide wal switching functionality. + */ + private final WALSwitcher walSwitcher; + + /** * Exception handler to pass the disruptor ringbuffer. Same as native implemenation only it * logs using our logger instead of java native logger. */ @@ -490,6 +511,10 @@ class FSHLog implements HLog, Syncable { throw new IOException("Unable to mkdir " + this.fullPathOldLogDir); } } + walSwitchingEnabled = conf.getBoolean("hbase.regionserver.wal.switch.enabled", false); + LOG.debug("isWALSwitchingEnabled=" + walSwitchingEnabled); + // rollWriter sets this.hdfs_out if it can. + if (walSwitchingEnabled) reservedWriter = rollWriterInternal(true, null, false); // rollWriter sets this.hdfs_out if it can. rollWriter(); @@ -528,6 +553,21 @@ class FSHLog implements HLog, Syncable { this.syncFuturesByHandler = new ConcurrentHashMap(maxHandlersCount); // Starting up threads in constructor is a no no; Interface should have an init call. this.disruptor.start(); + walSwitcher = initWALSwitchAttrsAndGetWALSwitcher(); + } + + /** + * Inits the required attributes required by WALSwitching feature, and returns the WALSwitcher + * reference. + * @return if WALSwitching is enabled, it returns a reference to the WALSwitcher object, null + * otherwise. + */ + private WALSwitcher initWALSwitchAttrsAndGetWALSwitcher() { + if (!walSwitchingEnabled) return null; + inflightSyncOps = new ConcurrentHashMap(); + WALSwitcher ws = new WALSwitcher(); + ws.init(); + return ws; } /** @@ -651,43 +691,17 @@ class FSHLog implements HLog, Syncable { } @Override - public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { rollWriterLock.lock(); try { // Return if nothing to flush. - if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null; - byte [][] regionsToFlush = null; - if (this.closed) { - LOG.debug("HLog closed. Skipping rolling of writer"); - return regionsToFlush; - } - if (!closeBarrier.beginOp()) { - LOG.debug("HLog closing. Skipping rolling of writer"); - return regionsToFlush; - } - try { - Path oldPath = getOldPath(); - Path newPath = getNewPath(); - // Any exception from here on is catastrophic, non-recoverable so we currently abort. - FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); - FSDataOutputStream nextHdfsOut = null; - if (nextWriter instanceof ProtobufLogWriter) { - nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream(); - // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline. - // If this fails, we just keep going.... it is an optimization, not the end of the world. - preemptiveSync((ProtobufLogWriter)nextWriter); - } - tellListenersAboutPreLogRoll(oldPath, newPath); - // NewPath could be equal to oldPath if replaceWriter fails. - newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut); - tellListenersAboutPostLogRoll(oldPath, newPath); - // Can we delete any of the old log files? - if (getNumRolledLogFiles() > 0) { - cleanOldLogs(); - regionsToFlush = findRegionsToForceFlush(); - } - } finally { - closeBarrier.endOp(); + Writer newWriter = rollWriterInternal(force, this.writer, true); + if (newWriter == null) return null; // roll doesn't happen + byte[][] regionsToFlush = null; + // Can we delete any of the old log files? + if (getNumRolledLogFiles() > 0) { + cleanOldLogs(); + regionsToFlush = findRegionsToForceFlush(); } return regionsToFlush; } finally { @@ -696,6 +710,56 @@ class FSHLog implements HLog, Syncable { } /** + * Rolls the passed writer. Caller must hold rollWriterLock if they want to replace the current + * writer (i.e., if the parameter replaceCurrentWriter is true). + * @param force true means roll even if there is no entry in the current WAL. + * @param oldWriter the writer to roll + * @param replaceCurrentWriter whether to replace the current 'this.writer' with the new writer. + * @return new Writer. + * @throws IOException + */ + private Writer rollWriterInternal(boolean force, Writer oldWriter, boolean replaceCurrentWriter) + throws IOException { + if (!force && (oldWriter != null && this.numEntries.get() <= 0)) return null; + if (this.closed) { + LOG.debug("HLog closed. Skipping rolling of writer"); + return null; + } + if (!closeBarrier.beginOp()) { + LOG.debug("HLog closing. Skipping rolling of writer"); + return null; + } + FSHLog.Writer nextWriter = null; + try { + Path oldPath = null; + Path newPath = null; + if (oldWriter == null) { + newPath = getNewPath(); + } else { + oldPath = oldWriter.getCurrentWriterPath(); + newPath = getNewPath(); + } + // In case we are replacing current writer, any exception from here on is catastrophic, + // non-recoverable so we currently abort. + nextWriter = this.createWriterInstance(fs, newPath, conf); + FSDataOutputStream nextHdfsOut = null; + if (nextWriter instanceof ProtobufLogWriter) { + nextHdfsOut = ((ProtobufLogWriter) nextWriter).getStream(); + // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline. + // If this fails, we just keep going.... it is an optimization, not the end of the world. + preemptiveSync((ProtobufLogWriter) nextWriter); + } + tellListenersAboutPreLogRoll(oldPath, newPath); + // NewPath could be equal to oldPath if replaceWriter fails. + if (replaceCurrentWriter) replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut); + tellListenersAboutPostLogRoll(oldPath, newPath); + } finally { + closeBarrier.endOp(); + } + return nextWriter; + } + + /** * This method allows subclasses to inject different writers without having to * extend other methods like rollWriter(). * @@ -837,7 +901,7 @@ class FSHLog implements HLog, Syncable { /** * Cleans up current writer closing it and then puts in place the passed in * nextWriter - * + * Assumes caller holds rollWriterLock. * @param oldPath * @param newPath * @param nextWriter @@ -966,7 +1030,7 @@ class FSHLog implements HLog, Syncable { */ protected Path computeFilename() { if (this.filenum.get() < 0) { - throw new RuntimeException("hlog file number can't be < 0"); + throw new IllegalArgumentException("hlog file number can't be < 0"); } String child = logFilePrefix + "." + filenum; if (forMeta) { @@ -1058,6 +1122,12 @@ class FSHLog implements HLog, Syncable { // With disruptor down, this is safe to let go. if (this.appendExecutor != null) this.appendExecutor.shutdown(); + // If switching is enabled, close out its resources + if (walSwitchingEnabled) { + if (this.walSwitcher != null) this.walSwitcher.close(); + if (this.reservedWriter != null) this.reservedWriter.close(); + } + // Tell our listeners that the log is closing if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { @@ -1167,6 +1237,25 @@ class FSHLog implements HLog, Syncable { } /** + * @param sequence The sequence we ran the filesystem sync against. + * @return Current highest synced sequence. + */ + private long updateHighestSyncedSequence(long sequence) { + long currentHighestSyncedSequence; + // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. + do { + currentHighestSyncedSequence = this.highestSyncedSequence.get(); + if (currentHighestSyncedSequence >= sequence) { + // Set the sync number to current highwater mark; might be able to let go more + // queued sync futures + sequence = currentHighestSyncedSequence; + break; + } + } while (!this.highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); + return sequence; + } + + /** * Thread to runs the hdfs sync call. This call takes a while to complete. This is the longest * pole adding edits to the WAL and this must complete to be sure all edits persisted. We run * multiple threads sync'ng rather than one that just syncs in series so we have better @@ -1184,7 +1273,17 @@ class FSHLog implements HLog, Syncable { private class SyncRunner extends HasThread { private volatile long sequence; private final BlockingQueue syncFutures; - + /** + * List of Append ops it is syncing in this batch. + */ + private List appendsToSync; + + /** + * The current SyncFuture taken from the blocking queue which is being processed. + * We need this to make the WAL switch as it is an 'in-flight' SyncFuture. + */ + private SyncFuture currentSyncFutureInProgress; + /** * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, @@ -1203,9 +1302,15 @@ class FSHLog implements HLog, Syncable { this.syncFutures = new LinkedBlockingQueue(maxHandlersCount); } - void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) { + void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount, + List appendsToSync) { // Set sequence first because the add to the queue will wake the thread if sleeping. this.sequence = sequence; + if (walSwitchingEnabled) { + // first clear the list to free out of older Appends. + if (this.appendsToSync != null) this.appendsToSync.clear(); + this.appendsToSync = appendsToSync; + } for (int i = 0; i < syncFutureCount; i++) this.syncFutures.add(syncFutures[i]); } @@ -1213,27 +1318,41 @@ class FSHLog implements HLog, Syncable { * Release the passed syncFuture * @param syncFuture * @param currentSequence + * @param offeredSequence The offeredSequence of the SyncRunner, as set in the 'offer' method * @param t * @return Returns 1. */ private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence, - final Throwable t) { - if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException(); - // This function releases one sync future only. - return 1; - } - + final long offeredSequence, final Throwable t) { + if (!syncFuture.done(currentSequence, offeredSequence, t) && + !walSwitchingEnabled) { + LOG.warn("SyncFuture couldn't be done " + syncFuture + ", currentSequence=" + + currentSequence); + // set the SyncFuture as failed so handler goes away. + // If switching enabled, old SyncRunner and SwitchMonitor race to 'do' the SyncFuture. + // If the SyncFuture has been completed by SwitchMonitor, and old SyncRunner tries to do + // it later, it returns false, so, it is normal and safe to not fail the syncFuture. + syncFuture.doneAndMarkAsFailed(currentSequence, t); + throw new IllegalStateException("SyncFuture couldn't be done " + syncFuture + + ", currentSequence=" + currentSequence); + } + // This function releases one sync future only. + return 1; + } + /** * Release all SyncFutures whose sequence is <= currentSequence. * @param currentSequence + * @param offeredSequence The offeredSequence of the SyncRunner, as set in the 'offer' method * @param t May be non-null if we are processing SyncFutures because an exception was thrown. * @return Count of SyncFutures we let go. */ - private int releaseSyncFutures(final long currentSequence, final Throwable t) { + private int releaseSyncFutures(final long currentSequence, final long offeredSequence, + final Throwable t) { int syncCount = 0; for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) { if (syncFuture.getRingBufferSequence() > currentSequence) break; - releaseSyncFuture(syncFuture, currentSequence, t); + releaseSyncFuture(syncFuture, currentSequence, offeredSequence, t); if (!this.syncFutures.remove(syncFuture)) { throw new IllegalStateException(syncFuture.toString()); } @@ -1242,45 +1361,25 @@ class FSHLog implements HLog, Syncable { return syncCount; } - /** - * @param sequence The sequence we ran the filesystem sync against. - * @return Current highest synced sequence. - */ - private long updateHighestSyncedSequence(long sequence) { - long currentHighestSyncedSequence; - // Set the highestSyncedSequence IFF our current sequence id is the 'highest'. - do { - currentHighestSyncedSequence = highestSyncedSequence.get(); - if (currentHighestSyncedSequence >= sequence) { - // Set the sync number to current highwater mark; might be able to let go more - // queued sync futures - sequence = currentHighestSyncedSequence; - break; - } - } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence)); - return sequence; - } - public void run() { long currentSequence; while (!isInterrupted()) { int syncCount = 0; - SyncFuture takeSyncFuture; try { while (true) { // We have to process what we 'take' from the queue - takeSyncFuture = this.syncFutures.take(); + currentSyncFutureInProgress = this.syncFutures.take(); currentSequence = this.sequence; - long syncFutureSequence = takeSyncFuture.getRingBufferSequence(); + long syncFutureSequence = currentSyncFutureInProgress.getRingBufferSequence(); if (syncFutureSequence > currentSequence) { - throw new IllegalStateException("currentSequence=" + syncFutureSequence + + throw new IllegalStateException("currentSequence=" + currentSequence + ", syncFutureSequence=" + syncFutureSequence); } // See if we can process any syncfutures BEFORE we go sync. long currentHighestSyncedSequence = highestSyncedSequence.get(); if (currentSequence < currentHighestSyncedSequence) { - syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null); - // Done with the 'take'. Go around again and do a new 'take'. + syncCount += releaseSyncFuture(currentSyncFutureInProgress, + currentHighestSyncedSequence, sequence, null); continue; } break; @@ -1288,10 +1387,16 @@ class FSHLog implements HLog, Syncable { // I got something. Lets run. Save off current sequence number in case it changes // while we run. long start = System.nanoTime(); + if (inflightSyncOps != null) + inflightSyncOps.put(getThread(), EnvironmentEdgeManager.currentTimeMillis()); Throwable t = null; try { writer.sync(); currentSequence = updateHighestSyncedSequence(currentSequence); + } catch (InterruptedIOException e) { + LOG.error("Error syncing, request close of hlog ", e); + Thread.currentThread().interrupt(); + t = e; } catch (IOException e) { LOG.error("Error syncing, request close of hlog ", e); t = e; @@ -1299,10 +1404,13 @@ class FSHLog implements HLog, Syncable { LOG.warn("UNEXPECTED", e); t = e; } finally { - // First release what we 'took' from the queue. - syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t); + // First remove the entry from the monitoring map + if (inflightSyncOps != null) inflightSyncOps.remove(getThread()); + // release what we 'took' from the queue. + syncCount += releaseSyncFuture(currentSyncFutureInProgress, currentSequence, + this.sequence, t); // Can we release other syncs? - syncCount += releaseSyncFutures(currentSequence, t); + syncCount += releaseSyncFutures(currentSequence, this.sequence, t); if (t != null) { requestLogRoll(); } else checkLogRoll(); @@ -1666,7 +1774,12 @@ class FSHLog implements HLog, Syncable { * Latch to wait on. Will be released when we can proceed. */ private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); - + /** + * A marker value Thread A can share to Thread B to inform about its status about the reached + * safepoint. + */ + private long safePointMarker; + /** * For Thread A to call when it is ready to wait on the 'safe point' to be attained. * Thread A will be held in here until Thread B calls {@link #safePointAttained()} @@ -1700,6 +1813,26 @@ class FSHLog implements HLog, Syncable { } /** + * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals + * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} + * is called by Thread A. Thread B sets the marker point so as it can be read by Thread A to + * determine the safepoint state, if needed. + * @throws InterruptedException + */ + void safePointAttained(long safePointMarker) throws InterruptedException { + this.safePointMarker = safePointMarker; + this.safePointAttainedLatch.countDown(); + this.safePointReleasedLatch.await(); + } + + /** + * @return the safepoint marker state. + */ + private long getSafePointMarker() { + return safePointMarker; + } + + /** * Called by Thread A when it is done with the work it needs to do while Thread B is * halted. This will release the Thread B held in a call to {@link #safePointAttained()} */ @@ -1746,6 +1879,10 @@ class FSHLog implements HLog, Syncable { private volatile int syncFuturesCount = 0; private volatile SafePointZigZagLatch zigzagLatch; /** + * zigzag latch to co-ordinate with the WAL Switching. + */ + private volatile SafePointZigZagLatch walSwitchLatch; + /** * Object to block on while waiting on safe point. */ private final Object safePointWaiter = new Object(); @@ -1755,6 +1892,11 @@ class FSHLog implements HLog, Syncable { * Which syncrunner to use next. */ private int syncRunnerIndex; + /** + * These entries are passed to the SyncRunner when it does the sync. These are called + * 'in-flight' Appends as the SyncRunner is trying to 'sync' these Appends. + */ + private List entries = new ArrayList(); RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) { this.syncFutures = new SyncFuture[maxHandlersCount]; @@ -1765,7 +1907,8 @@ class FSHLog implements HLog, Syncable { } private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) { - for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e); + for (int i = 0; i < this.syncFuturesCount; i++) + this.syncFutures[i].done(sequence, sequence, e); this.syncFuturesCount = 0; } @@ -1787,7 +1930,9 @@ class FSHLog implements HLog, Syncable { if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true; } else if (truck.hasFSWALEntryPayload()) { try { - append(truck.unloadFSWALEntryPayload()); + FSWALEntry e = truck.unloadFSWALEntryPayload(); + this.entries.add(e); // saving a local reference to re-use for WAL switch + append(e); } catch (Exception e) { // If append fails, presume any pending syncs will fail too; let all waiting handlers // know of the exception. @@ -1814,13 +1959,14 @@ class FSHLog implements HLog, Syncable { if (LOG.isTraceEnabled()) { LOG.trace("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount); } - + checkAndAttainSafePointForSwitching(sequence); // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the // syncRunner. int index = Math.abs(this.syncRunnerIndex++) % this.syncRunners.length; - this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); + this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount, + this.entries); attainSafePoint(sequence); - this.syncFuturesCount = 0; + resetSFCountAndAppendList(); } catch (Throwable t) { LOG.error("UNEXPECTED!!!", t); } finally { @@ -1830,6 +1976,28 @@ class FSHLog implements HLog, Syncable { } } + private void resetSFCountAndAppendList() { + this.syncFuturesCount = 0; + this.entries = new ArrayList(); + } + + private SafePointZigZagLatch attainSafePointForWALSwitching() { + this.walSwitchLatch = new SafePointZigZagLatch(); + return this.walSwitchLatch; + } + + private void checkAndAttainSafePointForSwitching(long sequence) { + if (this.walSwitchLatch == null || !this.walSwitchLatch.isCocked()) return; + // blocks the RingBufferEventHandler till and tell the SwitchWriter that it can proceed with + // switching + try { + this.walSwitchLatch.safePointAttained(sequence); + } catch (InterruptedException e) { + LOG.warn("Interrupted ", e); + Thread.currentThread().interrupt(); + } + } + SafePointZigZagLatch attainSafePoint() { this.zigzagLatch = new SafePointZigZagLatch(); return this.zigzagLatch; @@ -1922,10 +2090,388 @@ class FSHLog implements HLog, Syncable { } } + /** + * This inner class provides WAL Switching functionality. It switches the current WAL writer to a + * reserved WAL writer in case the former becomes slow for any reason. The decision whether to + * make switch is done after consulting {@link WALSwitchPolicy}. + * + *

+ * At the core is SyncOpsMonitor (SM), which is daemon thread monitoring all the 'in-flight' HDFS + * sync() calls. A SyncRunner registers itself before calling the sync(), and unregisters when + * done. + * + *

+ * If any sync op takes more time than {@link WALSwitchPolicy#getWalSwitchThreshold()}, it + * consults with the {@link WALSwitchPolicy} whether it is a time to do a switch or not. + * + *

+ * There are some invariants to be maintained while doing a WAL switch: + *

    + *
  • There must not be any OUT-OF-ORDER WAL edits in any WAL file. + *
  • A Switch involves taking all the in-flight WALEdits and append-sync them to a new WAL + * writer. It must use the same WALEdits objects while maintaining their order. + *
+ * + *

+ * Some ASCII art to show how SyncRunner (SR), and SyncMonitor (SM) are interacting + * when switching is enabled. + * + *(SyncRunner) (SyncMonitor) + * | Map | + * | |-----------------| |(continuously + * X-- Registers Sync Call --------->| SR : Start time |<--------| watches the map) + * | |-----------------| |<---| + * | |<---| + * | | | | + * X Unregisters when done---------->| | | + * \/ | | \/ + * + *

+ * The below list is the major events occured while doing a WAL switch: + *

    + *
  • Block the RingBuffer at a safepoint, sequence id X. + *
  • SM takes the list of All Appends, and Sync ops, and does an append-sync. + *
  • It releases those SFs which have sequence Id <= X. Those SFs with sequence Id > X + * would be taken care by when RingBuffer is unblocked, and the SF goes to a SR. The reason it + * has higher sequence Id than X is by the time SyncMonitor processed this SF, the old SR has + * already synced, and released it, and the freed handler has again put it in the RingBuffer, + * with a higher sequence Id. + *
  • Switch the Writers, and re-start the SRs, and then unblock the RingBuffer. + *
+ * + *

+ * The above model adds minimal overhead for the 'regular' case. Measurement shows there is + * almost no extra context switching overhead when there is no switch occurred and the + * switching threshold is 1 sec. + * + *

+ * It adds one extra thread (SyncMonitor). + */ + private class WALSwitcher { + + /** + * Monitor thread that monitors the in-flight sync ops. + */ + private SyncOpsMonitor monitor; + + /** + * configured switch policy + */ + private WALSwitchPolicy switchPolicy; + + /** + * Is there is a WAL switch in-progress. + */ + private volatile boolean switching = false; + + /** + * Number of switches incurred so far. Used for naming SyncRunners. + */ + private int switchCount = 0; + + private void init() { + this.monitor = new SyncOpsMonitor("WALSwitcher.SyncOpsMonitor"); + Class switchPolicyClass = conf.getClass( + "hbase.regionserver.wal.switch.policy", AggressiveWALSwitchPolicy.class, + WALSwitchPolicy.class); + try { + switchPolicy = (WALSwitchPolicy) switchPolicyClass.newInstance(); + switchPolicy.init(conf); + } catch (Exception e) { + LOG.warn("Couldn't instantiate the configured WAL Switch policy.", e); + } + this.monitor.setDaemon(true); + this.monitor.start(); + } + + /** + * The SyncMonitor (SM) thread, which monitors the on-going sync ops. + */ + private class SyncOpsMonitor extends HasThread { + + public SyncOpsMonitor(String threadName) { + super(threadName); + } + + @Override + public void run() { + while (!this.isInterrupted()) { + try { + if (!switching && eligibleForWALSwitch()) { + // check with the Switch policy to see what it thinks about making a switch. + try { + LOG.info("Making a WAL switch"); + switchWALWriter(); + } catch (IOException ioe) { + // Rolling of the old WAL file fails. This is fatal for us. We play safe and kill + // ourself otherwise switching again might result in out-of-order edits. In regular + // flow, LogRoller would also detect this and abort the RegionServer. + throw new IllegalStateException("Could Not Roll WAL File," + + " Aborting the SyncOpsMonitor", ioe); + } + } + Thread.sleep(switchPolicy.getWalSwitchThreshold()); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Iterates over the in-progress Sync ops start time, and check it with configured + * {@link WALSwitchPolicy} whether it is time to make a WAL Switch. + * @return true if it should make a WALSwitch, false otherwise + */ + private boolean eligibleForWALSwitch() { + Long startTime = EnvironmentEdgeManager.currentTimeMillis(); + for (Map.Entry e : inflightSyncOps.entrySet()) { + // check the current sync-op time with the SwitchPolicy + // and decide whether to make a switch. + if (switchPolicy.makeAWALSwitch(startTime - e.getValue().longValue())) { + return true; + } + } + return false; + } + } + + /** + * The core method for switching the WAL writer when the current one starts straggling. + * + * NOTE: Don't switch if there is a ongoing log roll. Most likely, this could be a redundant + * step. + * + * Below is the list of steps involved when doing a WAL switch. + *

    + *
  • Try to take the rollWriter lock. If we couldn't, abort this Switch attempt. This is to + * avoid WAL switching because of a WAL roll. + *
  • Block the {@link RingBufferEventHandler#onEvent(RingBufferTruck, long, boolean)}. using + * a zigzag latch. And, note the sequence Id at which it is blocked. + *
  • Grab the 'in-flight' Append and SyncFuture lists (i.e., whatever they were trying to + * sync). + *
  • Use the "reserved" writer to append-sync these 'in-flight' edits + *
  • Swap the current writer to this reserved writer + *
  • Release all the current SFs which has sequenceId < 'X' + *
  • Interrupt SyncRunners. Create new SyncRunners and set them in the RBH. + *
  • Set {@link FSHLog#switching} to false + *
  • Roll the old writer + *
  • Release the rollWriter lock. + *
+ *

+ * Error Handling: We are safe as long as we have not switched "this.writer". Once we are past + * that, ensure we roll the old writer, otherwise we kill the SwitchMonitor. This is to avoid + * any out-of-order edits. + * @throws IOException + * @throws InterruptedException + */ + private void switchWALWriter() throws IOException { + if (rollWriterLock.tryLock()) { + switchCount++; + SafePointZigZagLatch zigzagLatch = null; + try { + switching = true; + zigzagLatch = (ringBufferEventHandler == null) ? null : ringBufferEventHandler + .attainSafePointForWALSwitching(); + // we want the RBH to come to a safe point (basically, block itself) when we are making + // the switch. Getting the zigzaglatch and publishing a SyncFuture on it would ensure + // the RBH reaches the safepoint. + // NOTE: Consider Syncs which are less than ringBufferBlockedPointMarker. + long ringBufferBlockedPointMarker = -1l; // the ringbufferSequence at which is blocked. + try { + if (zigzagLatch != null) { + zigzagLatch.waitSafePoint(publishSyncOnRingBuffer()); + ringBufferBlockedPointMarker = zigzagLatch.getSafePointMarker(); + } + } catch (Exception e) { + // couldn't block the ring buffer, fail the switching process. + LOG.warn("Couldn't block the RingBufferEventHandler, aborting the WAL switching.", e); + return; + } + LOG.debug("RBT should be blocked at = " + ringBufferBlockedPointMarker); + + // Get the list of Appends, and SyncFutures from SRs. And, also include from the RBH. + // First create TreeSets for both Appends, and SyncFuturs, and then populate them. + Set inFlightAppendsSet = createInflightAppendsSet(); + Set inflightSyncFuturesSet = createInflightSyncFuturesSet(); + Path oldPath = null; + Map highestRegionSeqIdsForCurWAL = null; + try { + populateInflightAppendsAndSFs(ringBufferBlockedPointMarker, inFlightAppendsSet, + inflightSyncFuturesSet); + Throwable t = null; + oldPath = writer.getCurrentWriterPath(); + try { + for (FSWALEntry e : inFlightAppendsSet) + reservedWriter.append(e); + reservedWriter.sync(); + postWALSwitch(inFlightAppendsSet.size()); + } catch (IOException ioe) { + t = ioe; + } catch (Exception e) { + t = e; + } finally { + // update the maxSyncedSequence + updateHighestSyncedSequence(ringBufferBlockedPointMarker); + // release the SyncFuture, which should be released. + // Now that we have process all the inflight appends and sync. we can interrupt the + // SR threads, and create new SRs. + for (SyncFuture sf : inflightSyncFuturesSet) { + // it may happen that the SF is completed by the original thread meanwhile while + // were switching. In that case, check whether we should invoke the done call or + // the handler thread is already unblocked. + synchronized (sf) { + if (!sf.isDone() && sf.getRingBufferSequence() <= ringBufferBlockedPointMarker) { + sf.done(ringBufferBlockedPointMarker, ringBufferBlockedPointMarker, t); + } + } + } + } + for (int i = 0; i < ringBufferEventHandler.syncRunners.length; i++) { + ringBufferEventHandler.syncRunners[i].interrupt(); + // create a new SR. + ringBufferEventHandler.syncRunners[i] = new SyncRunner("sync.sw." + i + "-" + + switchCount, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200)); + ringBufferEventHandler.syncRunners[i].start(); + } + // swap the writer + Writer tmpWriter = writer; + writer = reservedWriter; + if (writer instanceof ProtobufLogWriter) { + hdfs_out = ((ProtobufLogWriter) writer).getStream(); + } + reservedWriter = tmpWriter; + } finally { + // release the safepoint in order to RBH resume its processing. + ringBufferEventHandler.resetSFCountAndAppendList(); + if (zigzagLatch != null) zigzagLatch.releaseSafePoint(); + } + // get the region:sequenceId map. Use it book keeping the while rolling the current WAL + // (soon be old) file. We don't reset it as the same entries are used by new writer. + highestRegionSeqIdsForCurWAL = new HashMap(highestRegionSequenceIds); + // roll the reserved writer + reservedWriter = rollWriterInternal(true, reservedWriter, false); + if (oldPath != null) byWalRegionSequenceIds.put(oldPath, highestRegionSeqIdsForCurWAL); + switching = false; + } finally { + rollWriterLock.unlock(); + } + } else { + LOG.info("Couldn't get the roll writer lock, continuing"); + } + } + + /** + * Take the 'in-flight' Appends and SyncFutures from SyncRunners and RingBuffer. Take only + * those which have sequence Id < ringBufferBlockedPointMarker. + * @param ringBufferBlockedPointMarker + * @param inFlightAppendsSet + * @param inflightSyncFuturesSet + */ + private void populateInflightAppendsAndSFs(final long ringBufferBlockedPointMarker, + final Set inFlightAppendsSet, final Set inflightSyncFuturesSet) { + getInflightEntriesFromSyncRunners(ringBufferBlockedPointMarker, inFlightAppendsSet, + inflightSyncFuturesSet); + + // Add RBH intransit A's and SFs + getInflightEntriesFromRB(ringBufferBlockedPointMarker, inFlightAppendsSet, + inflightSyncFuturesSet); + } + + /** + * Take inflight Appends and SFs from RB. Ignore which have larger sequence Ids than its + * blocked point. + * @param ringBufferBlockedPointMarker + * @param inFlightAppendsSet + * @param inflightSyncFuturesSet + */ + private void getInflightEntriesFromRB(final long ringBufferBlockedPointMarker, + final Set inFlightAppendsSet, + final Set inflightSyncFuturesSet) { + if (ringBufferEventHandler.entries != null && ringBufferEventHandler.entries.size() > 0) + inFlightAppendsSet.addAll(ringBufferEventHandler.entries); + for (int i = 0; i < ringBufferEventHandler.syncFuturesCount; i++) { + if (ringBufferEventHandler.syncFutures[i] == null || + ringBufferEventHandler.syncFutures[i].isDone() || + (ringBufferEventHandler.syncFutures[i].getRingBufferSequence() > + ringBufferBlockedPointMarker)) continue; + inflightSyncFuturesSet.add(ringBufferEventHandler.syncFutures[i]); + } + } + + /** + * Take the inflight Syncs and Appends fron current SyncRunners. Since a SyncRunners 'takes' a + * SyncFuture, also take the current in-progress SyncFuture. The SyncFuture could be stuck on + * this SyncFuture and it needs to be processed too. + * @param ringBufferBlockedPointMarker + * @param inFlightAppendsSet + * @param inflightSyncFuturesSet + */ + private void getInflightEntriesFromSyncRunners(final long ringBufferBlockedPointMarker, + final Set inFlightAppendsSet, + final Set inflightSyncFuturesSet) { + for (SyncRunner s : ringBufferEventHandler.syncRunners) { + if (s.appendsToSync != null && s.appendsToSync.size() > 0) { + inFlightAppendsSet.addAll(s.appendsToSync); + } + // take the current one. + if (s.currentSyncFutureInProgress != null) + inflightSyncFuturesSet.add(s.currentSyncFutureInProgress); + for (SyncFuture sf : s.syncFutures) { + if (sf == null || sf.isDone()) continue; + if (sf.getRingBufferSequence() > ringBufferBlockedPointMarker) continue; + inflightSyncFuturesSet.add(sf); + } + } + } + + /** + * @return a TreeSet to store SyncFutures (it is accessed by a single thread, so TeeSet is safe) + */ + private TreeSet createInflightSyncFuturesSet() { + return new TreeSet( + new Comparator() { + @Override + public int compare(SyncFuture o1, SyncFuture o2) { + if (o1 == o2) return 0; + long l1 = o1.getRingBufferSequence(); + long l2 = o2.getRingBufferSequence(); + return (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1)); + } + }); + } + + /** + * @return a TreeSet to store FSWALEntries (it is accessed by a single thread, so TeeSet is safe) + */ + private TreeSet createInflightAppendsSet() { + return new TreeSet( + new Comparator() { + @Override + public int compare(FSWALEntry o1, FSWALEntry o2) { + if (o1 == o2) return 0; + long l1 = o1.getSequence(); + long l2 = o2.getSequence(); + return (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1)); + } + }); + } + + private void close() { + if (monitor != null) monitor.interrupt(); + } + } + private static IOException ensureIOException(final Throwable t) { return (t instanceof IOException)? (IOException)t: new IOException(t); } + void postWALSwitch(int size) { + if (metrics != null) { + metrics.incrementInFlightAppendsWhileWALSwitch(size); + metrics.finishWALSwitch(); + } + } + private static void usage() { System.err.println("Usage: HLog "); System.err.println("Arguments:"); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index ce29783..d96b229 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -117,6 +117,10 @@ public interface HLog { long getLength() throws IOException; /** + * @return the Path of this writer. + */ + Path getCurrentWriterPath(); + /** * Sets HLog/WAL's WALTrailer. This trailer is appended at the end of WAL on closing. * @param walTrailer trailer to append to WAL. */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java index 85aaef8..f3f2795 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java @@ -71,7 +71,11 @@ public class HLogFactory { static void resetLogReaderClass() { logReaderClass = null; } - + + static void resetLogWriterClass() { + logWriterClass = null; + } + public static HLog.Reader createReader(final FileSystem fs, final Path path, Configuration conf) throws IOException { return createReader(fs, path, conf, null); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index 82375c7..c70a793 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -58,4 +58,13 @@ public class MetricsWAL { StringUtils.humanReadableInt(size))); } } + + public void incrementInFlightAppendsWhileWALSwitch(int inflightAppendsCount) { + source.incrementInflightAppendCountInWALSwitch(inflightAppendsCount); + } + + public void finishWALSwitch() { + source.incrementWALSwitchCount(); + } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java index 6bddc00..90ed999 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java @@ -113,6 +113,14 @@ class RingBufferTruck { return ret; } + public int compareTo(RingBufferTruck other) { + if (this == other) return 0; + if (this.entry == other.entry) return 0; + long l1 = this.entry.getSequence(); + long l2 = other.entry.getSequence(); + return (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1)); + } + /** * Factory for making a bunch of these. Needed by the ringbuffer/disruptor. */ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index f0c3fa0..b6949eb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -68,6 +68,10 @@ class SyncFuture { private Throwable throwable = null; private Thread t; + /** + * The last completed sequence Id. + */ + private long lastCompletedSequence = -1; /** * Call this method to clear old usage and get it ready for new deploy. Call @@ -80,8 +84,10 @@ class SyncFuture { if (t != null && t != Thread.currentThread()) throw new IllegalStateException(); t = Thread.currentThread(); if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread()); + this.lastCompletedSequence = this.doneSequence; this.doneSequence = NOT_DONE; this.ringBufferSequence = sequence; + return this; } @@ -100,17 +106,34 @@ class SyncFuture { * @return True if we successfully marked this outstanding future as completed/done. * Returns false if this future is already 'done' when this method called. */ - synchronized boolean done(final long sequence, final Throwable t) { + synchronized boolean done(final long sequence, final long offeredSequence, final Throwable t) { + // Earlier we would throw IllegalStateException whether the passed + // sequence < this.ringBufferSequence. This is no longer true because if the current + // SyncRunner is stuck, the SwitchMonitor would complete the SyncFuture. And, the SyncFuture + // could be put again into the RingBuffer by the RegionServer handler. If the old SyncRunner + // now becomes live, it would try to release this SF, but its sequence Id < ringBufferSequence. + + // More interestingly, before coming here, the old SyncRunner could also update its sequence + // Id equal to current highestSyncedSequenceId, and then would try to mark this SynFuture + // as done. We can't allow that as it would fail the current SyncRunner when it tries to mark + // as complete. + + // We now rather check the last completed sequence Id with the offeredSequence value of the + // SyncRunner. In case the offeredSequence < lastCompletedSequence, it means the SyncFuture + // is already passed the state which SyncRunner is trying to complete. In case switching is not + // enabled, this should never be true, and we throw an IllegalStateException in the + // FSHLog#releaseSyncFuture method. + + // The below condition means that SR trying to complete a SF which is already completed. + // it is like completing a old SF (but it is again cocked because of a new request from RBH + // and would be handled by another done request from a separate SR. Ignore its error too. + if (offeredSequence < this.lastCompletedSequence) return false; + if (isDone()) return false; + // coming here means it has the right to process it. Any 't' is valid. this.throwable = t; - if (sequence < this.ringBufferSequence) { - // Something badly wrong. - if (throwable == null) { - this.throwable = new IllegalStateException("sequence=" + sequence + - ", ringBufferSequence=" + this.ringBufferSequence); - } - } - // Mark done. + + // Mark done. this.doneSequence = sequence; // Wake up waiting threads. notify(); @@ -149,4 +172,18 @@ class SyncFuture { synchronized Throwable getThrowable() { return this.throwable; } + + public long getLastCompletedRingBufferSequence() { + return lastCompletedSequence; + } + + /** + * Mark the current SyncFuture as completed and failed. + * @param sequence + * @param t + */ + public synchronized void doneAndMarkAsFailed(long sequence, Throwable t) { + this.doneSequence = sequence; + this.throwable = t; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSwitchPolicy.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSwitchPolicy.java new file mode 100644 index 0000000..c67d64a --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALSwitchPolicy.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +/** + * Defines the switching strategy. Depending upon implementation, it takes the input parameters + * (such as last N sync time, last switched time, etc), and decides whether the WAL should + * switch or not. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface WALSwitchPolicy { + + final static long AGGRESSIVE_DEFAULT_WAL_SWITCH_THRESHOLD = 2000; // 2 sec + + void init(Configuration conf); + + /** + * Based on last sync op duration, it decides whether WAL should do a switch to another writer. + * @param lastSyncTimeTaken + * @return true if WAL is eligible for switching. + */ + boolean makeAWALSwitch(long lastSyncTimeTaken); + + /** + * @return the walSwitch threshold time. + */ + long getWalSwitchThreshold(); +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java index f01300b..9bfdd9f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java @@ -37,10 +37,12 @@ public abstract class WriterBase implements HLog.Writer { protected CompressionContext compressionContext; protected Configuration conf; + private Path path; @Override public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException { this.conf = conf; + this.path = path; } public boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { @@ -57,4 +59,8 @@ public abstract class WriterBase implements HLog.Writer { return doCompress; } + @Override + public Path getCurrentWriterPath() { + return this.path; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 82dca79..83c7a83 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Histogram; import com.yammer.metrics.core.Meter; import com.yammer.metrics.core.MetricsRegistry; @@ -68,6 +69,9 @@ import com.yammer.metrics.reporting.ConsoleReporter; public final class HLogPerformanceEvaluation extends Configured implements Tool { static final Log LOG = LogFactory.getLog(HLogPerformanceEvaluation.class.getName()); private final MetricsRegistry metrics = new MetricsRegistry(); + + private final Counter inflightAppendsCounter = + metrics.newCounter(HLogPerformanceEvaluation.class, "inflightAppends"); private final Meter syncMeter = metrics.newMeter(HLogPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS); private final Histogram syncHistogram = @@ -273,6 +277,12 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool appendMeter.mark(size); return size; } + + @Override + void postWALSwitch(int size) { + inflightAppendsCounter.inc(size); + } + }; hlog.registerWALActionsListener(new WALActionsListener() { private int appends = 0; @@ -343,9 +353,25 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool if (!fs.exists(p)) throw new IllegalStateException(p.toString()); editCount += verify(p, verbose); } - long expected = numIterations * numThreads; - if (editCount != expected) { - throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected); + long expectedWithoutWALSwitch = numIterations * numThreads; + if (inflightAppendsCounter.count() > 0) { + // switching occurs. It may happen that some writes were not completed and + // were interrupted. + // We can define the range of number of edits: + // (numIterations * threads + infligtEditsCount) >= x >= numIterations * threads + LOG.warn("Counted=" + + editCount + ", expected range=[" + expectedWithoutWALSwitch + ", " + + (inflightAppendsCounter.count() + expectedWithoutWALSwitch) + "]"); + if (editCount > (inflightAppendsCounter.count() + expectedWithoutWALSwitch) || + editCount < expectedWithoutWALSwitch) + throw new IllegalStateException("Counted=" + + editCount + ", expected range=[" + expectedWithoutWALSwitch + ", " + + inflightAppendsCounter.count() + expectedWithoutWALSwitch + "]"); + } else { + if (editCount != expectedWithoutWALSwitch) { + throw new IllegalStateException("Counted=" + editCount + + ", expected=" + expectedWithoutWALSwitch); + } } } } finally { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SlowSyncOpWriter.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SlowSyncOpWriter.java new file mode 100644 index 0000000..760ab9b --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SlowSyncOpWriter.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class SlowSyncOpWriter extends ProtobufLogWriter { + private static int count; + private static final Log LOG = LogFactory.getLog(SlowSyncOpWriter.class); + + public SlowSyncOpWriter() { + super(); + } + + @Override + public void sync() throws IOException { + try { + if ((++count % 3) == 0) { + LOG.debug("Sleeping for 6 sec..., count=" + count); + Thread.sleep(6000); + } + super.sync(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + super.sync(); + } +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 57dc333..d8f074a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.*; +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Method; import java.net.BindException; +import java.net.URI; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -38,6 +40,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.util.Bytes; @@ -52,6 +55,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; +import org.apache.hadoop.util.Progressable; import org.apache.log4j.Level; import org.junit.After; import org.junit.AfterClass; @@ -60,6 +64,8 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; /** JUnit test case for HLog */ @Category(LargeTests.class) @@ -1185,6 +1191,8 @@ public class TestHLog { hlog.completeCacheFlush(regionEncodedName); } + + static class DumbWALActionsListener implements WALActionsListener { int increments = 0; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSwitch.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSwitch.java new file mode 100644 index 0000000..474eb9e --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSwitch.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestHLogSwitch { + private static final Log LOG = LogFactory.getLog(TestHLogSwitch.class); + private static Configuration conf; + private static FileSystem fs; + private static Path dir; + private static MiniDFSCluster cluster; + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Path hbaseDir; + + @Before + public void setUp() throws Exception { + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries) { + fs.delete(dir.getPath(), true); + } + } + + @After + public void tearDown() throws Exception { + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // The below stuff is copied from TestHLog. + // Make block sizes small. + TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); + // needed for testAppendClose() + TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true); + TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); + // quicker heartbeat interval for faster DN death notification + TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000); + TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); + TEST_UTIL.getConfiguration().setInt("dfs.socket.timeout", 5000); + // faster failover with cluster.shutdown();fs.close() idiom + TEST_UTIL.getConfiguration().setInt("ipc.client.connect.max.retries", 1); + TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1); + TEST_UTIL.getConfiguration().setInt("ipc.client.connection.maxidletime", 500); + TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + SampleRegionWALObserver.class.getName()); + TEST_UTIL.startMiniDFSCluster(3); + + conf = TEST_UTIL.getConfiguration(); + cluster = TEST_UTIL.getDFSCluster(); + fs = cluster.getFileSystem(); + + hbaseDir = TEST_UTIL.createRootDir(); + dir = new Path(hbaseDir, getName()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private static String getName() { + return "TestHLogSwitch"; + } + + /** + * Tests the WAL Switch functionality correctness. + * Write to a log file with twenty concurrent threads, with switching aggressively. + * Verify that there is no "out-of-order" edits, and the data is in the expected range. + * @throws Exception + */ + @Test + public void testConcurrentWritesWithSwitching() throws Exception { + // Run the HPE tool with twenty threads writing 5000 edits each concurrently. + // When done, verify that all edits were written. + LOG.debug("testConcurrentWritesWithSwitching"); + Configuration conf1 = HBaseConfiguration.create(conf); + conf1.setBoolean("hbase.regionserver.wal.switch.enabled", true); + conf1.setLong("hbase.regionserver.wal.switch.threshold", 10); + + int errCode = HLogPerformanceEvaluation.innerMain(conf1, new String[] { "-threads", "20", + "-verify", "-noclosefs", "-iterations", "5000" }); + assertEquals(0, errCode); + } + + /** + * Methods to test the switch functionality. + * @throws IOException + */ + @Test + public void testSwitchOccurred() throws Exception { + LOG.debug("testSwitchOccurred"); + Configuration conf1 = HBaseConfiguration.create(conf); + conf1.setBoolean("hbase.regionserver.wal.switch.enabled", true); + conf1.setClass("hbase.regionserver.hlog.writer.impl", SlowSyncOpWriter.class, + HLog.Writer.class); + TableName table1 = TableName.valueOf("t1"); + HLogFactory.resetLogWriterClass(); + HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), conf1); + try { + assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); + HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW); + // variables to mock region sequenceIds. + final AtomicLong sequenceId1 = new AtomicLong(1); + // start with the testing logic: insert a waledit, and roll writer + int curNumLogFiles = hlog.getNumLogFiles(); + addEdits(hlog, hri1, table1, 1, sequenceId1); + // invoke sync before calling roll... + hlog.sync(); + addEdits(hlog, hri1, table1, 1, sequenceId1); + assertTrue("Old log count is same as current log count", + hlog.getNumLogFiles() > curNumLogFiles); + curNumLogFiles = hlog.getNumLogFiles(); + hlog.sync(); + // ensure empty sync also works + hlog.sync(); + assertTrue("Old log count is same as current log count", + hlog.getNumLogFiles() > curNumLogFiles); + LOG.warn("Before calling the roll writer.. switch"); + // test concurrent log roll. + hlog.rollWriter(); + // assert that the wal is rolled + // add edits in the second wal file, and roll writer. + addEdits(hlog, hri1, table1, 1, sequenceId1); + hlog.sync(); + hlog.rollWriter(); + } finally { + HLogFactory.resetLogWriterClass(); + if (hlog != null) hlog.close(); + } + } + + private void addEdits(HLog log, HRegionInfo hri, TableName tableName, int times, + AtomicLong sequenceId) throws IOException { + HTableDescriptor htd = new HTableDescriptor(); + htd.addFamily(new HColumnDescriptor("row")); + + final byte[] row = Bytes.toBytes("row"); + for (int i = 0; i < times; i++) { + long timestamp = System.currentTimeMillis(); + WALEdit cols = new WALEdit(); + cols.add(new KeyValue(row, row, row, timestamp, row)); + log.append(hri, tableName, cols, timestamp, htd, sequenceId); + } + } + +}