diff --git a/conf/hbase-env.sh b/conf/hbase-env.sh index 91aca76..fa72483 100644 --- a/conf/hbase-env.sh +++ b/conf/hbase-env.sh @@ -38,7 +38,9 @@ # Below are what we set by default. May only work with SUN JVM. # For more on why as well as other possible settings, # see http://wiki.apache.org/hadoop/PerformanceTuning -export HBASE_OPTS="-XX:+UseConcMarkSweepGC" +#export HBASE_OPTS="-XX:+UseConcMarkSweepGC" +export HBASE_OPTS=" -XX:+ExtendedDTraceProbes -XX:+UseConcMarkSweepGC" +#export HBASE_OPTS=" -agentpath:/Applications/jprofiler8/bin/macos/libjprofilerti.jnilib=port=8849 -XX:+UseConcMarkSweepGC" # Uncomment one of the below three options to enable java garbage collection logging for the server-side processes. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index d1c7daf..5dd7ebe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -602,7 +602,8 @@ public class HTableDescriptor implements WritableComparable { * @return true if that deferred log flush is enabled on the table * * @see #setDeferredLogFlush(boolean) - * @deprecated use {@link #getDurability()} + * @deprecated Since 0.96 we no longer have explicit deferred log flush/sync functionality. + * Use {@link #getDurability()}. */ @Deprecated public synchronized boolean isDeferredLogFlush() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java index 61ed045..8df8e10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java @@ -42,6 +42,7 @@ public enum Durability { /** * Write the Mutation to the WAL asynchronously */ + // This is the default. We append edits in a batch and then sync the batch every 1ms or so. ASYNC_WAL, /** * Write the Mutation to the WAL synchronously. diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index f5723b2..ea5660c 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -446,7 +446,11 @@ <groupId>org.cloudera.htrace</groupId> <artifactId>htrace-core</artifactId> -</dependency> +</dependency> +<dependency> +<groupId>com.lmax</groupId> +<artifactId>disruptor</artifactId> +</dependency> diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 083aff9..7b89a86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -20,15 +19,17 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -36,7 +37,9 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -62,157 +65,159 @@ import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; /** - * HLog stores all the edits to the HStore. Its the hbase write-ahead-log - * implementation. - * - * It performs logfile-rolling, so external callers are not aware that the - * underlying file is being rolled. - * - *

- * There is one HLog per RegionServer. All edits for all Regions carried by - * a particular RegionServer are entered first in the HLog. + * Implementation of {@link HLog} to go against {@link FileSystem}; i.e. keep WAL in filesystem. * - *

- * Each HRegion is identified by a unique long int. HRegions do - * not need to declare themselves before using the HLog; they simply include - * their HRegion-id in the append or - * completeCacheFlush calls. + * In this implementation, there is one HLog/WAL. All edits for all Regions are entered first in the HLog/WAL. Each + * HRegion is identified by a unique long int. HRegions do not need to declare themselves before using the + * HLog/WAL; they simply include their HRegion-id in the append or completeCacheFlush calls. * - *

- * An HLog consists of multiple on-disk files, which have a chronological order. - * As data is flushed to other (better) on-disk structures, the log becomes - * obsolete. We can destroy all the log messages for a given HRegion-id up to - * the most-recent CACHEFLUSH message from that HRegion. + *

This HLog/WAL implementation keeps multiple on-disk files in chronological order. As data is flushed to + other (better) on-disk structures (files sorted by key, hfiles), the log becomes obsolete. We can let go of all the + log edits/entries for a given HRegion-id up to the most-recent CACHEFLUSH message from that HRegion. Much of the work + below is about keeping account of these region sequence ids -- what has been flushed out to hfiles, and what is yet + in WAL and in memory only. * - *

- * It's only practical to delete entire files. Thus, we delete an entire on-disk - * file F when all of the messages in F have a log-sequence-id that's older - * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has - * a message in F. + *

It's only practical to delete entire files. Thus, we delete an entire on-disk file F when all of the + edits in F have a log-sequence-id that's older (smaller) than the most-recent CACHEFLUSH message for + every HRegion that has an edit in F. * - *
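The deletion rule just stated reduces to a per-region comparison of sequence ids. A minimal sketch of the check, with illustrative names (the patch's real version is the three-map areAllRegionsFlushed further down):

    import java.util.Map;

    public class WalGcSketch {
      /**
       * A WAL file F may be deleted once every region with an edit in F has
       * flushed its memstore past F's highest sequence id for that region.
       * @param highestSeqIdInFile per-region highest sequence id recorded in F
       * @param oldestUnflushedSeqId per-region oldest sequence id still only in memstore
       */
      static boolean canDelete(Map<byte[], Long> highestSeqIdInFile,
          Map<byte[], Long> oldestUnflushedSeqId) {
        for (Map.Entry<byte[], Long> e : highestSeqIdInFile.entrySet()) {
          Long oldestUnflushed = oldestUnflushedSeqId.get(e.getKey());
          // No entry means the region has nothing unflushed; it cannot hold F back.
          if (oldestUnflushed != null && oldestUnflushed <= e.getValue()) {
            return false; // F still covers edits that are only in this region's memstore.
          }
        }
        return true;
      }
    }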

- * Synchronized methods can never execute in parallel. However, between the - * start of a cache flush and the completion point, appends are allowed but log - * rolling is not. To prevent log rolling taking place during this period, a - * separate reentrant lock is used. + *

This implementation performs log rolling internally, so external callers do not have to be + concerned with it. Synchronized methods can never execute in parallel. However, between the start of a cache + flush and the completion point, appends are allowed but log rolling is not. To prevent log rolling from taking place + during this period, a separate reentrant lock is used. * *
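A minimal sketch of that lock discipline, modeled on the rollWriterLock/tryLock pattern this patch uses further down (the method bodies are placeholders):

    import java.util.concurrent.locks.ReentrantLock;

    public class RollLockSketch {
      private final ReentrantLock rollWriterLock = new ReentrantLock(true);

      /** Roll path: hold the lock for the whole roll. */
      void rollWriter() {
        rollWriterLock.lock();
        try {
          // close the old writer, open a new file, swap them in ...
        } finally {
          rollWriterLock.unlock();
        }
      }

      /** Check path: if a roll is already in flight, there is nothing to decide. */
      void checkLogRoll() {
        if (!rollWriterLock.tryLock()) return;
        try {
          // look at replica count / file length and request a roll if needed ...
        } finally {
          rollWriterLock.unlock();
        }
      }
    }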

To read an HLog, call {@link HLogFactory#createReader(org.apache.hadoop.fs.FileSystem, * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}. - * */ @InterfaceAudience.Private class FSHLog implements HLog, Syncable { static final Log LOG = LogFactory.getLog(FSHLog.class); + /** + * Disruptor is a fancy ring buffer. This disruptor/ring buffer takes edits and sync calls + * from the Handlers and passes them to the append and sync executors with minimal contention. + */ + private final Disruptor disruptor; + + /** + * An ExecutorService that runs the AppendEventHandler append executor. + */ + private final ExecutorService appendExecutor; + + /** + * This fellow is run by the above appendExecutor service; it is all about batching up appends and syncs. It may + * shut down without cleaning out the last few appends or syncs, so to guard against this we keep a reference to the + * handler and do an explicit close on the way out to make sure everything is flushed before we exit. + */ + private final AppendEventHandler appendEventHandler; + + /** + * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. + */ + private final Map syncFuturesByHandler; + private final FileSystem fs; - private final Path rootDir; - private final Path dir; + private final Path fullPathLogDir; + private final Path fullPathOldLogDir; private final Configuration conf; - // Listeners that are called on WAL events. - private List listeners = - new CopyOnWriteArrayList(); - private final long blocksize; - private final String prefix; - private final AtomicLong unflushedEntries = new AtomicLong(0); - private final AtomicLong syncedTillHere = new AtomicLong(0); - private long lastDeferredTxid; - private final Path oldLogDir; - - // all writes pending on AsyncWriter/AsyncSyncer thread with - // txid <= failedTxid will fail by throwing asyncIOE - private final AtomicLong failedTxid = new AtomicLong(0); - private volatile IOException asyncIOE = null; + private final String logFilePrefix; + + /** + * The highest known outstanding unsync'd WALEdit sequence number, where the sequence number is the ring buffer + * sequence. + */ + private volatile long highestUnsyncedSequence = -1; + + /** + * Updated to the ring buffer sequence of the last successful sync call. This can be less than + * {@link #highestUnsyncedSequence} for the case where an append has gone in but its sync has not yet come in. + */ + private final AtomicLong highestSyncedSequence = new AtomicLong(0); private WALCoprocessorHost coprocessorHost; private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer - // Minimum tolerable replicas, if the actual value is lower than it, - // rollWriter will be triggered - private int minTolerableReplication; - private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas - final static Object [] NO_ARGS = new Object []{}; - /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ - private DrainBarrier closeBarrier = new DrainBarrier(); + // All about log rolling if not enough replicas outstanding. + + // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered + private final int minTolerableReplication; + // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection.
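For readers new to the LMAX library, here is a minimal, self-contained sketch of the producer/consumer wiring the disruptor fields above describe, against the disruptor 3.x API. Truck is a simplified stand-in for this patch's RingBufferTruck, and the handler body is a placeholder for the real append/sync work:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;

    public class DisruptorWiringSketch {
      /** Simplified stand-in for the patch's RingBufferTruck. */
      static final class Truck {
        Object payload;
        static final EventFactory<Truck> FACTORY = new EventFactory<Truck>() {
          public Truck newInstance() { return new Truck(); }
        };
      }

      public static void main(String[] args) {
        ExecutorService appendExecutor = Executors.newSingleThreadExecutor();
        // Many producers (Handlers), one consuming thread; BlockingWaitStrategy because
        // the append/sync work is slow enough that busy-spinning buys nothing.
        Disruptor<Truck> disruptor = new Disruptor<Truck>(Truck.FACTORY, 1024,
            appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
        disruptor.handleEventsWith(new EventHandler<Truck>() {
          public void onEvent(Truck truck, long sequence, boolean endOfBatch) {
            // Single consumer: append the payload; a real WAL would kick off a
            // filesystem sync when endOfBatch is true.
            System.out.println("seq=" + sequence + " payload=" + truck.payload);
          }
        });
        disruptor.start();

        // Producer side: claim a slot, load it, and publish in a finally block
        // so a failed load cannot wedge the ring buffer.
        long seq = disruptor.getRingBuffer().next();
        try {
          disruptor.getRingBuffer().get(seq).payload = "an edit";
        } finally {
          disruptor.getRingBuffer().publish(seq);
        }

        disruptor.shutdown(); // drains queued events before returning
        appendExecutor.shutdown();
      }
    }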
+ private final Method getNumCurrentReplicas; + private final static Object [] NO_ARGS = new Object []{}; + // If live datanode count is lower than the default replicas value, + // RollWriter will be triggered in each sync(So the RollWriter will be + // triggered one by one in a short time). Using it as a workaround to slow + // down the roll frequency triggered by checkLowReplication(). + private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0); + private final int lowReplicationRollLimit; + // If consecutiveLogRolls is larger than lowReplicationRollLimit, + // then disable the rolling in checkLowReplication(). + // Enable it if the replications recover. + private volatile boolean lowReplicationRollEnabled = true; /** * Current log file. */ Writer writer; - /** - * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums, - * with the exception of append's putIfAbsent into oldestUnflushedSeqNums. - * We only use these to find out the low bound seqNum, or to find regions with old seqNums to - * force flush them, so we don't care about these numbers messing with anything. */ - private final Object oldestSeqNumsLock = new Object(); + /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ + private final DrainBarrier closeBarrier = new DrainBarrier(); /** - * This lock makes sure only one log roll runs at the same time. Should not be taken while - * any other lock is held. We don't just use synchronized because that results in bogus and - * tedious findbugs warning when it thinks synchronized controls writer thread safety */ + * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock is held. We don't + * just use synchronized because that results in bogus and tedious findbugs warning when it thinks synchronized + * controls writer thread safety. It is held when we are actually rolling the log. It is checked when we are + * looking to see if we should roll the log or not. + */ private final ReentrantLock rollWriterLock = new ReentrantLock(true); /** - * Map of encoded region names to their most recent sequence/edit id in their memstore. + * We synchronize on updateLock to prevent a log roll or closing during an append. It is held during append so roll + * cannot happen during append. Also held when closing the log so we can do final sync without new updates coming in + * and close the log. */ - private final ConcurrentSkipListMap oldestUnflushedSeqNums = - new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); - /** - * Map of encoded region names to their most recent sequence/edit id in their memstore; - * contains the regions that are currently flushing. That way we can store two numbers for - * flushing and non-flushing (oldestUnflushedSeqNums) memstore for the same region. - */ - private final Map oldestFlushingSeqNums = - new TreeMap(Bytes.BYTES_COMPARATOR); + private final Object updateLock = new Object(); + + // Listeners that are called on WAL events. + private final List listeners = new CopyOnWriteArrayList(); private volatile boolean closed = false; - private boolean forMeta = false; + /** + * Set when this WAL is for meta only (we run a WAL for all regions except meta -- it has its own dedicated WAL). + */ + private final boolean forMeta; // The timestamp (in ms) when the log file was created. private volatile long filenum = -1; - //number of transactions in the current Hlog. + // Number of transactions in the current Hlog. 
private final AtomicInteger numEntries = new AtomicInteger(0); - // If live datanode count is lower than the default replicas value, - // RollWriter will be triggered in each sync(So the RollWriter will be - // triggered one by one in a short time). Using it as a workaround to slow - // down the roll frequency triggered by checkLowReplication(). - private AtomicInteger consecutiveLogRolls = new AtomicInteger(0); - private final int lowReplicationRollLimit; - - // If consecutiveLogRolls is larger than lowReplicationRollLimit, - // then disable the rolling in checkLowReplication(). - // Enable it if the replications recover. - private volatile boolean lowReplicationRollEnabled = true; - - // If > than this size, roll the log. This is typically 0.95 times the size - // of the default Hdfs block size. + // If > than this size, roll the log. private final long logrollsize; - - /** size of current log */ - private long curLogSize = 0; /** * The total size of hlog */ private AtomicLong totalLogSize = new AtomicLong(0); - - // We synchronize on updateLock to prevent updates and to prevent a log roll - // during an update - // locked during appends - private final Object updateLock = new Object(); - private final Object pendingWritesLock = new Object(); + /** + * If WAL is enabled. + */ private final boolean enabled; /* @@ -222,39 +227,53 @@ class FSHLog implements HLog, Syncable { */ private final int maxLogs; - // List of pending writes to the HLog. There corresponds to transactions - // that have not yet returned to the client. We keep them cached here - // instead of writing them to HDFS piecemeal. The goal is to increase - // the batchsize for writing-to-hdfs as well as sync-to-hdfs, so that - // we can get better system throughput. - private List pendingWrites = new LinkedList(); - - private final AsyncWriter asyncWriter; - // since AsyncSyncer takes much longer than other phase(add WALEdits to local - // buffer, write local buffer to HDFS, notify pending write handler threads), - // when a sync is ongoing, all other phase pend, we use multiple parallel - // AsyncSyncer threads to improve overall throughput. - private final AsyncSyncer[] asyncSyncers; - private final AsyncNotifier asyncNotifier; - /** Number of log close errors tolerated before we abort */ private final int closeErrorsTolerated; private final AtomicInteger closeErrorCount = new AtomicInteger(); private final MetricsWAL metrics; -/** - * Map of region encoded names to the latest sequence num obtained from them while appending - * WALEdits to the wal. We create one map for each WAL file at the time it is rolled. - *

- * When deciding whether to archive a WAL file, we compare the sequence IDs in this map to - * {@link #oldestFlushingSeqNums} and {@link #oldestUnflushedSeqNums}. - * See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info. - *

- * This map uses byte[] as the key, and uses reference equality. It works in our use case as we - * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns - * the same array. - */ - private Map latestSequenceNums = new HashMap(); + + // Region sequence id accounting across flushes and for knowing when we can GC a WAL. These sequence id numbers are + // by region and unrelated to the ring buffer sequence number accounting done above in highestSyncedSequence, + // highestUnsyncedSequence, etc. + /** + * This lock synchronizes all operations on the lowestFlushingRegionSequenceIds and oldestUnflushedRegionSequenceIds + * Maps, with the exception of append's putIfAbsent call into oldestUnflushedRegionSequenceIds. We use these Maps to + * find out the low bound seqNum, or to find regions with old seqNums to force flush; we are interested in old stuff + * not the new additions (TODO: IS THIS SAFE? CHECK!). + */ + private final Object regionSequenceIdLock = new Object(); + + /** + * Map of encoded region names to their OLDEST -- i.e. their first, the longest-lived -- sequence id in memstore. + * Note that this sequenceid is the region sequence id. This is not related to the id we use above for + * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from the disruptor + * ring buffer, an internal detail. + */ + private final ConcurrentSkipListMap oldestUnflushedRegionSequenceIds = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + + /** + * Map of encoded region names to their lowest or OLDEST sequence/edit id in memstore currently being flushed out to + * hfiles. Entries are moved here from {@link #oldestUnflushedRegionSequenceIds} while the lock + * {@link #regionSequenceIdLock} is held (so movement between the Maps is atomic). This is not related to the + * id we use above for {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from + * the disruptor ring buffer, an internal detail. + */ + private final Map lowestFlushingRegionSequenceIds = new TreeMap(Bytes.BYTES_COMPARATOR); + + /** + * Map of region encoded names to the latest region sequence id. Updated on each append of WALEdits to the WAL. + * We create one map for each WAL file at the time it is rolled. + *
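A sketch of the flush-start handshake these two maps and their shared lock imply; the method names here are hypothetical stand-ins for the patch's flush-accounting code paths:

    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FlushAccountingSketch {
      private final Object regionSequenceIdLock = new Object();
      private final ConcurrentSkipListMap<byte[], Long> oldestUnflushedRegionSequenceIds =
          new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
      private final Map<byte[], Long> lowestFlushingRegionSequenceIds =
          new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

      /** Move a region's oldest unflushed sequence id to the flushing map atomically. */
      void onFlushStart(byte[] encodedRegionName) {
        synchronized (regionSequenceIdLock) {
          Long oldest = oldestUnflushedRegionSequenceIds.remove(encodedRegionName);
          if (oldest != null) lowestFlushingRegionSequenceIds.put(encodedRegionName, oldest);
        }
      }

      /** On flush success the flushing entry just goes away; those edits are in hfiles now. */
      void onFlushCommit(byte[] encodedRegionName) {
        synchronized (regionSequenceIdLock) {
          lowestFlushingRegionSequenceIds.remove(encodedRegionName);
        }
      }
    }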

When deciding whether to archive a WAL file, we compare the sequence IDs in this map to + * {@link #lowestFlushingRegionSequenceIds} and {@link #oldestUnflushedRegionSequenceIds}. + * See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info. + *

+ * This map uses byte[] as the key, and uses reference equality. It works in our use case as we + * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns + * the same array. + */ + private Map highestRegionSequenceIds = new HashMap(); /** * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. @@ -270,10 +289,10 @@ class FSHLog implements HLog, Syncable { }; /** - * Map of log file to the latest sequence nums of all regions it has entries of. + * Map of wal log file to the latest sequence nums of all regions it has entries of. * The map is sorted by the log file creation timestamp (contained in the log file name). */ - private NavigableMap> hlogSequenceNums = + private NavigableMap> byWalRegionSequenceIds = new ConcurrentSkipListMap>(LOG_NAME_COMPARATOR); /** @@ -285,11 +304,9 @@ class FSHLog implements HLog, Syncable { * @param conf configuration to use * @throws IOException */ - public FSHLog(final FileSystem fs, final Path root, final String logDir, - final Configuration conf) + public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf) throws IOException { - this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, - conf, null, true, null, false); + this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, false); } /** @@ -302,11 +319,10 @@ class FSHLog implements HLog, Syncable { * @param conf configuration to use * @throws IOException */ - public FSHLog(final FileSystem fs, final Path root, final String logDir, - final String oldLogDir, final Configuration conf) + public FSHLog(final FileSystem fs, final Path root, final String logDir, final String oldLogDir, + final Configuration conf) throws IOException { - this(fs, root, logDir, oldLogDir, - conf, null, true, null, false); + this(fs, root, logDir, oldLogDir, conf, null, true, null, false); } /** @@ -331,8 +347,7 @@ class FSHLog implements HLog, Syncable { public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf, final List listeners, final String prefix) throws IOException { - this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, - conf, listeners, true, prefix, false); + this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, prefix, false); } /** @@ -343,7 +358,7 @@ class FSHLog implements HLog, Syncable { * HLog object is started up. * * @param fs filesystem handle - * @param root path to where logs and oldlogs + * @param rootDir path to where logs and oldlogs * @param logDir dir where hlogs are stored * @param oldLogDir dir where hlogs are archived * @param conf configuration to use @@ -357,116 +372,107 @@ class FSHLog implements HLog, Syncable { * @param forMeta if this hlog is meant for meta updates * @throws IOException */ - public FSHLog(final FileSystem fs, final Path root, final String logDir, + public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, final String oldLogDir, final Configuration conf, final List listeners, final boolean failIfLogDirExists, final String prefix, boolean forMeta) throws IOException { super(); this.fs = fs; - this.rootDir = root; - this.dir = new Path(this.rootDir, logDir); - this.oldLogDir = new Path(this.rootDir, oldLogDir); + this.fullPathLogDir = new Path(rootDir, logDir); + this.fullPathOldLogDir = new Path(rootDir, oldLogDir); this.forMeta = forMeta; this.conf = conf; + // Register listeners. 
if (listeners != null) { for (WALActionsListener i: listeners) { registerWALActionsListener(i); } } - this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", - FSUtils.getDefaultBlockSize(this.fs, this.dir)); - // Roll at 95% of block size. - float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f); - this.logrollsize = (long)(this.blocksize * multi); + // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks (crossing blocks is costly) + long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", + FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir)); + this.logrollsize = (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); - this.minTolerableReplication = conf.getInt( - "hbase.regionserver.hlog.tolerable.lowreplication", - FSUtils.getDefaultReplication(fs, this.dir)); - this.lowReplicationRollLimit = conf.getInt( - "hbase.regionserver.hlog.lowreplication.rolllimit", 5); + this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", + FSUtils.getDefaultReplication(fs, this.fullPathLogDir)); + this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5); this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true); - this.closeErrorsTolerated = conf.getInt( - "hbase.regionserver.logroll.errors.tolerated", 0); - + this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0); + // If prefix is null||empty then just name it hlog + this.logFilePrefix = prefix == null || prefix.isEmpty() ? "hlog" : URLEncoder.encode(prefix, "UTF8"); + int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); - LOG.info("WAL/HLog configuration: blocksize=" + - StringUtils.byteDesc(this.blocksize) + + LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + - ", enabled=" + this.enabled); - // If prefix is null||empty then just name it hlog - this.prefix = prefix == null || prefix.isEmpty() ? - "hlog" : URLEncoder.encode(prefix, "UTF8"); + ", enabled=" + this.enabled + ", prefix=" + this.logFilePrefix + ", logDir=" + this.fullPathLogDir + + ", oldLogDir=" + this.fullPathOldLogDir); boolean dirExists = false; - if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) { - throw new IOException("Target HLog directory already exists: " + dir); + if (failIfLogDirExists && (dirExists = this.fs.exists(fullPathLogDir))) { + throw new IOException("Target HLog directory already exists: " + fullPathLogDir); } - if (!dirExists && !fs.mkdirs(dir)) { - throw new IOException("Unable to mkdir " + dir); + if (!dirExists && !fs.mkdirs(fullPathLogDir)) { + throw new IOException("Unable to mkdir " + fullPathLogDir); } - if (!fs.exists(this.oldLogDir)) { - if (!fs.mkdirs(this.oldLogDir)) { - throw new IOException("Unable to mkdir " + this.oldLogDir); + if (!fs.exists(this.fullPathOldLogDir)) { + if (!fs.mkdirs(this.fullPathOldLogDir)) { + throw new IOException("Unable to mkdir " + this.fullPathOldLogDir); } } + // rollWriter sets this.hdfs_out if it can.
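+ // Illustrative arithmetic, not part of the patch: with the default 128MB HDFS block size,
+ //   long blocksize = 128L * 1024 * 1024;          // 134217728
+ //   long logrollsize = (long)(blocksize * 0.95f); // ~127506840 bytes, i.e. ~121.6MB
+ // so a WAL file is rolled before it would spill into a second HDFS block.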
rollWriter(); // handle the reflection necessary to call getNumCurrentReplicas() this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); - final String n = Thread.currentThread().getName(); - - - asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter"); - asyncWriter.start(); - - int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5); - asyncSyncers = new AsyncSyncer[syncerNums]; - for (int i = 0; i < asyncSyncers.length; ++i) { - asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i); - asyncSyncers[i].start(); - } - - asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier"); - asyncNotifier.start(); - - coprocessorHost = new WALCoprocessorHost(this, conf); - + this.coprocessorHost = new WALCoprocessorHost(this, conf); this.metrics = new MetricsWAL(); + + // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is put on the + // ring buffer. + this.appendExecutor = Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("append")); + final int preallocatedEventCount = this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024); + // Using BlockingWaitStrategy. The work done here takes so long that it makes no sense to spin as other + // strategies do. + this.disruptor = new Disruptor(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount, + this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy()); + this.appendEventHandler = + new AppendEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount); + this.disruptor.handleEventsWith(new AppendEventHandler [] {this.appendEventHandler}); + // Presize our map of SyncFutures by handler objects. + this.syncFuturesByHandler = new HashMap(maxHandlersCount); + // Starting up threads in a constructor is a no-no; the Interface should have an init call. + this.disruptor.start(); } /** * Find the 'getNumCurrentReplicas' on the passed os stream. * @return Method or null.
*/ - private Method getGetNumCurrentReplicas(final FSDataOutputStream os) { + private static Method getGetNumCurrentReplicas(final FSDataOutputStream os) { Method m = null; if (os != null) { - Class wrappedStreamClass = os.getWrappedStream() - .getClass(); + Class wrappedStreamClass = os.getWrappedStream().getClass(); try { - m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", - new Class[] {}); + m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class[] {}); m.setAccessible(true); } catch (NoSuchMethodException e) { - LOG.info("FileSystem's output stream doesn't support" - + " getNumCurrentReplicas; --HDFS-826 not available; fsOut=" - + wrappedStreamClass.getName()); + LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; HDFS-826 not available; fsOut=" + + wrappedStreamClass.getName()); } catch (SecurityException e) { - LOG.info("Doesn't have access to getNumCurrentReplicas on " - + "FileSystems's output stream --HDFS-826 not available; fsOut=" - + wrappedStreamClass.getName(), e); + LOG.info("No access to getNumCurrentReplicas on FileSystem's output stream; HDFS-826 not available; fsOut=" + + wrappedStreamClass.getName(), e); m = null; // could happen on setAccessible() } } if (m != null) { - if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas--HDFS-826"); + if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas"); } return m; } @@ -504,8 +510,7 @@ class FSHLog implements HLog, Syncable { } @Override - public byte [][] rollWriter(boolean force) - throws FailedLogCloseException, IOException { + public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { rollWriterLock.lock(); try { // Return if nothing to flush. @@ -530,7 +535,9 @@ //computeFilename will take care of meta hlog filename oldPath = computeFilename(currentFilenum); } this.filenum = System.currentTimeMillis(); + // nanoTime has an arbitrary origin, so it is only used to time the pre-sync below; file names stay on wall-clock ms. + long start = System.nanoTime(); Path newPath = computeFilename(); while (fs.exists(newPath)) { this.filenum++; @@ -551,7 +558,7 @@ // perform the costly sync before we get the lock to roll writers. try { nextWriter.sync(); - postSync(); + postSync(System.nanoTime() - start, 0); } catch (IOException e) { // optimization failed, no need to abort here. LOG.warn("pre-sync failed", e); @@ -568,8 +575,8 @@ this.hdfs_out = nextHdfsOut; this.numEntries.set(0); if (oldFile != null) { - this.hlogSequenceNums.put(oldFile, this.latestSequenceNums); - this.latestSequenceNums = new HashMap(); + this.byWalRegionSequenceIds.put(oldFile, this.highestRegionSequenceIds); + this.highestRegionSequenceIds = new HashMap(); } } if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath)); @@ -622,12 +629,12 @@ } /** - * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits - * are already flushed by the corresponding regions. + * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits have been flushed + to hfiles. *

* For each log file, it compares its region to sequenceId map - (@link {@link FSHLog#latestSequenceNums} with corresponding region entries in - {@link FSHLog#oldestFlushingSeqNums} and {@link FSHLog#oldestUnflushedSeqNums}. + {@link FSHLog#highestRegionSequenceIds} with corresponding region entries in + {@link FSHLog#lowestFlushingRegionSequenceIds} and {@link FSHLog#oldestUnflushedRegionSequenceIds}. * If all the regions in the map are flushed past of their value, then the wal is eligible for * archiving. * @throws IOException */ @@ -637,32 +644,31 @@ Map oldestUnflushedSeqNumsLocal = null; List logsToArchive = new ArrayList(); // make a local copy so as to avoid locking when we iterate over these maps. - synchronized (oldestSeqNumsLock) { - oldestFlushingSeqNumsLocal = new HashMap(this.oldestFlushingSeqNums); - oldestUnflushedSeqNumsLocal = new HashMap(this.oldestUnflushedSeqNums); + synchronized (regionSequenceIdLock) { + oldestFlushingSeqNumsLocal = new HashMap(this.lowestFlushingRegionSequenceIds); + oldestUnflushedSeqNumsLocal = new HashMap(this.oldestUnflushedRegionSequenceIds); } - for (Map.Entry> e : hlogSequenceNums.entrySet()) { + for (Map.Entry> e : byWalRegionSequenceIds.entrySet()) { // iterate over the log file. Path log = e.getKey(); Map sequenceNums = e.getValue(); // iterate over the map for this log file, and tell whether it should be archive or not. - if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal, - oldestUnflushedSeqNumsLocal)) { + if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal, oldestUnflushedSeqNumsLocal)) { logsToArchive.add(log); - LOG.debug("log file is ready for archiving " + log); + LOG.debug("WAL file ready for archiving " + log); } } for (Path p : logsToArchive) { this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen()); archiveLogFile(p); - this.hlogSequenceNums.remove(p); + this.byWalRegionSequenceIds.remove(p); } } /** * Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived. * It compares the region entries present in the passed sequenceNums map with the local copy of - * {@link #oldestUnflushedSeqNums} and {@link #oldestFlushingSeqNums}. If, for all regions, + * {@link #oldestUnflushedRegionSequenceIds} and {@link #lowestFlushingRegionSequenceIds}. If, for all regions, * the value is lesser than the minimum of values present in the oldestFlushing/UnflushedSeqNums, * then the wal file is eligible for archiving. * @param sequenceNums for a HLog, at the time when it was rolled. * @param oldestFlushingMap * @param oldestUnflushedMap * @return true if wal is eligible for archiving, false otherwise. */ static boolean areAllRegionsFlushed(Map sequenceNums, Map oldestFlushingMap, Map oldestUnflushedMap) { for (Map.Entry regionSeqIdEntry : sequenceNums.entrySet()) { // find region entries in the flushing/unflushed map. If there is no entry, it means // a region doesn't have any unflushed entry. long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ? oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE; @@ -688,7 +694,7 @@ /** * Iterates over the given map of regions, and compares their sequence numbers with corresponding - * entries in {@link #oldestUnflushedSeqNums}. If the sequence number is greater or equal, the + * entries in {@link #oldestUnflushedRegionSequenceIds}.
If the sequence number is greater or equal, the * region is eligible to flush, otherwise, there is no benefit to flush (from the perspective of * passed regionsSequenceNums map), because the region has already flushed the entries present * in the WAL file for which this method is called for (typically, the oldest wal file). @@ -699,9 +705,9 @@ class FSHLog implements HLog, Syncable { private byte[][] findEligibleMemstoresToFlush(Map regionsSequenceNums) { List regionsToFlush = null; // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. - synchronized (oldestSeqNumsLock) { + synchronized (regionSequenceIdLock) { for (Map.Entry e : regionsSequenceNums.entrySet()) { - Long unFlushedVal = this.oldestUnflushedSeqNums.get(e.getKey()); + Long unFlushedVal = this.oldestUnflushedRegionSequenceIds.get(e.getKey()); if (unFlushedVal != null && unFlushedVal <= e.getValue()) { if (regionsToFlush == null) regionsToFlush = new ArrayList(); regionsToFlush.add(e.getKey()); @@ -724,7 +730,7 @@ class FSHLog implements HLog, Syncable { int logCount = getNumRolledLogFiles(); if (logCount > this.maxLogs && logCount > 0) { Map.Entry> firstWALEntry = - this.hlogSequenceNums.firstEntry(); + this.byWalRegionSequenceIds.firstEntry(); regions = findEligibleMemstoresToFlush(firstWALEntry.getValue()); } if (regions != null) { @@ -740,9 +746,9 @@ class FSHLog implements HLog, Syncable { return regions; } - /* + /** * Cleans up current writer closing. - * Presumes we're operating inside an updateLock scope. + * NOTE: Presumes we're operating inside an updateLock scope. * @return Path to current writer or null if none. * @throws IOException */ @@ -751,32 +757,27 @@ class FSHLog implements HLog, Syncable { if (this.writer != null) { // Close the current writer, get a new one. try { - // Wait till all current transactions are written to the hlog. + // Wait till all current transactions are synced to the hlog. // No new transactions can occur because we have the updatelock. - if (this.unflushedEntries.get() != this.syncedTillHere.get()) { - LOG.debug("cleanupCurrentWriter " + - " waiting for transactions to get synced " + - " total " + this.unflushedEntries.get() + - " synced till here " + this.syncedTillHere.get()); + if (isUnflushedEntries()) { + LOG.debug("Waiting on unsynced edits; highestUnsyncedSequence=" + this.highestUnsyncedSequence + + ", current sync sequence=" + this.highestSyncedSequence.get()); sync(); } this.writer.close(); this.writer = null; closeErrorCount.set(0); } catch (IOException e) { - LOG.error("Failed close of HLog writer", e); + boolean unflushedEdits = isUnflushedEntries(); + LOG.error("Failed close of HLog writer; isUnflushedEdits=" + unflushedEdits, e); int errors = closeErrorCount.incrementAndGet(); - if (errors <= closeErrorsTolerated && !hasDeferredEntries()) { - LOG.warn("Riding over HLog close failure! error count="+errors); + if (errors <= this.closeErrorsTolerated && !unflushedEdits) { + LOG.warn("Riding over HLog close failure; error count=" + errors); } else { - if (hasDeferredEntries()) { - LOG.error("Aborting due to unflushed edits in HLog"); - } - // Failed close of log file. Means we're losing edits. For now, - // shut ourselves down to minimize loss. Alternative is to try and - // keep going. See HBASE-930. + // Failed close of log file. It means we are dropping edits. For now, shut ourselves down to minimize loss. + // Alternative is to try and keep going. See HBASE-930. 
FailedLogCloseException flce = - new FailedLogCloseException("#" + currentfilenum); + new FailedLogCloseException("#" + currentfilenum + ", unsynced edits count=" + getUnflushedEntriesCount()); flce.initCause(e); throw flce; } @@ -788,8 +789,16 @@ class FSHLog implements HLog, Syncable { return oldFile; } + long getUnflushedEntriesCount() { + return this.highestUnsyncedSequence - this.highestSyncedSequence.get(); + } + + boolean isUnflushedEntries() { + return getUnflushedEntriesCount() > 0; + } + private void archiveLogFile(final Path p) throws IOException { - Path newPath = getHLogArchivePath(this.oldLogDir, p); + Path newPath = getHLogArchivePath(this.fullPathOldLogDir, p); // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { @@ -826,11 +835,11 @@ class FSHLog implements HLog, Syncable { if (filenum < 0) { throw new RuntimeException("hlog file number can't be < 0"); } - String child = prefix + "." + filenum; + String child = logFilePrefix + "." + filenum; if (forMeta) { child += HLog.META_HLOG_FILE_EXTN; } - return new Path(dir, child); + return new Path(fullPathLogDir, child); } /** @@ -844,7 +853,7 @@ class FSHLog implements HLog, Syncable { protected long getFileNumFromFileName(Path fileName) { if (fileName == null) throw new IllegalArgumentException("file name can't be null"); // The path should start with dir/. - String prefixPathStr = new Path(dir, prefix + ".").toString(); + String prefixPathStr = new Path(fullPathLogDir, logFilePrefix + ".").toString(); if (!fileName.toString().startsWith(prefixPathStr)) { throw new IllegalArgumentException("The log file " + fileName + " doesn't belong to" + " this regionserver " + prefixPathStr); @@ -857,12 +866,12 @@ class FSHLog implements HLog, Syncable { @Override public void closeAndDelete() throws IOException { close(); - if (!fs.exists(this.dir)) return; - FileStatus[] files = fs.listStatus(this.dir); + if (!fs.exists(this.fullPathLogDir)) return; + FileStatus[] files = fs.listStatus(this.fullPathLogDir); if (files != null) { for(FileStatus file : files) { - Path p = getHLogArchivePath(this.oldLogDir, file.getPath()); + Path p = getHLogArchivePath(this.fullPathOldLogDir, file.getPath()); // Tell our listeners that a log is going to be archived. 
if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { @@ -880,45 +889,16 @@ class FSHLog implements HLog, Syncable { } } } - LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.oldLogDir)); + LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.fullPathOldLogDir)); } - if (!fs.delete(dir, true)) { - LOG.info("Unable to delete " + dir); + if (!fs.delete(fullPathLogDir, true)) { + LOG.info("Unable to delete " + fullPathLogDir); } } @Override public void close() throws IOException { - if (this.closed) { - return; - } - - try { - asyncNotifier.interrupt(); - asyncNotifier.join(); - } catch (InterruptedException e) { - LOG.error("Exception while waiting for " + asyncNotifier.getName() + - " threads to die", e); - } - - for (int i = 0; i < asyncSyncers.length; ++i) { - try { - asyncSyncers[i].interrupt(); - asyncSyncers[i].join(); - } catch (InterruptedException e) { - LOG.error("Exception while waiting for " + asyncSyncers[i].getName() + - " threads to die", e); - } - } - - try { - asyncWriter.interrupt(); - asyncWriter.join(); - } catch (InterruptedException e) { - LOG.error("Exception while waiting for " + asyncWriter.getName() + - " thread to die", e); - } - + if (this.closed) return; try { // Prevent all further flushing and rolling. closeBarrier.stopAndDrainOps(); @@ -927,6 +907,11 @@ class FSHLog implements HLog, Syncable { Thread.currentThread().interrupt(); } + // TODO: For sure disruptor plays out all queued before exiting? Check. + // TODO: Do I have to interrupt? + if (this.disruptor != null) this.disruptor.shutdown(); + if (this.appendExecutor != null) this.appendExecutor.shutdown(); + // Tell our listeners that the log is closing if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { @@ -936,7 +921,7 @@ class FSHLog implements HLog, Syncable { synchronized (updateLock) { this.closed = true; if (LOG.isDebugEnabled()) { - LOG.debug("Closing WAL writer in " + this.dir.toString()); + LOG.debug("Closing WAL writer in " + this.fullPathLogDir.toString()); } if (this.writer != null) { this.writer.close(); @@ -961,385 +946,195 @@ class FSHLog implements HLog, Syncable { @Override @VisibleForTesting public void append(HRegionInfo info, TableName tableName, WALEdit edits, - final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException { + final long now, HTableDescriptor htd, AtomicLong sequenceId) + throws IOException { append(info, tableName, edits, new ArrayList(), now, htd, true, true, sequenceId, HConstants.NO_NONCE, HConstants.NO_NONCE); } + @Override + public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, + List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId, + boolean isInMemstore, long nonceGroup, long nonce) throws IOException { + return append(info, tableName, edits, clusterIds, now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce); + } + /** - * Append a set of edits to the log. Log edits are keyed by (encoded) - * regionName, rowname, and log-sequence-id. + * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and log-sequence-id. * - * Later, if we sort by these keys, we obtain all the relevant edits for a - * given key-range of the HRegion (TODO). Any edits that do not have a - * matching COMPLETE_CACHEFLUSH message can be discarded. + * Later, if we sort by these keys, we obtain all the relevant edits for a given key-range of the HRegion (TODO). 
+ * Any edits that do not have a matching COMPLETE_CACHEFLUSH message can be discarded. * - *
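In caller terms, the append half of this contract pairs with a blocking sync on the returned transaction id. A hypothetical caller sketch -- the helper method, and the assumption that HLog exposes a blocking sync(long txid), are illustrative, not code from this patch:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    public class AppendThenSyncSketch {
      /** Stage the edit without syncing, then block once until it is durable. */
      static void appendDurably(HLog log, HRegionInfo info, TableName table,
          WALEdit edits, HTableDescriptor htd, AtomicLong regionSequenceId)
          throws IOException {
        long txid = log.appendNoSync(info, table, edits, new ArrayList<UUID>(),
            System.currentTimeMillis(), htd, regionSequenceId, true,
            HConstants.NO_NONCE, HConstants.NO_NONCE);
        log.sync(txid); // returns only once edits up to txid are out on the datanodes
      }
    }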

- * Logs cannot be restarted once closed, or once the HLog process dies. Each - * time the HLog starts, it must create a new log. This means that other - * systems should process the log appropriately upon each startup (and prior + *

Logs cannot be restarted once closed, or once the HLog process dies. Each time the HLog starts, it must create + * a new log. This means that other systems should process the log appropriately upon each startup (and prior * to initializing HLog). * - * synchronized prevents appends during the completion of a cache flush or for - * the duration of a log roll. + * Synchronized prevents appends during the completion of a cache flush or for the duration of a log roll. * * @param info * @param tableName * @param edits * @param clusterIds that have consumed the change (for replication) * @param now + * @param htd * @param doSync shall we sync? + * @param inMemstore * @param sequenceId of the region. - * @return txid of this transaction + * @param nonceGroup + * @param nonce + * @return txid of this transaction or if nothing to do, the last txid * @throws IOException */ - @SuppressWarnings("deprecation") private long append(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, - final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, - AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException { - if (edits.isEmpty()) return this.unflushedEntries.get(); - if (this.closed) { - throw new IOException("Cannot append; log is closed"); - } - TraceScope traceScope = Trace.startSpan("FSHlog.append"); - try { - long txid = 0; - synchronized (this.updateLock) { - // get the sequence number from the passed Long. In normal flow, it is coming from the - // region. - long seqNum = sequenceId.incrementAndGet(); - // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region (i.e. the first edit added to the particular - // memstore). . When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush - // is greater than or equal to the value in lastSeqWritten. - // Use encoded name. Its shorter, guaranteed unique and a subset of - // actual name. - byte [] encodedRegionName = info.getEncodedNameAsBytes(); - if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); - HLogKey logKey = makeKey( - encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce); - - synchronized (pendingWritesLock) { - doWrite(info, logKey, edits, htd); - txid = this.unflushedEntries.incrementAndGet(); - } - this.numEntries.incrementAndGet(); - this.asyncWriter.setPendingTxid(txid); - - if (htd.isDeferredLogFlush()) { - lastDeferredTxid = txid; - } - this.latestSequenceNums.put(encodedRegionName, seqNum); - } - // TODO: note that only tests currently call append w/sync. - // Therefore, this code here is not actually used by anything. - // Sync if catalog region, and if not then check if that table supports - // deferred log flushing - if (doSync && - (info.isMetaRegion() || - !htd.isDeferredLogFlush())) { - // sync txn to file system - this.sync(txid); - } - return txid; - } finally { - traceScope.close(); - } + final long now, HTableDescriptor htd, boolean doSync, boolean inMemstore, + AtomicLong sequenceId, long nonceGroup, long nonce) + throws IOException { + if (!this.enabled || edits.isEmpty()) return this.highestUnsyncedSequence; + if (this.closed) throw new IOException("Cannot append; log is closed"); + // TODO: trace model here does not work any more. It does not match how we append. 
+ TraceScope traceScope = Trace.startSpan("FSHlog.append"); + // Make a key but do not set the WALEdit by region sequence id now -- set it to -1 for now -- and then later, + // just before we write it out to the DFS stream, set the sequence id; late-binding. + HLogKey logKey = makeKey(info.getEncodedNameAsBytes(), tableName, -1, now, clusterIds, nonceGroup, nonce); + // It is crazy how much it takes to make an edit. Do we need all this stuff? We need all of it to make a key, and + // then below, to append the edit, we need to carry htd, info, etc. all over the ring buffer. + long sequence = this.disruptor.getRingBuffer().next(); + try { + RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); + FSWALEntry entry = new FSWALEntry(sequence, logKey, edits, sequenceId, inMemstore, htd, info); + truck.loadPayload(entry, traceScope.detach()); + } finally { + this.disruptor.getRingBuffer().publish(sequence); } - - @Override - public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, - List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId, - boolean isInMemstore, long nonceGroup, long nonce) throws IOException { - return append(info, tableName, edits, clusterIds, - now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce); + // Sync if we have been asked to -- only tests do this -- or if it is a meta table edit (these are precious). + if (doSync || info.isMetaRegion()) syncer(sequence); + this.highestUnsyncedSequence = sequence; + return sequence; } - /* The work of current write process of HLog goes as below: - * 1). All write handler threads append edits to HLog's local pending buffer; - * (it notifies AsyncWriter thread that there is new edits in local buffer) - * 2). All write handler threads wait in HLog.syncer() function for underlying threads to - * finish the sync that contains its txid; - * 3). An AsyncWriter thread is responsible for retrieving all edits in HLog's - * local pending buffer and writing to the hdfs (hlog.writer.append); - * (it notifies AsyncSyncer threads that there is new writes to hdfs which needs a sync) - * 4). AsyncSyncer threads are responsible for issuing sync request to hdfs to persist the - * writes by AsyncWriter; (they notify the AsyncNotifier thread that sync is done) - * 5). An AsyncNotifier thread is responsible for notifying all pending write handler - * threads which are waiting in the HLog.syncer() function - * 6). No LogSyncer thread any more (since there is always AsyncWriter/AsyncFlusher threads - * do the same job it does) - * note: more than one AsyncSyncer threads are needed here to guarantee good enough performance - * when less concurrent write handler threads. since sync is the most time-consuming - * operation in the whole write process, multiple AsyncSyncer threads can provide better - * parallelism of sync to get better overall throughput + /** + * Thread that runs the hdfs sync call. This call takes a while to complete. This is the longest pole adding edits to + * the WAL and this must complete to be sure all edits are persisted. We run multiple threads sync'ing rather than one + * that just syncs in series so we have better latencies; otherwise, an edit that arrived just after a sync started + * might have to wait almost the length of two sync invocations before it is marked done. + *
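The SyncFutures this javadoc goes on to describe can be pictured as one-shot latches handed from a blocked Handler to the sync thread. An illustrative stand-in, not the patch's actual SyncFuture (which is reused across calls rather than one-shot):

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    final class SimpleSyncFuture {
      private final long ringBufferSequence;
      private final CountDownLatch done = new CountDownLatch(1);
      private volatile Throwable error;

      SimpleSyncFuture(long ringBufferSequence) { this.ringBufferSequence = ringBufferSequence; }

      long getRingBufferSequence() { return ringBufferSequence; }

      /** Sync thread: mark complete, passing along any failure. */
      void done(Throwable t) { error = t; done.countDown(); }

      /** Handler thread: block until the filesystem sync covering this sequence finishes. */
      void get() throws IOException, InterruptedException {
        done.await();
        if (error != null) throw new IOException("sync failed", error);
      }
    }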

When the sync completes, it marks all the passed in futures done. On the other end of the sync future is a + blocked thread, usually a regionserver Handler. There may be more than one future passed in the + case where a few threads arrive at about the same time and all invoke 'sync'. In this case we'll batch up the + invocations and run one filesystem sync only for a batch of Handler sync invocations. Do not confuse these + Handler SyncFutures with the futures an ExecutorService returns when you call submit. We have no use for these in + this model. These SyncFutures are 'artificial', something to hold the Handler until the filesystem sync completes. */ - // thread to write locally buffered writes to HDFS - private class AsyncWriter extends HasThread { - private long pendingTxid = 0; - private long txidToWrite = 0; - private long lastWrittenTxid = 0; - private Object writeLock = new Object(); - - public AsyncWriter(String name) { + private class SyncRunner extends HasThread { + /* + // LinkedList looks better than ArrayList if we are adding and removing entries -- less System.arraycopying -- + // and the Deque Interface seems to suit best what is going on here. + private final Deque syncFutures = new LinkedList(); + private final Object syncFuturesLock = new Object();*/ + private long sequence = -1; + private final Object lock = new Object(); + private final List syncFutures; + + /** + * UPDATE! + * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, we will put the + * result of the actual hdfs sync call as the result. + * @param sequence The sequence number on the ring buffer when this thread was set running. If this actual + * writer sync completes then all appends up to this point have been flushed/synced/pushed to datanodes. If we fail, + * then the passed in syncs futures will return the exception to their clients; some of the edits may + * have made it out to data nodes but we will report all that were part of this session as failed. + */ + SyncRunner(final String name, final int maxHandlersCount) { super(name); + this.syncFutures = new ArrayList(maxHandlersCount); } - // wake up (called by (write) handler thread) AsyncWriter thread - // to write buffered writes to HDFS - public void setPendingTxid(long txid) { - synchronized (this.writeLock) { - if (txid <= this.pendingTxid) - return; - - this.pendingTxid = txid; - this.writeLock.notify(); + void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) { + synchronized(this.lock) { + this.sequence = sequence; + for (int i = 0; i < syncFutureCount; i++) this.syncFutures.add(syncFutures[i]); + this.lock.notify(); } } public void run() { - try { - while (!this.isInterrupted()) { - // 1. wait until there is new writes in local buffer - synchronized (this.writeLock) { - while (this.pendingTxid <= this.lastWrittenTxid) { - this.writeLock.wait(); - } - } - - // 2. get all buffered writes and update 'real' pendingTxid - // since maybe newer writes enter buffer as AsyncWriter wakes - // up and holds the lock - // NOTE! can't hold 'updateLock' here since rollWriter will pend - // on 'sync()' with 'updateLock', but 'sync()' will wait for - // AsyncWriter/AsyncSyncer/AsyncNotifier series. without updateLock - // can leads to pendWrites more than pendingTxid, but not problem - List pendWrites = null; - synchronized (pendingWritesLock) { - this.txidToWrite = unflushedEntries.get(); - pendWrites = pendingWrites; - pendingWrites = new LinkedList(); - } - - // 3.
write all buffered writes to HDFS(append, without sync) - try { - for (Entry e : pendWrites) { - writer.append(e); + long currentSequence; + while (!isInterrupted()) { + int syncCount = 0; + try { + long currentHighestSyncedSequence; + synchronized(this.lock) { + if (this.syncFutures.isEmpty()) { + this.lock.wait(1000); + if (this.syncFutures.isEmpty()) continue; } - postAppend(pendWrites); - } catch(IOException e) { - LOG.error("Error while AsyncWriter write, request close of hlog ", e); - requestLogRoll(); - - asyncIOE = e; - failedTxid.set(this.txidToWrite); - } - - // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync' - this.lastWrittenTxid = this.txidToWrite; - boolean hasIdleSyncer = false; - for (int i = 0; i < asyncSyncers.length; ++i) { - if (!asyncSyncers[i].isSyncing()) { - hasIdleSyncer = true; - asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid); - break; + // The syncFutures list is ordered. See below if some have already been synced (rare?). + long first = this.syncFutures.get(0).getRingBufferSequence(); + currentHighestSyncedSequence = highestSyncedSequence.get(); + if (currentHighestSyncedSequence >= first) { + // Then I can let at least one handler go this side of the sync call... go for it. + syncCount += syncsDone(this.syncFutures, currentHighestSyncedSequence, null); + // If I got rid of all waiting syncs, go back to sleep till more work comes in. + if (this.syncFutures.isEmpty()) continue; } + // Set the sync point for the call to filesystem sync. + currentSequence = this.sequence; } - if (!hasIdleSyncer) { - int idx = (int)this.lastWrittenTxid % asyncSyncers.length; - asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid); - } - } - } catch (InterruptedException e) { - LOG.debug(getName() + " interrupted while waiting for " + - "newer writes added to local buffer"); - } catch (Exception e) { - LOG.error("UNEXPECTED", e); - } finally { - LOG.info(getName() + " exiting"); - } - } - } - - // thread to request HDFS to sync the WALEdits written by AsyncWriter - // to make those WALEdits durable on HDFS side - private class AsyncSyncer extends HasThread { - private long writtenTxid = 0; - private long txidToSync = 0; - private long lastSyncedTxid = 0; - private volatile boolean isSyncing = false; - private Object syncLock = new Object(); - - public AsyncSyncer(String name) { - super(name); - } - - public boolean isSyncing() { - return this.isSyncing; - } - - // wake up (called by AsyncWriter thread) AsyncSyncer thread - // to sync(flush) writes written by AsyncWriter in HDFS - public void setWrittenTxid(long txid) { - synchronized (this.syncLock) { - if (txid <= this.writtenTxid) - return; - - this.writtenTxid = txid; - this.syncLock.notify(); - } - } - - public void run() { - try { - while (!this.isInterrupted()) { - // 1. wait until AsyncWriter has written data to HDFS and - // called setWrittenTxid to wake up us - synchronized (this.syncLock) { - while (this.writtenTxid <= this.lastSyncedTxid) { - this.syncLock.wait(); - } - this.txidToSync = this.writtenTxid; - } - - // 2. 
do 'sync' to HDFS to provide durability - long now = EnvironmentEdgeManager.currentTimeMillis(); + long start = System.nanoTime(); + Throwable t = null; try { - this.isSyncing = true; if (writer != null) { writer.sync(); + do { + currentHighestSyncedSequence = highestSyncedSequence.get(); + if (currentHighestSyncedSequence >= currentSequence) break; + } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, currentSequence)); } - this.isSyncing = false; - postSync(); } catch (IOException e) { - LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e); - requestLogRoll(); - - asyncIOE = e; - failedTxid.set(this.txidToSync); - } - metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now); - - // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put' - // handler threads on 'sync()' - this.lastSyncedTxid = this.txidToSync; - asyncNotifier.setFlushedTxid(this.lastSyncedTxid); - - // 4. check and do logRoll if needed - boolean logRollNeeded = false; - if (rollWriterLock.tryLock()) { - try { - logRollNeeded = checkLowReplication(); - } finally { - rollWriterLock.unlock(); - } - try { - if (logRollNeeded || writer != null && writer.getLength() > logrollsize) { - requestLogRoll(); - } - } catch (IOException e) { - LOG.warn("writer.getLength() failed,this failure won't block here"); + LOG.error("Error syncing, request close of hlog ", e); + t = e; + } catch (Exception e) { + LOG.warn("UNEXPECTED", e); + t = e; + } finally { + // Have to call done on all of these outstanding syncs irrespective because a handler waiting on other end. + synchronized(this.lock) { + syncCount += syncsDone(this.syncFutures, currentSequence, t); } + if (t != null) requestLogRoll(); + else checkLogRoll(); } + postSync(System.nanoTime() - start, syncCount); + } catch (InterruptedException e) { + // Presume legit interrupt. 
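// A minimal sketch (illustrative names, not part of this patch) of the watermark idiom the
// sync loop above relies on: after a filesystem sync that covers ring buffer sequence
// 'synced', advance the shared high-water mark unless a concurrent SyncRunner has already
// published a later value.
//
//   static void advance(final AtomicLong highestSynced, final long synced) {
//     long current;
//     do {
//       current = highestSynced.get();
//       if (current >= synced) return; // a later sync already covers us
//     } while (!highestSynced.compareAndSet(current, synced));
//   }
//
// Any SyncFuture whose sequence is at or below this watermark can then be released without
// another dfs sync; that is the early-exit at the top of the run() loop above.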
+ Thread.currentThread().interrupt(); + } catch (Throwable t) { + LOG.warn("UNEXPECTED, continuing", t); } - } catch (InterruptedException e) { - LOG.debug(getName() + " interrupted while waiting for " + - "notification from AsyncWriter thread"); - } catch (Exception e) { - LOG.error("UNEXPECTED", e); - } finally { - LOG.info(getName() + " exiting"); } } } - // thread to notify all write handler threads which are pending on - // their written WALEdits' durability(sync) - // why an extra 'notifier' thread is needed rather than letting - // AsyncSyncer thread itself notifies when sync is done is to let - // AsyncSyncer thread do next sync as soon as possible since 'notify' - // has heavy synchronization with all pending write handler threads - private class AsyncNotifier extends HasThread { - private long flushedTxid = 0; - private long lastNotifiedTxid = 0; - private Object notifyLock = new Object(); - - public AsyncNotifier(String name) { - super(name); - } - - public void setFlushedTxid(long txid) { - synchronized (this.notifyLock) { - if (txid <= this.flushedTxid) { - return; - } - - this.flushedTxid = txid; - this.notifyLock.notify(); - } - } - - public void run() { - try { - while (!this.isInterrupted()) { - synchronized (this.notifyLock) { - while (this.flushedTxid <= this.lastNotifiedTxid) { - this.notifyLock.wait(); - } - this.lastNotifiedTxid = this.flushedTxid; - } - - // notify(wake-up) all pending (write) handler thread - // (or logroller thread which also may pend on sync()) - synchronized (syncedTillHere) { - syncedTillHere.set(this.lastNotifiedTxid); - syncedTillHere.notifyAll(); - } - } - } catch (InterruptedException e) { - LOG.debug(getName() + " interrupted while waiting for " + - " notification from AsyncSyncer thread"); - } catch (Exception e) { - LOG.error("UNEXPECTED", e); - } finally { - LOG.info(getName() + " exiting"); - } + /** + * Schedule a log roll if needed. + */ + void checkLogRoll() { + // Will return immediately if we are in the middle of a WAL log roll currently. + if (!rollWriterLock.tryLock()) return; + boolean lowReplication; + try { + lowReplication = checkLowReplication(); + } finally { + rollWriterLock.unlock(); } - } - - // sync all known transactions - private void syncer() throws IOException { - syncer(this.unflushedEntries.get()); // sync all pending items - } - - // sync all transactions upto the specified txid - private void syncer(long txid) throws IOException { - synchronized (this.syncedTillHere) { - while (this.syncedTillHere.get() < txid) { - try { - this.syncedTillHere.wait(); - - if (txid <= this.failedTxid.get()) { - assert asyncIOE != null : - "current txid is among(under) failed txids, but asyncIOE is null!"; - throw asyncIOE; - } - } catch (InterruptedException e) { - LOG.debug("interrupted while waiting for notification from AsyncNotifier"); - } - } + try { + if (lowReplication || writer != null && writer.getLength() > logrollsize) requestLogRoll(); + } catch (IOException e) { + LOG.warn("Writer.getLength() failed; continuing", e); } } - @Override - public void postSync() {} - - @Override - public void postAppend(List entries) {} - /* - * @return whether log roll should be requested + * @return true if number of replicas for the WAL is lower than threshold */ private boolean checkLowReplication() { boolean logRollNeeded = false; @@ -1347,8 +1142,7 @@ class FSHLog implements HLog, Syncable { // value, then roll logs. 
    try {
      int numCurrentReplicas = getLogReplication();
-      if (numCurrentReplicas != 0
-          && numCurrentReplicas < this.minTolerableReplication) {
+      if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
        if (this.lowReplicationRollEnabled) {
          if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
            LOG.warn("HDFS pipeline error detected. " + "Found "
@@ -1368,7 +1162,6 @@ class FSHLog implements HLog, Syncable {
          }
        }
      } else if (numCurrentReplicas >= this.minTolerableReplication) {
-
        if (!this.lowReplicationRollEnabled) {
          // The new writer's log replicas is always the default value.
          // So we should not enable LowReplication-Roller. If numEntries
@@ -1383,12 +1176,64 @@
        }
      }
    } catch (Exception e) {
-      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
-        " still proceeding ahead...");
+      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas; still proceeding ahead...", e);
    }
    return logRollNeeded;
  }

+  // Sync all known transactions
+  private void syncer() throws IOException {
+    syncer(this.highestUnsyncedSequence); // sync all pending items
+  }
+
+  // Sync all transactions up to the specified txid (a ring buffer sync covers everything appended
+  // before it, so the txid itself goes unused)
+  private void syncer(final long unused/*Sync does all transactions before it; can't sync an explicit version*/)
+  throws IOException {
+    long sequence = this.disruptor.getRingBuffer().next();
+    SyncFuture syncFuture = getSyncFuture(sequence);
+    try {
+      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
+      truck.loadPayload(syncFuture);
+    } finally {
+      this.disruptor.getRingBuffer().publish(sequence);
+    }
+    // Now we have published to the ringbuffer, halt the current thread until we get an answer back.
+    try {
+      syncFuture.get();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      IOException ioe = new InterruptedIOException();
+      ioe.initCause(ie);
+      throw ioe;
+    } catch (ExecutionException e) {
+      throw ensureIOException(e.getCause());
+    }
+  }
+
+  private SyncFuture getSyncFuture(final long sequence) {
+    SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
+    if (syncFuture == null) {
+      syncFuture = new SyncFuture();
+      this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
+    }
+    return syncFuture.reset(sequence);
+  }
+
+  @Override
+  public void postSync(final long timeInNanos, final int handlerSyncs) {
+    // TODO: Add metric for handler syncs done at a time.
+    if (this.metrics != null) metrics.finishSync(timeInNanos/1000000);
+  }
+
+  @Override
+  public long postAppend(final Entry e, final long elapsedTime) {
+    long len = 0;
+    if (this.metrics == null) return len;
+    for (KeyValue kv : e.getEdit().getKeyValues()) len += kv.getLength();
+    metrics.finishAppend(elapsedTime, len);
+    return len;
+  }
+
   /**
    * This method gets the datanode replication count for the current HLog.
    *
@@ -1444,44 +1289,6 @@ class FSHLog implements HLog, Syncable {
     }
   }

-  // TODO: Remove info. Unused.
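// A hedged aside on the syncer() publish path above: disruptor 3.x also offers a higher-level
// publishEvent(EventTranslator) API that claims, fills, and publishes a slot in one call. A
// sketch of that alternative (RingBufferTruck and SyncFuture are the classes added by this
// patch; everything else here is illustrative, not the patch's code):
//
//   final class SyncTranslator implements com.lmax.disruptor.EventTranslator<RingBufferTruck> {
//     private final SyncFuture future;
//     SyncTranslator(final SyncFuture future) { this.future = future; }
//     public void translateTo(final RingBufferTruck truck, final long sequence) {
//       truck.loadPayload(future.reset(sequence));
//     }
//   }
//
// The patch uses the explicit next()/get()/publish() form instead, presumably because the
// claimed sequence is needed up front by getSyncFuture(sequence) before the slot is filled,
// and the finally-publish guarantees a claimed slot is never left unpublished.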
- protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, - HTableDescriptor htd) - throws IOException { - if (!this.enabled) { - return; - } - if (!this.listeners.isEmpty()) { - for (WALActionsListener i: this.listeners) { - i.visitLogEntryBeforeWrite(htd, logKey, logEdit); - } - } - try { - long now = EnvironmentEdgeManager.currentTimeMillis(); - // coprocessor hook: - if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) { - if (logEdit.isReplay()) { - // set replication scope null so that this won't be replicated - logKey.setScopes(null); - } - // write to our buffer for the Hlog file. - this.pendingWrites.add(new HLog.Entry(logKey, logEdit)); - } - long took = EnvironmentEdgeManager.currentTimeMillis() - now; - coprocessorHost.postWALWrite(info, logKey, logEdit); - long len = 0; - for (KeyValue kv : logEdit.getKeyValues()) { - len += kv.getLength(); - } - this.metrics.finishAppend(took, len); - } catch (IOException e) { - LOG.fatal("Could not append. Requesting close of hlog", e); - requestLogRoll(); - throw e; - } - } - - /** @return How many items have been added to the log */ int getNumEntries() { return numEntries.get(); @@ -1489,7 +1296,7 @@ class FSHLog implements HLog, Syncable { /** @return the number of rolled log files */ public int getNumRolledLogFiles() { - return hlogSequenceNums.size(); + return byWalRegionSequenceIds.size(); } /** @return the number of log files in use */ @@ -1502,7 +1309,7 @@ class FSHLog implements HLog, Syncable { /** @return the size of log files in use */ @Override public long getLogFileSize() { - return totalLogSize.get() + curLogSize; + return this.totalLogSize.get(); } @Override @@ -1513,12 +1320,11 @@ class FSHLog implements HLog, Syncable { " - because the server is closing."); return false; } - synchronized (oldestSeqNumsLock) { - oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName); + synchronized (regionSequenceIdLock) { + oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.remove(encodedRegionName); if (oldRegionSeqNum != null) { - Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum); - assert oldValue == null : "Flushing map not cleaned up for " - + Bytes.toString(encodedRegionName); + Long oldValue = this.lowestFlushingRegionSequenceIds.put(encodedRegionName, oldRegionSeqNum); + assert oldValue == null : "Flushing map not cleaned up for " + Bytes.toString(encodedRegionName); } } if (oldRegionSeqNum == null) { @@ -1534,10 +1340,9 @@ class FSHLog implements HLog, Syncable { } @Override - public void completeCacheFlush(final byte [] encodedRegionName) - { - synchronized (oldestSeqNumsLock) { - this.oldestFlushingSeqNums.remove(encodedRegionName); + public void completeCacheFlush(final byte [] encodedRegionName) { + synchronized (regionSequenceIdLock) { + this.lowestFlushingRegionSequenceIds.remove(encodedRegionName); } closeBarrier.endOp(); } @@ -1545,11 +1350,11 @@ class FSHLog implements HLog, Syncable { @Override public void abortCacheFlush(byte[] encodedRegionName) { Long currentSeqNum = null, seqNumBeforeFlushStarts = null; - synchronized (oldestSeqNumsLock) { - seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName); + synchronized (regionSequenceIdLock) { + seqNumBeforeFlushStarts = this.lowestFlushingRegionSequenceIds.remove(encodedRegionName); if (seqNumBeforeFlushStarts != null) { currentSeqNum = - this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts); + this.oldestUnflushedRegionSequenceIds.put(encodedRegionName, 
seqNumBeforeFlushStarts); } } closeBarrier.endOp(); @@ -1575,7 +1380,7 @@ class FSHLog implements HLog, Syncable { * @return dir */ protected Path getDir() { - return dir; + return fullPathLogDir; } static Path getHLogArchivePath(Path oldLogDir, Path p) { @@ -1594,9 +1399,10 @@ class FSHLog implements HLog, Syncable { System.err.println("Usage: HLog "); System.err.println("Arguments:"); System.err.println(" --dump Dump textual representation of passed one or more files"); - System.err.println(" For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE"); + System.err.println(" For example: FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE"); System.err.println(" --split Split the passed directory of WAL logs"); - System.err.println(" For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR"); + System.err.println(" For example: FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR"); + System.err.println(" --perf Write the same key times to a WAL: e.g. FSHLog --perf 10"); } private static void split(final Configuration conf, final Path p) @@ -1619,18 +1425,169 @@ class FSHLog implements HLog, Syncable { return coprocessorHost; } - /** Provide access to currently deferred sequence num for tests */ - boolean hasDeferredEntries() { - return this.lastDeferredTxid > this.syncedTillHere.get(); - } - @Override public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { - Long result = oldestUnflushedSeqNums.get(encodedRegionName); + Long result = oldestUnflushedRegionSequenceIds.get(encodedRegionName); return result == null ? HConstants.NO_SEQNUM : result.longValue(); } /** + * Handler that is run by the disruptor ringbuffer consumer. Consumer is a SINGLE 'writer/appender' thread. Appends + * edits and starts up sync runs. Tries its best to batch up syncs. There is no discernible benefit batching appends + * so we just append as they come in because it simplifies the below implementation. See metrics for batching + * effectiveness (In measurement, at 100 concurrent handlers writing 1k, we are batching > 10 appends and 10 + * handler sync invocations for every actual dfsclient sync call; at 10 concurrent handlers, YMMV). + *
<p>
Herein, we have an array into which we store the sync futures as they come in. When we have a 'batch', we'll + * then pass what we have collected to a SyncRunner thread to do the filesystem sync. When it completes, it will + * then call {@link SyncFuture#done(long)} on each of SyncFutures in the batch to release blocked Handler threads. + *
<p>
I've tried various effects to try and make latencies low while keeping throughput high. I've tried keeping + * a single Queue of SyncFutures in this class appending to its tail as the syncs coming and having sync runner + * threads poll off the head to 'finish' completed SyncFutures. I've tried linkedlist, and various from + * concurrent utils whether LinkedBlockingQueue or ArrayBlockingQueue, etc. The more points of synchronization, the + * more 'work' (according to 'perf stats') that has to be done; small increases in stall percentages seem to have + * a big impact on throughput/latencies. The below model where we have an array into which we stash the syncs and + * then hand them off to the sync thread seemed like a decent compromise. See HBASE-8755 for more detail. + */ + class AppendEventHandler implements EventHandler, LifecycleAware { + private final SyncRunner [] syncRunners; + private final SyncFuture [] syncFutures; + private int syncFuturesCount = 0; + + /** + * Which syncrunner to use next. + */ + private int syncRunnerIndex; + + AppendEventHandler(final int syncRunnerCount, final int maxHandlersCount) { + this.syncFutures = new SyncFuture[maxHandlersCount]; + this.syncRunners = new SyncRunner[syncRunnerCount]; + for (int i = 0; i < syncRunnerCount; i++) { + this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount); + } + } + + @Override + public void onEvent(final RingBufferTruck truck, final long sequence, final boolean endOfBatch) throws Exception { + // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll add appends to + // dfsclient as they come in. Batching appends doesn't give any significant benefit on measurement. Handler + // sync calls we will batch up. + + // TODO: Trace only working for appends, not for syncs. + TraceScope scope = truck.getSpanPayload() != null? Trace.continueSpan(truck.getSpanPayload()): null; + try { + if (truck.getSyncFuturePayload() != null) { + this.syncFutures[this.syncFuturesCount++] = truck.getSyncFuturePayload(); + } else if (truck.getFSWALEntryPayload() != null) { + try { + append(truck.getFSWALEntryPayload()); + } catch (Exception e) { + // If append fails, presume any pending syncs will fail too; let all waiting handlers know of the exception + for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(e); + this.syncFuturesCount = 0; + throw e; + } + } else { + // They can't both be null + throw new IllegalStateException(); + } + + // TODO: Check size and if big go ahead and call a sync if we have enough data. + + // If not a batch, return to consume more events from the ring buffer before proceeding; we want to get up a + // batch of syncs and appends before we go do a filesystem sync. + if (!endOfBatch || this.syncFuturesCount <= 0) return; + + // Now we have a batch. + // if (LOG.isTraceEnabled()) LOG.debug("Sequence=" + sequence + ", syncCount=" + this.syncFuturesCount); + // Below expects that the offer 'transfers' responsibility for the outstanding syncs to the syncRunner. + int index = this.syncRunnerIndex++ % this.syncRunners.length; + this.syncRunners[index].offer(sequence, this.syncFutures, this.syncFuturesCount); + this.syncFuturesCount = 0; + } finally { + // This scope only makes sense for the append. Syncs will be pulled-up short so tracing will not give a + // good representation. TODO: Fix. + if (scope != null) scope.close(); + } + } + + /** + * Append to the WAL. Does all CP and WAL listener calls. 
+ * @param entry + * @throws Exception + */ + void append(final FSWALEntry entry) throws Exception { + // TODO: WORK ON MAKING THIS APPEND FASTER -- IT IS DOING WAY TOO MUCH WORK WITH CPs, AND PBing. + // Hold the update lock while appending to the WAL so it is not rolled out from under us. + long start = EnvironmentEdgeManager.currentTimeMillis(); + byte [] encodedRegionName = entry.getKey().getEncodedRegionName(); + synchronized (updateLock) { + try { + // We are about to append this edit; update the region-scoped sequence number. Do it here inside + // this single appending/writing thread under the updateLock -- means we are adding the edits in sequence. + long regionSequenceId = entry.getRegionSequenceIdReference().incrementAndGet(); + // Set the region-scoped sequence number back up into the key ("late-binding" -- setting before append). + entry.getKey().setLogSeqNum(regionSequenceId); + // Coprocessor hook. + if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit())) { + if (entry.getEdit().isReplay()) { + // Set replication scope null so that this won't be replicated + entry.getKey().setScopes(null); + } + } + if (!listeners.isEmpty()) { + for (WALActionsListener i: listeners) { + // TODO: Why does listener take a table description and CPs take a regioninfo? Fix. + i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(), entry.getEdit()); + } + } + writer.append(entry); + coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); + Long lRegionSequenceId = Long.valueOf(regionSequenceId); + if (entry.isInMemstore()) oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); + highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); + highestUnsyncedSequence = entry.getSequence(); + // Update metrics. + postAppend(entry, EnvironmentEdgeManager.currentTimeMillis() - start); + } catch (Exception e) { + LOG.fatal("Could not append. Requesting close of hlog", e); + requestLogRoll(); + throw e; + } + numEntries.incrementAndGet(); + } + } + + @Override + public void onStart() { + for (SyncRunner syncRunner: this.syncRunners) syncRunner.start(); + } + + @Override + public void onShutdown() { + for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt(); + } + } + + private static IOException ensureIOException(final Throwable t) { + return (t instanceof IOException)? (IOException)t: new IOException(t); + } + + private static int syncsDone(final Collection syncFutures, final long sequence, final Throwable t) { + int count = 0; + if (syncFutures == null || syncFutures.isEmpty()) return count; + for (Iterator i = syncFutures.iterator(); i.hasNext();) { + SyncFuture syncFuture = i.next(); + long current = syncFuture.getRingBufferSequence(); + if (current > sequence) break; + i.remove(); + if (t != null) syncFuture.done(t); + else syncFuture.done(sequence); + count++; + } + return count; + } + + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. * @@ -1645,6 +1602,24 @@ class FSHLog implements HLog, Syncable { // either dump using the HLogPrettyPrinter or split, depending on args if (args[0].compareTo("--dump") == 0) { HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); + } else if (args[0].compareTo("--perf") == 0) { + final int count = Integer.parseInt(args[1]); + // Put up a WAL and just keep adding same edit to it. Simple perf test. 
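// Usage sketch for the new --perf mode (invocation is illustrative; it assumes an HBase
// configuration whose root filesystem is reachable from this JVM):
//
//   $ bin/hbase org.apache.hadoop.hbase.regionserver.wal.FSHLog --perf 10
//
// appends the same 1k edit ten times through the full ring buffer/sync path and logs the
// elapsed time, which makes it a quick smoke test for the disruptor wiring.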
+      Configuration conf = HBaseConfiguration.create();
+      Path rootDir = FSUtils.getRootDir(conf);
+      FileSystem fs = rootDir.getFileSystem(conf);
+      FSHLog wal = new FSHLog(fs, rootDir, "perflog", "oldPerflog", conf, null, false, "perf", false);
+      long start = System.nanoTime();
+      WALEdit walEdit = new WALEdit();
+      walEdit.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
+        -1, new byte [1000]));
+      for (AtomicLong i = new AtomicLong(0); i.get() < count; i.incrementAndGet()) {
+        wal.append(HRegionInfo.FIRST_META_REGIONINFO, TableName.META_TABLE_NAME, walEdit, start,
+          HTableDescriptor.META_TABLEDESC, i);
+        wal.sync();
+      }
+      wal.close();
+      LOG.info("Wrote " + count + " 1k edits in " + (System.nanoTime() - start) + " nanos");
     } else if (args[0].compareTo("--split") == 0) {
       Configuration conf = HBaseConfiguration.create();
       for (int i = 1; i < args.length; i++) {
@@ -1662,4 +1637,4 @@
       System.exit(-1);
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
new file mode 100644
index 0000000..90a2062
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+/**
+ * A WAL Entry for FSHLog implementation.
+ * It is a subclass of {@link HLog.Entry} that can carry extra info across the ring buffer such as
+ * the region sequence id (we want to use this later, just before we write the WAL, to ensure region edits
+ * maintain order). The extra info added here is not 'serialized' as part of the WALEdit, hence marked 'transient'.
+ */
+class FSWALEntry extends HLog.Entry {
+  // The below data members are denoted 'transient' just to highlight that these are not persisted; they are only
+  // in memory and held here while passing over the ring buffer.
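// A hedged sketch of the producer side that presumably creates these entries. The
// appendNoSync implementation is not part of this hunk, so 'ringBuffer' and the helper
// below are assumptions for illustration only, not the patch's code:
//
//   static long publishAppend(final RingBuffer<RingBufferTruck> ringBuffer, final HLogKey key,
//       final WALEdit edits, final AtomicLong regionSequenceId, final boolean inMemstore,
//       final HTableDescriptor htd, final HRegionInfo hri) {
//     long sequence = ringBuffer.next();
//     try {
//       ringBuffer.get(sequence).loadPayload(
//           new FSWALEntry(sequence, key, edits, regionSequenceId, inMemstore, htd, hri), null);
//     } finally {
//       ringBuffer.publish(sequence); // never leave a claimed slot unpublished
//     }
//     return sequence; // handed back to the caller as the txid to sync on
//   }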
+ private final transient long sequence; + private final transient AtomicLong regionSequenceIdReference; + private final transient boolean inMemstore; + private final transient HTableDescriptor htd; + private final transient HRegionInfo hri; + + FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit, final AtomicLong referenceToRegionSequenceId, + final boolean inMemstore, final HTableDescriptor htd, final HRegionInfo hri) { + super(key, edit); + this.regionSequenceIdReference = referenceToRegionSequenceId; + this.inMemstore = inMemstore; + this.htd = htd; + this.hri = hri; + this.sequence = sequence; + } + + AtomicLong getRegionSequenceIdReference() { + return this.regionSequenceIdReference; + } + + boolean isInMemstore() { + return this.inMemstore; + } + + HTableDescriptor getHTableDescriptor() { + return this.htd; + } + + HRegionInfo getHRegionInfo() { + return this.hri; + } + + /** + * @return The sequence on the ring buffer when this edit was added. + */ + long getSequence() { + return this.sequence; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 4aa5be4..2bee8f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -42,7 +42,9 @@ import org.apache.hadoop.io.Writable; import com.google.common.annotations.VisibleForTesting; - +/** + * HLog records all the edits to HStore. It is the hbase write-ahead-log (WAL). + */ @InterfaceAudience.Private // TODO: Rename interface to WAL public interface HLog { @@ -52,7 +54,8 @@ public interface HLog { // TODO: this seems like an implementation detail that does not belong here. String SPLITTING_EXT = "-splitting"; boolean SPLIT_SKIP_ERRORS_DEFAULT = false; - /** The hbase:meta region's HLog filename extension */ + /** The hbase:meta region's HLog filename extension.*/ + // TODO: Implementation detail. Does not belong in here. String META_HLOG_FILE_EXTN = ".meta"; /** @@ -63,12 +66,14 @@ public interface HLog { String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size"; int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB - // TODO: Implemenation detail. Why in here? + // TODO: Implementation detail. Why in here? Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; + /** + * WAL Reader Interface + */ interface Reader { - /** * @param fs File system. * @param path Path. @@ -89,13 +94,15 @@ public interface HLog { void reset() throws IOException; /** - * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL - * files. + * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL files. */ - // TODO: What we need a trailer on WAL for? + // TODO: What we need a trailer on WAL for? It won't be present on the last WAL most of the time. What then? WALTrailer getWALTrailer(); } + /** + * WAL Writer Intrface. + */ interface Writer { void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException; @@ -108,17 +115,18 @@ public interface HLog { long getLength() throws IOException; /** - * Sets HLog's WALTrailer. This trailer is appended at the end of WAL on closing. + * Sets HLog/WAL's WALTrailer. This trailer is appended at the end of WAL on closing. 
* @param walTrailer trailer to append to WAL. */ + // TODO: Why a trailer on the log? void setWALTrailer(WALTrailer walTrailer); } /** - * Utility class that lets us keep track of the edit with it's key. - * Only used when splitting logs. + * Utility class that lets us keep track of the edit and it's associated key. Only used when splitting logs. */ // TODO: Remove this Writable. + // TODO: Why is this in here? Implementation detail? class Entry implements Writable { private WALEdit edit; private HLogKey key; @@ -135,7 +143,6 @@ public interface HLog { * @param key log's key */ public Entry(HLogKey key, WALEdit edit) { - super(); this.key = key; this.edit = edit; } @@ -161,8 +168,7 @@ public interface HLog { /** * Set compression context for this entry. * - * @param compressionContext - * Compression context + * @param compressionContext Compression context */ public void setCompressionContext(CompressionContext compressionContext) { edit.setCompressionContext(compressionContext); @@ -189,14 +195,14 @@ public interface HLog { } /** - * registers WALActionsListener + * Registers WALActionsListener * * @param listener */ void registerWALActionsListener(final WALActionsListener listener); /** - * unregisters WALActionsListener + * Unregisters WALActionsListener * * @param listener */ @@ -217,7 +223,7 @@ public interface HLog { * @return the size of HLog files */ long getLogFileSize(); - + // TODO: Log rolling should not be in this interface. /** * Roll the log writer. That is, start writing log messages to a new file. @@ -250,8 +256,7 @@ public interface HLog { * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException * @throws IOException */ - byte[][] rollWriter(boolean force) throws FailedLogCloseException, - IOException; + byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException; /** * Shut down the log. @@ -261,43 +266,60 @@ public interface HLog { void close() throws IOException; /** - * Shut down the log and delete the log directory + * Shut down the log and delete the log directory. + * Used by tests only and in rare cases where we need a log just temporarily while bootstrapping a region or + * running migrations. * * @throws IOException */ void closeAndDelete() throws IOException; /** - * Same as appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor), + * Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor, AtomicLong, boolean, long, long)} * except it causes a sync on the log - * @param sequenceId of the region. + * @param info + * @param tableName + * @param edits + * @param now + * @param htd + * @param sequenceId + * @throws IOException */ @VisibleForTesting public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException; /** - * For notification post append to the writer. - * @param entries + * For notification post append to the writer. Used by metrics system at least. + * @param entry + * @param elapsedTime + * @return Size of this append. */ - void postAppend(final List entries); + long postAppend(final Entry entry, final long elapsedTime); /** - * For notification post writer sync. + * For notification post writer sync. Used by metrics system at least. + * @param timeInMillis How long the filesystem sync took in milliseconds. + * @param How many sync handler calls were released by this call to filesystem sync. 
*/ - void postSync(); + void postSync(final long timeInMillis, final int handlerSyncs); /** - * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and - * log-sequence-id. The HLog is not flushed after this transaction is written to the log. + * Append a set of edits to the WAL. WAL edits are keyed by (encoded) regionName, rowname, and + * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes. Call {@link #sync()} to + * flush/sync all outstanding edits/appends. * @param info * @param tableName * @param edits - * @param clusterIds The clusters that have consumed the change (for replication) + * @param clusterIds * @param now * @param htd - * @param sequenceId of the region - * @return txid of this transaction + * @param sequenceId + * @param isInMemstore Always true except for case where we are writing a compaction completion record into the WAL; + * in this case the entry is just so we can finish an unfinished compaction -- it is not an edit for memstore. + * @param nonceGroup + * @param nonce + * @return * @throws IOException */ long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, @@ -311,6 +333,7 @@ public interface HLog { void sync() throws IOException; + // TODO: Why is this exposed? txid is an internal detail. void sync(long txid) throws IOException; /** @@ -318,7 +341,7 @@ public interface HLog { * in order to be able to do cleanup. This method tells WAL that some region is about * to flush memstore. * - * We stash the oldest seqNum for the region, and let the the next edit inserted in this + *
* <p>
We stash the oldest seqNum for the region, and let the next edit inserted in this
   * region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor,
   * AtomicLong)} as new oldest seqnum.
   * In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
@@ -362,4 +385,4 @@
   * @return The number if present, HConstants.NO_SEQNUM if absent.
   */
  long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
index 2b9130c..8fbf697 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
@@ -59,8 +59,7 @@ public class HLogFactory {
     public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
       final Configuration conf, final List listeners,
       final String prefix) throws IOException {
-      return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
-          conf, listeners, false, prefix, true);
+      return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, false, prefix, true);
     }

   /*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 42a125f..624fa37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -190,6 +190,14 @@ public class HLogKey implements WritableComparable {
   }

   /**
+   * Allow the log sequence id to be set post-construction.
+ * @param sequence + */ + void setLogSeqNum(final long sequence) { + this.logSeqNum = sequence; + } + + /** * @return the write time */ public long getWriteTime() { @@ -439,17 +447,15 @@ public class HLogKey implements WritableComparable { // Do not need to read the clusters information as we are using protobufs from 0.95 } - public WALKey.Builder getBuilder( - WALCellCodec.ByteStringCompressor compressor) throws IOException { + public WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor) + throws IOException { WALKey.Builder builder = WALKey.newBuilder(); if (compressionContext == null) { builder.setEncodedRegionName(ZeroCopyLiteralByteString.wrap(this.encodedRegionName)); builder.setTableName(ZeroCopyLiteralByteString.wrap(this.tablename.getName())); } else { - builder.setEncodedRegionName( - compressor.compress(this.encodedRegionName, compressionContext.regionDict)); - builder.setTableName(compressor.compress(this.tablename.getName(), - compressionContext.tableDict)); + builder.setEncodedRegionName(compressor.compress(this.encodedRegionName, compressionContext.regionDict)); + builder.setTableName(compressor.compress(this.tablename.getName(), compressionContext.tableDict)); } builder.setLogSequenceNumber(this.logSeqNum); builder.setWriteTime(writeTime); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 684f78c..86ac3ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -189,7 +189,7 @@ public class HLogUtil { serverName = ServerName.parseServerName(logDirName); } catch (IllegalArgumentException ex) { serverName = null; - LOG.warn("Invalid log file path=" + logFile, ex); + LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage()); } if (serverName != null && serverName.getStartcode() < 0) { LOG.warn("Invalid log file path=" + logFile); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 6d10b26..b67b553 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -99,8 +99,7 @@ public class ProtobufLogWriter extends WriterBase { @Override public void append(HLog.Entry entry) throws IOException { entry.setCompressionContext(compressionContext); - entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()) - .build().writeDelimitedTo(output); + entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output); for (KeyValue kv : entry.getEdit().getKeyValues()) { // cellEncoder must assume little about the stream, since we write PB and cells in turn. 
cellEncoder.write(kv); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java new file mode 100644 index 0000000..3cdb471 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java @@ -0,0 +1,72 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.cloudera.htrace.Span; + +import com.lmax.disruptor.EventFactory; + +/** + * A 'truck' to carry a payload over the ring buffer. + * Has EITHER a {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} on a sync call. + * On startup, the disruptor makes a bunch of these and they are reused carrying stuff over the ring buffer. + */ +class RingBufferTruck { + /** + * Either this syncFuture is set or entry is set, but not both. + */ + private SyncFuture syncFuture; + private FSWALEntry entry; + + /** + * The tracing span for this entry. Can be null. + * TODO: Fix up tracing. + */ + private Span span; + + void loadPayload(final FSWALEntry entry, final Span span) { + this.entry = entry; + this.span = span; + this.syncFuture = null; + } + + void loadPayload(final SyncFuture syncFuture) { + this.syncFuture = syncFuture; + this.entry = null; + this.span = null; + } + + FSWALEntry getFSWALEntryPayload() { + return this.entry; + } + + SyncFuture getSyncFuturePayload() { + return this.syncFuture; + } + + Span getSpanPayload() { + return this.span; + } + + final static EventFactory EVENT_FACTORY = new EventFactory() { + public RingBufferTruck newInstance() { + return new RingBufferTruck(); + } + }; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java new file mode 100644 index 0000000..09b0281 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A Future held until a sync completes. Handlers coming in call flush/sync of + * the edits they have appended to the WAL. Since sync takes a while to + * complete, we give the Handlers back this sync future to wait on until the + * actual HDFS sync completes. Meantime this sync future goes across the ring + * buffer and into a sync runner thread; when it completes, it finishes up the + * future, the handler get or failed check completes and the Handler can then + * progress. + *
<p>
+ * This is just a partial implementation of Future; we just implement get and + * failure. It also implements the EventTranslator interface so it can traverse + * the ringbuffer. + *
<p>
+ * There is not a one-to-one correlation between dfs sync invocations and + * instances of this class. A single dfs sync call may complete and mark many + * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync + * call every time a Handler asks for it. + *
<p>
+ * SyncFutures are for reuse. Call {@link #reset(long)} before use even if it + * the first time, start the sync, then park the 'hitched' thread on a call to + * {@link #get()} + */ +class SyncFuture { + /** + * The sequence at which we were added to the ring buffer. + */ + private long ringBufferSequence; + + /** + * The sequence that was set in here when we were marked done. Should be equal + * or > ringBufferSequence + */ + private long doneSequence; + + /** + * If error, the associated throwable. Set when the future is 'done'. + */ + private Throwable throwable = null; + + //TODO: THIS NECESSARY? JUST USE -1 SEQUENCE AS FLAG? + private boolean done = true; + + private Thread t; + + /** + * Call this method to clear old usage and get it ready for new deploy. Call + * this method even if it is being used for the first time. + * + * @param sequence + * @return this + */ + synchronized SyncFuture reset(final long sequence) { + if (t != null && t != Thread.currentThread()) throw new IllegalStateException(); + t = Thread.currentThread(); + if (!this.done) throw new IllegalStateException("" + sequence + " " + Thread.currentThread()); + this.done = false; + this.doneSequence = 0; + this.ringBufferSequence = sequence; + return this; + } + + @Override + public synchronized String toString() { + return "done=" + this.done + ", ringBufferSequence=" + this.ringBufferSequence; + } + + synchronized long getRingBufferSequence() { + return this.ringBufferSequence; + } + + synchronized boolean done(final long sequence) { + if (isDone()) return false; + this.done = true; + if (sequence < this.ringBufferSequence) { + return done(new IllegalStateException("sequence=" + sequence + ", ringBufferSequence=" + this.ringBufferSequence)); + } + this.doneSequence = sequence; + notify(); + return true; + } + + synchronized boolean done(final Throwable t) { + if (isDone()) return false; + this.done = true; + this.throwable = t; + notify(); + return true; + } + + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException(); + } + + public synchronized long get() throws InterruptedException, ExecutionException { + while (!this.done) wait(); + if (this.throwable != null) throw new ExecutionException(this.throwable); + return this.doneSequence; + } + + public Long get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(); + } + + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + synchronized boolean isDone() { + return this.done; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index 700fbbf..da5a9b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -94,8 +94,9 @@ public class WALCoprocessorHost * @throws IOException */ public boolean preWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) - throws IOException { + throws IOException { boolean bypass = false; + if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass; ObserverContext ctx = null; for (WALEnvironment env: coprocessors) { if (env.getInstance() instanceof @@ -119,7 +120,8 @@ public class WALCoprocessorHost * @throws IOException */ public 
void postWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) - throws IOException { + throws IOException { + if (this.coprocessors == null || this.coprocessors.isEmpty()) return; ObserverContext ctx = null; for (WALEnvironment env: coprocessors) { if (env.getInstance() instanceof diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index d4b36a6..7ae7a98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -22,17 +22,16 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; @@ -271,5 +270,4 @@ public class WALEdit implements Writable, HeapSize { } return null; } -} - +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 4eec8d1..b2a7701 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -54,8 +54,8 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.yammer.metrics.core.Meter; import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.Meter; import com.yammer.metrics.core.MetricsRegistry; import com.yammer.metrics.reporting.ConsoleReporter; @@ -72,8 +72,12 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool metrics.newMeter(HLogPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS); private final Histogram syncHistogram = metrics.newHistogram(HLogPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs", true); + private final Histogram syncCountHistogram = + metrics.newHistogram(HLogPerformanceEvaluation.class, "syncCountHistogram", "countPerSync", true); private final Meter appendMeter = metrics.newMeter(HLogPerformanceEvaluation.class, "appendMeter", "bytes", TimeUnit.MILLISECONDS); + private final Histogram latencyHistogram = + metrics.newHistogram(HLogPerformanceEvaluation.class, "latencyHistogram", "nanos", true); private HBaseTestingUtility TEST_UTIL; @@ -127,8 +131,8 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool long startTime = System.currentTimeMillis(); int lastSync = 0; for (int i = 0; i < numIterations; ++i) { + long now = System.nanoTime(); Put put = setupPut(rand, key, value, numFamilies); - long now = System.currentTimeMillis(); WALEdit walEdit = new WALEdit(); 
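// Note on the measurement window (sketch of intent, inferred from the moved nanoTime call
// above): starting the clock before setupPut() means the latencyHistogram update at the
// bottom of this iteration measures the full round trip a handler would observe -- building
// the edit, the append, and (every syncInterval iterations) the sync -- not append time alone.
// (The histogram is registered above as:
//   metrics.newHistogram(HLogPerformanceEvaluation.class, "latencyHistogram", "nanos", true))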
addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); HRegionInfo hri = region.getRegionInfo(); @@ -140,6 +144,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool lastSync = 0; } } + latencyHistogram.update(System.nanoTime() - now); } long totalTime = (System.currentTimeMillis() - startTime); logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime); @@ -231,6 +236,10 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher); } + // Internal config. goes off number of threads; if more threads than handlers, stuff breaks. In regionserver, + // number of handlers == number of threads. + getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads); + // Run HLog Performance Evaluation // First set the fs from configs. In case we are on hadoop1 FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf())); @@ -245,47 +254,74 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool // Initialize Table Descriptor HTableDescriptor htd = createHTableDescriptor(numFamilies); final long whenToRoll = roll; - HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) { - int appends = 0; - long lastSync = 0; + final HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) { + + @Override + public void postSync(final long timeInNanos, final int handlerSyncs) { + super.postSync(timeInNanos, handlerSyncs); + syncMeter.mark(); + syncHistogram.update(timeInNanos); + syncCountHistogram.update(handlerSyncs); + } + + @Override + public long postAppend(final HLog.Entry entry, final long elapsedTime) { + long size = super.postAppend(entry, elapsedTime); + appendMeter.mark(size); + return size; + } + }; + hlog.registerWALActionsListener(new WALActionsListener() { + private int appends = 0; @Override - protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit, - HTableDescriptor htd) - throws IOException { + public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { this.appends++; if (this.appends % whenToRoll == 0) { LOG.info("Rolling after " + appends + " edits"); - rollWriter(); + try { + hlog.rollWriter(); + } catch (FailedLogCloseException e) { + LOG.info("Failed roll; continuing", e); + } catch (IOException e) { + LOG.info("Failed roll; continuing", e); + } } - super.doWrite(info, logKey, logEdit, htd); - }; + } @Override - public void postSync() { - super.postSync(); - syncMeter.mark(); - long now = System.nanoTime(); - if (lastSync > 0) { - long diff = now - lastSync; - syncHistogram.update(diff); - } - this.lastSync = now; + public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) { } @Override - public void postAppend(List entries) { - super.postAppend(entries); - int size = 0; - for (Entry e: entries) size += e.getEdit().heapSize(); - appendMeter.mark(size); + public void preLogRoll(Path oldPath, Path newPath) throws IOException { } - }; + + @Override + public void preLogArchive(Path oldPath, Path newPath) throws IOException { + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + } + + @Override + public void postLogArchive(Path oldPath, Path newPath) throws IOException { + } + + @Override + public void logRollRequested() { + } + + @Override + public void logCloseRequested() { + } + }); hlog.rollWriter(); HRegion region = null; try { region = openRegion(fs, rootRegionDir, htd, hlog); - 
ConsoleReporter.enable(this.metrics, 60, TimeUnit.SECONDS); + ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS); long putTime = runBenchmark(new HLogPutBenchmark(region, htd, numIterations, noSync, syncInterval), numThreads); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 94cfe69..069de19 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -65,7 +65,7 @@ import org.junit.experimental.categories.Category; @Category(LargeTests.class) @SuppressWarnings("deprecation") public class TestHLog { - private static final Log LOG = LogFactory.getLog(TestHLog.class); + private static final Log LOG = LogFactory.getLog(TestHLog.class);/* { ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); @@ -73,7 +73,7 @@ public class TestHLog { .getLogger().setLevel(Level.ALL); ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL); - } + }*/ private static Configuration conf; private static FileSystem fs; @@ -85,12 +85,10 @@ public class TestHLog { @Before public void setUp() throws Exception { - FileStatus[] entries = fs.listStatus(new Path("/")); for (FileStatus dir : entries) { fs.delete(dir.getPath(), true); } - } @After @@ -127,6 +125,7 @@ public class TestHLog { oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); dir = new Path(hbaseDir, getName()); } + @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); @@ -158,13 +157,11 @@ public class TestHLog { */ @Test public void testSplit() throws IOException { - final TableName tableName = TableName.valueOf(getName()); final byte [] rowName = tableName.getName(); Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); - HLog log = HLogFactory.createHLog(fs, hbaseDir, - HConstants.HREGION_LOGDIR_NAME, conf); + HLog log = HLogFactory.createHLog(fs, hbaseDir, HConstants.HREGION_LOGDIR_NAME, conf); final int howmany = 3; HRegionInfo[] infos = new HRegionInfo[3]; Path tabledir = FSUtils.getTableDir(hbaseDir, tableName); @@ -199,8 +196,7 @@ public class TestHLog { log.rollWriter(); } log.close(); - List splits = HLogSplitter.split( - hbaseDir, logdir, oldLogDir, fs, conf); + List splits = HLogSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf); verifySplits(splits, howmany); log = null; } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 9320aed..0cb01cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -18,12 +18,17 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import 
org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -123,41 +128,41 @@ public class TestLogRollAbort {
     // Create the test table and open it
     String tableName = this.getClass().getSimpleName();
-    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
     desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    desc.setDeferredLogFlush(true);
     admin.createTable(desc);
     HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    try {
-    HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
-    HLog log = server.getWAL();
+      HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
+      HLog log = server.getWAL();
-    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
-    // don't run this test without append support (HDFS-200 & HDFS-142)
-    assertTrue("Need append support for this test",
+      assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
+      // don't run this test without append support (HDFS-200 & HDFS-142)
+      assertTrue("Need append support for this test",
         FSUtils.isAppendSupported(TEST_UTIL.getConfiguration()));
-    Put p = new Put(Bytes.toBytes("row2001"));
-    p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2001));
-    table.put(p);
+      Put p = new Put(Bytes.toBytes("row2001"));
+      p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2001));
+      table.put(p);

-    log.sync();
+      log.sync();

-    p = new Put(Bytes.toBytes("row2002"));
-    p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2002));
-    table.put(p);
+      p = new Put(Bytes.toBytes("row2002"));
+      p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2002));
+      table.put(p);

-    dfsCluster.restartDataNodes();
-    LOG.info("Restarted datanodes");
+      dfsCluster.restartDataNodes();
+      LOG.info("Restarted datanodes");

-    try {
-      log.rollWriter(true);
-    } catch (FailedLogCloseException flce) {
-      assertTrue("Should have deferred flush log edits outstanding",
-        ((FSHLog) log).hasDeferredEntries());
+      try {
+        log.rollWriter(true);
+      } catch (FailedLogCloseException flce) {
+        assertTrue("Should have deferred flush log edits outstanding", ((FSHLog) log).isUnflushedEntries());
+      }
+    } finally {
+      table.close();
     }
   }
-
-}
-
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4173a19..f7b9883 100644
--- a/pom.xml
+++ b/pom.xml
@@ -898,6 +898,7 @@
     2.6
     1.1.1
     2.2
+    <disruptor.version>3.2.0</disruptor.version>
     3.2.1
     3.1
     2.1.2
@@ -1361,6 +1362,11 @@
       <artifactId>htrace-core</artifactId>
       <version>${htrace.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor.version}</version>
+    </dependency>
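With the disruptor dependency in place, the pieces in this patch compose as follows. The FSHLog constructor changes are not part of this section, so the wiring below is only a sketch of what the imports and new classes imply; the ring size, the single-thread executor, and both counts are assumptions, not values taken from the patch.

    import java.util.concurrent.Executors;

    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;

    // Multi-producer ring (every handler thread publishes appends and syncs), a single
    // consumer (AppendEventHandler) that appends in order and hands batches of SyncFutures
    // to SyncRunner threads for the actual filesystem sync.
    final int syncRunnerCount = 5;      // assumed value
    final int maxHandlersCount = 200;   // assumed; would track the regionserver handler count
    Disruptor<RingBufferTruck> disruptor = new Disruptor<RingBufferTruck>(
        RingBufferTruck.EVENT_FACTORY,
        1024,                           // ring size; must be a power of two (assumed value)
        Executors.newSingleThreadExecutor(),
        ProducerType.MULTI,
        new BlockingWaitStrategy());
    disruptor.handleEventsWith(new AppendEventHandler(syncRunnerCount, maxHandlersCount));
    disruptor.start();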