diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 90976e2..d8864ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -36,7 +36,6 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -208,12 +207,23 @@ public class HRegion implements HeapSize { // , Writable{ */ final AtomicBoolean closing = new AtomicBoolean(false); - protected volatile long completeSequenceId = -1L; + /** + * The sequence id of the last flush on this region. Used when doing rough calculations on + * whether it is time to flush or not. + */ + protected volatile long lastFlushSeqId = -1L; /** - * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1, - * as a marker that the region hasn't opened yet. Once it is opened, it is set to - * {@link #openSeqNum}. + * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog + * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1. + * Its default value is {@link HLog#NO_SEQUENCE_ID}. This default is used as a marker to indicate + * that the region hasn't opened yet. Once it is opened, it is set to the derived + * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region. + * + *

Control of this sequence is handed off to the WAL/HLog implementation. It is responsible + * for tagging edits with the correct sequence id since it is responsible for getting the + * edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT + * OUTSIDE OF THE WAL. The value you get will not be what you think it is. */ private final AtomicLong sequenceId = new AtomicLong(-1L); @@ -390,7 +400,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * Objects from this class are created when flushing to describe all the different states that * that method ends up in. The Result enum describes those states. The sequence id should only - * be specified if the flush was successful, and the failure message should only be speficied + * be specified if the flush was successful, and the failure message should only be specified * if it didn't flush. */ public static class FlushResult { @@ -741,7 +751,7 @@ public class HRegion implements HeapSize { // , Writable{ this.closing.set(false); this.closed.set(false); - this.completeSequenceId = nextSeqid; + this.lastFlushSeqId = nextSeqid; if (coprocessorHost != null) { status.setStatus("Running coprocessor post-open hooks"); coprocessorHost.postOpen(); @@ -1602,7 +1612,8 @@ public class HRegion implements HeapSize { // , Writable{ * Should the memstore be flushed now */ boolean shouldFlush() { - if(this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) { + // This is a rough measure. + if (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get()) { return true; } if (flushCheckInterval <= 0) { //disabled @@ -1625,34 +1636,16 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * Flush the memstore. - * - * Flushing the memstore is a little tricky. We have a lot of updates in the - * memstore, all of which have also been written to the log. We need to - * write those updates in the memstore out to disk, while being able to - * process reads/writes as much as possible during the flush operation. Also, - * the log has to state clearly the point in time at which the memstore was - * flushed. (That way, during recovery, we know when we can rely on the - * on-disk flushed structures and when we have to recover the memstore from - * the log.) - * - *

So, we have a three-step process: - * - *

- *

This method is protected, but can be accessed via several public - * routes. - * - *

This method may block for some time. + * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the + * memstore, all of which have also been written to the log. We need to write those updates in the + * memstore out to disk, while being able to process reads/writes as much as possible during the + * flush operation. + *

This method may block for some time. Every time you call it, we up the region's + * sequence id even if we don't flush; i.e. the returned flush sequence id will be at least one + * larger than that of the last edit applied to this region. The returned id does not refer to an + * actual edit. The returned id can be used for, say, installing a bulk loaded file just ahead of + * the last hfile that was the result of this flush, etc. * @param status - * * @return object describing the flush's state * * @throws IOException general io exceptions @@ -1666,10 +1659,9 @@ public class HRegion implements HeapSize { // , Writable{ /** * @param wal Null if we're NOT to go via hlog/wal. - * @param myseqid The seqid to use if wal is null writing out - * flush file. + * @param myseqid The seqid to use if wal is null writing out flush file. * @param status - * @return true if the region needs compacting + * @return object describing the flush's state * @throws IOException * @see #internalFlushcache(MonitoredTask) */ @@ -1681,50 +1673,67 @@ public class HRegion implements HeapSize { // , Writable{ throw new IOException("Aborting flush because server is abortted..."); } final long startTime = EnvironmentEdgeManager.currentTimeMillis(); - // Clear flush flag. - // If nothing to flush, return and avoid logging start/stop flush. + // If nothing to flush, return, but we need to safely update the region sequence id if (this.memstoreSize.get() <= 0) { - return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"); + // Take an update lock because we are about to change the sequence id and we want the + // sequence id to be at the border of the empty memstore. + this.updatesLock.writeLock().lock(); + try { + if (this.memstoreSize.get() <= 0) { + // Presume that if there are still no edits in the memstore, then there are no edits for + // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out + // edits in the WAL system. Up the sequence number so the resulting flush id is for + // sure just beyond the last appended region edit (useful as a marker when bulk loading, + // etc.) + // wal can be null when replaying edits. + return wal != null? + new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, + getNextSequenceId(wal, startTime), "Nothing to flush"): + new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"); + } + } finally { + this.updatesLock.writeLock().unlock(); + } } if (LOG.isDebugEnabled()) { LOG.debug("Started memstore flush for " + this + ", current region memstore size " + - StringUtils.humanReadableInt(this.memstoreSize.get()) + + StringUtils.byteDesc(this.memstoreSize.get()) + ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid)); } - // Stop updates while we snapshot the memstore of all stores. We only have - // to do this for a moment. Its quick. The subsequent sequence id that - // goes into the HLog after we've flushed all these snapshots also goes - // into the info file that sits beside the flushed files. - // We also set the memstore size to zero here before we allow updates - // again so its value will represent the size of the updates received - // during the flush + // Stop updates while we snapshot the memstore of all of this region's stores. We only have + // to do this for a moment. It is quick.
We also set the memstore size to zero here before we + // allow updates again so its value will represent the size of the updates received + // during the flush MultiVersionConsistencyControl.WriteEntry w = null; - // We have to take a write lock during snapshot, or else a write could - // end up in both snapshot and memstore (makes it difficult to do atomic - // rows then) + // We have to take an update lock during snapshot, or else a write could end up in both snapshot + // and memstore (makes it difficult to do atomic rows then) status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); long totalFlushableSize = 0; - status.setStatus("Preparing to flush by snapshotting stores"); + status.setStatus("Preparing to flush by snapshotting stores in " + + getRegionInfo().getEncodedName()); List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size()); long flushSeqId = -1L; try { // Record the mvcc for all transactions in progress. w = mvcc.beginMemstoreInsert(); mvcc.advanceMemstore(w); - // check if it is not closing. if (wal != null) { if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + // This should never happen. String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); } - flushSeqId = this.sequenceId.incrementAndGet(); + // Get a sequence id that we can use to denote the flush. It will be one beyond the last + // edit that made it into the hfile (the call below does not add an edit; it just asks the + // WAL system to return the next sequence id). + flushSeqId = getNextSequenceId(wal, startTime); } else { // use the provided sequence Id as WAL is not being used for this flush. flushSeqId = myseqid; @@ -1735,7 +1744,7 @@ public class HRegion implements HeapSize { // , Writable{ storeFlushCtxs.add(s.createFlushContext(flushSeqId)); } - // prepare flush (take a snapshot) + // Prepare flush (take a snapshot) for (StoreFlushContext flush : storeFlushCtxs) { flush.prepare(); } @@ -1749,9 +1758,7 @@ public class HRegion implements HeapSize { // , Writable{ // sync unflushed WAL changes when deferred log sync is enabled // see HBASE-8208 for details - if (wal != null && !shouldSyncLog()) { - wal.sync(); - } + if (wal != null && !shouldSyncLog()) wal.sync(); // wait for all in-progress transactions to commit to HLog before // we can start the flush. This prevents @@ -1816,8 +1823,8 @@ public class HRegion implements HeapSize { // , Writable{ // Record latest flush time this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); - // Update the last flushed sequence id for region - completeSequenceId = flushSeqId; + // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog. + this.lastFlushSeqId = flushSeqId; // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources().
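For orientation, here is a minimal standalone sketch (plain JDK only; all names are hypothetical, not HBase API) of the locking pattern the empty-memstore path above relies on: writers append under the read lock, so taking the write lock and re-checking emptiness guarantees the claimed flush sequence id sits strictly beyond every edit that ever entered the memstore.

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class EmptyFlushSketch {
      private final AtomicLong sequenceId = new AtomicLong(0);
      private final AtomicLong memstoreSize = new AtomicLong(0);
      private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();

      // Writers hold the read lock while appending, mirroring HRegion's usage.
      long append(int size) {
        updatesLock.readLock().lock();
        try {
          memstoreSize.addAndGet(size);
          return sequenceId.incrementAndGet(); // this edit's sequence id
        } finally {
          updatesLock.readLock().unlock();
        }
      }

      // Returns a flush sequence id that is valid even when there is nothing to flush.
      long flush() {
        if (memstoreSize.get() <= 0) {
          updatesLock.writeLock().lock(); // no writer can race us now
          try {
            if (memstoreSize.get() <= 0) {
              // One beyond the last edit; usable as a bulk-load marker.
              return sequenceId.incrementAndGet();
            }
          } finally {
            updatesLock.writeLock().unlock();
          }
        }
        // Normal path, heavily simplified: claim an id and "flush" the memstore.
        long flushSeqId = sequenceId.incrementAndGet();
        memstoreSize.set(0);
        return flushSeqId;
      }
    }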
@@ -1828,9 +1835,9 @@ public class HRegion implements HeapSize { // , Writable{ long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; long memstoresize = this.memstoreSize.get(); String msg = "Finished memstore flush of ~" + - StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize + + StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize + ", currentsize=" + - StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize + + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId + ", compaction requested=" + compactionRequested + ((wal == null)? "; wal=null": ""); @@ -1842,6 +1849,22 @@ public class HRegion implements HeapSize { // , Writable{ FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId); } + /** + * Method to safely get the next sequence number. + * @param wal + * @param now + * @return Next sequence number unassociated with any actual edit. + * @throws IOException + */ + private long getNextSequenceId(final HLog wal, final long now) throws IOException { + HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable()); + // Call append but with an empty WALEdit. The returned sequence id will not be associated + // with any edit and we can be sure it went in after all outstanding appends. + wal.appendNoSync(getTableDesc(), getRegionInfo(), key, + WALEdit.EMPTY_WALEDIT, this.sequenceId, false); + return key.getLogSeqNum(); + } + ////////////////////////////////////////////////////////////////////////////// // get() methods for client use. ////////////////////////////////////////////////////////////////////////////// @@ -2515,9 +2538,11 @@ public class HRegion implements HeapSize { // , Writable{ throw new IOException("Multiple nonces per batch and not in replay"); } // txid should always increase, so having the one from the last call is ok. - txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(), - walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true, - currentNonceGroup, currentNonce); + HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup, + currentNonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, + walEdit, getSequenceId(), true); hasWalAppends = true; walEdit = new WALEdit(isInReplay); } @@ -2540,9 +2565,11 @@ public class HRegion implements HeapSize { // , Writable{ // ------------------------- Mutation mutation = batchOp.getMutation(firstIndex); if (walEdit.size() > 0) { - txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId, - true, currentNonceGroup, currentNonce); + HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), + currentNonceGroup, currentNonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, walEdit, + getSequenceId(), true); hasWalAppends = true; } @@ -3643,13 +3670,15 @@ public class HRegion implements HeapSize { // , Writable{ long seqId = -1; // We need to assign a sequential ID that's in between two memstores in order to preserve // the guarantee that all the edits lower than the highest sequential ID from all the - // HFiles are flushed on disk. See HBASE-10958.
+ // HFiles are flushed on disk. See HBASE-10958. The sequence id returned when we flush is + // guaranteed to be one beyond the last edit in the hfile made by the flush (or, if there was + // nothing to flush, a sequence id that we can be sure is beyond the last hfile written). if (assignSeqId) { FlushResult fs = this.flushcache(); if (fs.isFlushSucceeded()) { seqId = fs.flushSequenceId; } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { - seqId = this.sequenceId.incrementAndGet(); + seqId = fs.flushSequenceId; } else { throw new IOException("Could not bulk load with an assigned sequential ID because the " + "flush didn't run. Reason for not flushing: " + fs.failureReason); @@ -4936,9 +4965,11 @@ public class HRegion implements HeapSize { // , Writable{ long txid = 0; // 7. Append no sync if (!walEdit.isEmpty()) { - txid = this.log.appendNoSync(this.getRegionInfo(), - this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now, - this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce); + HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), now, processor.getClusterIds(), nonceGroup, + nonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), + key, walEdit, getSequenceId(), true); } // 8. Release region lock if (locked) { @@ -5179,10 +5210,10 @@ public class HRegion implements HeapSize { // , Writable{ // Using default cluster id, as this can only happen in the orginating // cluster. A slave cluster receives the final value (not the delta) // as a Put. - txid = this.log.appendNoSync(this.getRegionInfo(), - this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(), - EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId, - true, nonceGroup, nonce); + HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), nonceGroup, nonce); + txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), key, walEdits, + this.sequenceId, true); } else { recordMutationWithoutWal(append.getFamilyCellMap()); } @@ -5372,10 +5403,10 @@ public class HRegion implements HeapSize { // , Writable{ // Using default cluster id, as this can only happen in the orginating // cluster. A slave cluster receives the final value (not the delta) // as a Put. - txid = this.log.appendNoSync(this.getRegionInfo(), - this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(), - EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId, - true, nonceGroup, nonce); + HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), nonceGroup, nonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), + key, walEdits, getSequenceId(), true); } else { recordMutationWithoutWal(increment.getFamilyCellMap()); } @@ -6048,8 +6079,10 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * @return sequenceId. + * Do not change this sequence id. See {@link #sequenceId} comment.
+ * @return sequenceId */ + @VisibleForTesting public AtomicLong getSequenceId() { return this.sequenceId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4e379b6..e98f47d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1241,7 +1241,7 @@ public class HRegionServer extends HasThread implements .setWriteRequestsCount((int) r.writeRequestsCount.get()) .setTotalCompactingKVs(totalCompactingKVs) .setCurrentCompactedKVs(currentCompactedKVs) - .setCompleteSequenceId(r.completeSequenceId); + .setCompleteSequenceId(r.lastFlushSeqId); return regionLoad.build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index f52937e..278aa27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -87,19 +87,16 @@ import com.lmax.disruptor.dsl.ProducerType; /** * Implementation of {@link HLog} to go against {@link FileSystem}; i.e. keep WALs in HDFS. * Only one HLog/WAL is ever being written at a time. When a WAL hits a configured maximum size, - * it is rolled. This is done internal to the implementation, so external - * callers do not have to be concerned with log rolling. + * it is rolled. This is done internal to the implementation. * - *

As data is flushed from the MemStore to other (better) on-disk structures (files sorted by + *

As data is flushed from the MemStore to other on-disk structures (files sorted by * key, hfiles), a WAL becomes obsolete. We can let go of all the log edits/entries for a given - * HRegion-id up to the most-recent CACHEFLUSH message from that HRegion. A bunch of work in the - * below is done keeping account of these region sequence ids -- what is flushed out to hfiles, - * and what is yet in WAL and in memory only. + * HRegion-sequence id. A bunch of work in the below is done keeping account of these region + * sequence ids -- what is flushed out to hfiles, and what is yet in WAL and in memory only. * *

It is only practical to delete entire files. Thus, we delete an entire on-disk file * F when all of the edits in F have a log-sequence-id that's older - * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has an edit in - * F. + * (smaller) than the most-recent flush. * *

To read an HLog, call {@link HLogFactory#createReader(org.apache.hadoop.fs.FileSystem, * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}. @@ -113,24 +110,29 @@ class FSHLog implements HLog, Syncable { // here appending and syncing on a single WAL. The Disruptor is configured to handle multiple // producers but it has one consumer only (the producers in HBase are IPC Handlers calling append // and then sync). The single consumer/writer pulls the appends and syncs off the ring buffer. - // The appends are added to the WAL immediately without pause or batching (there may be a slight - // benefit batching appends but it complicates the implementation -- the gain is not worth - // the added complication). When a producer calls sync, it is given back a future. The producer - // 'blocks' on the future so it does not return until the sync completes. The future is passed - // over the ring buffer from the producer to the consumer thread where it does its best to batch - // up the producer syncs so one WAL sync actually spans multiple producer sync invocations. How - // well the batching works depends on the write rate; i.e. we tend to batch more in times of + // When a handler calls sync, it is given back a future. The producer 'blocks' on the future so + // it does not return until the sync completes. The future is passed over the ring buffer from + // the producer/handler to the consumer thread where it does its best to batch up the producer + // syncs so one WAL sync actually spans multiple producer sync invocations. How well the + // batching works depends on the write rate; i.e. we tend to batch more in times of // high writes/syncs. // - //

The consumer thread pass the syncs off to muliple syncing threads in a round robin fashion + // Calls to append now also wait until the append has been done on the consumer side of the + // disruptor. We used to not wait but it makes the implementation easier to grok if we have + // the region edit/sequence id after the append returns. + // + // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend + // once only? Probably hard given syncs take way longer than an append. + // + // The consumer thread passes the syncs off to multiple syncing threads in a round robin fashion // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long poll writing the // WAL). The consumer thread passes the futures to the sync threads for it to complete // the futures when done. // - //

The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It + // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It // acts as a sort-of transaction id. It is always incrementing. // - //

The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that // do the actual FS sync are implementations of SyncRunner. SafePointZigZagLatch is a // synchronization class used to halt the consumer at a safe point -- just after all outstanding // syncs and appends have completed -- so the log roller can swap the WAL out under it. @@ -138,14 +140,17 @@ class FSHLog implements HLog, Syncable { static final Log LOG = LogFactory.getLog(FSHLog.class); /** - * Disruptor is a fancy ring buffer. This disruptor/ring buffer is used to take edits and sync - * calls from the Handlers and passes them to the append and sync executors with minimal - * contention. + * The nexus at which all incoming handlers meet. Does appends and sync with an ordering. + * Appends and syncs are each put on the ring which means handlers need to + * smash up against the ring twice (can we make it once only? ... maybe not since time to append + * is so different from time to sync and sometimes we don't want to sync or we want to async + * the sync). The ring is where we make sure of our ordering and it is also where we do + * batching up of handler sync calls. */ private final Disruptor<RingBufferTruck> disruptor; /** - * An executorservice that runs the AppendEventHandler append executor. + * An executorservice that runs the disruptor AppendEventHandler append executor. */ private final ExecutorService appendExecutor; @@ -159,6 +164,9 @@ class FSHLog implements HLog, Syncable { /** * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. + * TODO: Reuse FSWALEntry's rather than create them anew each time as we do SyncFutures here. + * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them + * get them from this Map? */ private final Map<Thread, SyncFuture> syncFuturesByHandler; @@ -170,14 +178,14 @@ class FSHLog implements HLog, Syncable { /** * The highest known outstanding unsync'd WALEdit sequence number where sequence number is the - * ring buffer sequence. + * ring buffer sequence. Maintained by the ring buffer consumer. */ private volatile long highestUnsyncedSequence = -1; /** * Updated to the ring buffer sequence of the last successful sync call. This can be less than * {@link #highestUnsyncedSequence} for case where we have an append where a sync has not yet - * come in for it. + * come in for it. Maintained by the syncing threads. */ private final AtomicLong highestSyncedSequence = new AtomicLong(0); @@ -192,15 +200,20 @@ class FSHLog implements HLog, Syncable { // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered private final int minTolerableReplication; + // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection. private final Method getNumCurrentReplicas; + private final static Object [] NO_ARGS = new Object []{}; + // If live datanode count is lower than the default replicas value, // RollWriter will be triggered in each sync(So the RollWriter will be // triggered one by one in a short time). Using it as a workaround to slow // down the roll frequency triggered by checkLowReplication(). private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0); + private final int lowReplicationRollLimit; + // If consecutiveLogRolls is larger than lowReplicationRollLimit, // then disable the rolling in checkLowReplication(). // Enable it if the replications recover.
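The sync batching described above can be seen in miniature below: a plain-JDK sketch (hypothetical names; no disruptor, a BlockingQueue stands in for the ring) where many handler threads block on futures and a single consumer completes a whole batch with one simulated filesystem sync.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    public class SyncBatchSketch {
      private final BlockingQueue<CompletableFuture<Void>> pending = new LinkedBlockingQueue<>();

      public SyncBatchSketch() {
        Thread consumer = new Thread(() -> {
          List<CompletableFuture<Void>> batch = new ArrayList<>();
          try {
            while (true) {
              batch.add(pending.take());   // block for at least one sync request
              pending.drainTo(batch);      // then grab everything queued behind it
              Thread.sleep(1);             // stand-in for the long FS sync call
              for (CompletableFuture<Void> f : batch) {
                f.complete(null);          // one sync spans many caller futures
              }
              batch.clear();
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
        consumer.setDaemon(true);
        consumer.start();
      }

      // Called by handler threads; returns only after the batched sync completes.
      public void sync() throws Exception {
        CompletableFuture<Void> f = new CompletableFuture<>();
        pending.put(f);
        f.get();
      }
    }

The higher the write rate, the more requests pile up behind each take(), so batching improves exactly when it matters most.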
@@ -273,18 +286,17 @@ class FSHLog implements HLog, Syncable { /** * This lock ties all operations on oldestFlushingRegionSequenceIds and * oldestFlushedRegionSequenceIds Maps with the exception of append's putIfAbsent call into - * oldestUnflushedSeqNums. We use these Maps to find out the low bound seqNum, or to find regions - * with old seqNums to force flush; we are interested in old stuff not the new additions - * (TODO: IS THIS SAFE? CHECK!). + * oldestUnflushedSeqNums. We use these Maps to find out the low bound region sequence id, or + * to find regions with old sequence ids to force flush; we are interested in old stuff not the + * new additions (TODO: IS THIS SAFE? CHECK!). */ private final Object regionSequenceIdLock = new Object(); /** * Map of encoded region names to their OLDEST -- i.e. their first, the longest-lived -- - * sequence id in memstore. Note that this sequenceid is the region sequence id. This is not + * sequence id in memstore. Note that this sequence id is the region sequence id. This is not * related to the id we use above for {@link #highestSyncedSequence} and - * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer, an - * internal detail. + * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer. */ private final ConcurrentSkipListMap<byte[], Long> oldestUnflushedRegionSequenceIds = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR); @@ -327,14 +339,14 @@ class FSHLog implements HLog, Syncable { }; /** - * Map of wal log file to the latest sequence nums of all regions it has entries of. + * Map of wal log file to the latest sequence ids of all regions it has entries of. * The map is sorted by the log file creation timestamp (contained in the log file name). */ private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds = new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR); /** - * Exception handler to pass the disruptor ringbuffer. Same as native implemenation only it + * Exception handler to pass to the disruptor ringbuffer. Same as native implementation only it * logs using our logger instead of java native logger. */ static class RingBufferExceptionHandler implements ExceptionHandler { @@ -373,48 +385,6 @@ class FSHLog implements HLog, Syncable { } /** - * Constructor. - * - * @param fs filesystem handle - * @param root path for stored and archived hlogs - * @param logDir dir where hlogs are stored - * @param oldLogDir dir where hlogs are archived - * @param conf configuration to use - * @throws IOException - */ - public FSHLog(final FileSystem fs, final Path root, final String logDir, final String oldLogDir, - final Configuration conf) - throws IOException { - this(fs, root, logDir, oldLogDir, conf, null, true, null, false); - } - - /** - * Create an edit log at the given dir location. - * - * You should never have to load an existing log. If there is a log at - * startup, it should have already been processed and deleted by the time the - * HLog object is started up. - * - * @param fs filesystem handle - * @param root path for stored and archived hlogs - * @param logDir dir where hlogs are stored - * @param conf configuration to use - * @param listeners Listeners on WAL events. Listeners passed here will - * be registered before we do anything else; e.g. the - * Constructor {@link #rollWriter()}. - * @param prefix should always be hostname and port in distributed env and - * it will be URL encoded before being used.
- If prefix is null, "hlog" will be used - * @throws IOException - */ - public FSHLog(final FileSystem fs, final Path root, final String logDir, - final Configuration conf, final List<WALActionsListener> listeners, - final String prefix) throws IOException { - this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, prefix, - false); - } - - /** * Create an edit log at the given dir location. * * You should never have to load an existing log. If there is a log at @@ -448,7 +418,7 @@ class FSHLog implements HLog, Syncable { this.forMeta = forMeta; this.conf = conf; - // Register listeners. + // Register listeners. TODO: Should this exist anymore? We have CPs? if (listeners != null) { for (WALActionsListener i: listeners) { registerWALActionsListener(i); @@ -456,7 +426,7 @@ } // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks - // (it costs x'ing bocks) + // (it costs a little x'ing blocks) long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir)); this.logrollsize = @@ -496,7 +466,8 @@ // rollWriter sets this.hdfs_out if it can. rollWriter(); - // handle the reflection necessary to call getNumCurrentReplicas() + // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with + // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection. this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); this.coprocessorHost = new WALCoprocessorHost(this, conf); @@ -537,6 +508,8 @@ * @return Method or null. */ private static Method getGetNumCurrentReplicas(final FSDataOutputStream os) { + // TODO: Remove all this and use the now publicly available + // HdfsDataOutputStream#getCurrentBlockReplication() Method m = null; if (os != null) { Class<?> wrappedStreamClass = os.getWrappedStream().getClass(); @@ -909,7 +882,7 @@ class FSHLog implements HLog, Syncable { long oldFileLen = this.fs.getFileStatus(oldPath).getLen(); this.totalLogSize.addAndGet(oldFileLen); LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + - ", filesize=" + StringUtils.humanReadableInt(oldFileLen) + "; new WAL " + + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + FSUtils.getPath(newPath)); } else { LOG.info("New WAL " + FSUtils.getPath(newPath)); @@ -1088,96 +1061,81 @@ class FSHLog implements HLog, Syncable { } } - /** - * @param now - * @param encodedRegionName Encoded name of the region as returned by - * HRegionInfo#getEncodedNameAsBytes(). - * @param tableName - * @param clusterIds that have consumed the change - * @return New log key.
- */ - protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum, - long now, List<UUID> clusterIds, long nonceGroup, long nonce) { - return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce); - } - @Override @VisibleForTesting public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException { - append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, true, sequenceId, - HConstants.NO_NONCE, HConstants.NO_NONCE); + HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now); + append(htd, info, logKey, edits, sequenceId, true, true); } @Override - public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, + public long appendNoSync(final HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId, - boolean isInMemstore, long nonceGroup, long nonce) throws IOException { - return append(info, tableName, edits, clusterIds, now, htd, false, isInMemstore, sequenceId, - nonceGroup, nonce); + boolean inMemstore, long nonceGroup, long nonce) throws IOException { + HLogKey logKey = + new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce); + return append(htd, info, logKey, edits, sequenceId, false, inMemstore); + } + + @Override + public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key, + final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore) + throws IOException { + return append(htd, info, key, edits, sequenceId, false, inMemstore); } /** * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and * log-sequence-id. - * - * Later, if we sort by these keys, we obtain all the relevant edits for a given key-range of the - * HRegion (TODO). Any edits that do not have a matching COMPLETE_CACHEFLUSH message can be - * discarded. - * - *

Logs cannot be restarted once closed, or once the HLog process dies. Each time the HLog - * starts, it must create a new log. This means that other systems should process the log - * appropriately upon each startup (and prior to initializing HLog). - * - * Synchronized prevents appends during the completion of a cache flush or for the duration of a - * log roll. - * - * @param info - * @param tableName + * @param key * @param edits - * @param clusterIds that have consumed the change (for replication) - * @param now - * @param htd - * @param doSync shall we sync after we call the append? + * @param htd This comes in here just so it is available on a pre-append for replication. Get + * rid of it. It is kinda crazy this comes in here when we have tablename and regioninfo. + * Replication gets its scope from the HTD. + * @param hri region info + * @param sync shall we sync after we call the append? * @param inMemstore - * @param sequenceId of the region. - * @param nonceGroup - * @param nonce + * @param sequenceId The region sequence id reference. * @return txid of this transaction or if nothing to do, the last txid * @throws IOException */ - private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds, - final long now, HTableDescriptor htd, boolean doSync, boolean inMemstore, - AtomicLong sequenceId, long nonceGroup, long nonce) + private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key, + WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore) throws IOException { - if (!this.enabled || edits.isEmpty()) return this.highestUnsyncedSequence; + if (!this.enabled) return this.highestUnsyncedSequence; if (this.closed) throw new IOException("Cannot append; log is closed"); + // Make a trace scope for the append. It is closed on the other side of the ring buffer by the + // single consuming thread. Don't have to worry about it. TraceScope scope = Trace.startSpan("FSHLog.append"); - // Make a key but do not set the WALEdit by region sequence id now -- set it to -1 for now -- - // and then later just before we write it out to the DFS stream, then set the sequence id; - // late-binding. - HLogKey logKey = - makeKey(info.getEncodedNameAsBytes(), tableName, -1, now, clusterIds, nonceGroup, nonce); // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need - // all the stuff to make a key and then below to append the edit, we need to carry htd, info, + // all this to make a key and then below to append the edit, we need to carry htd, info, + // etc. all over the ring buffer. + FSWALEntry entry = null; long sequence = this.disruptor.getRingBuffer().next(); try { RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence); - FSWALEntry entry = - new FSWALEntry(sequence, logKey, edits, sequenceId, inMemstore, htd, info); + // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the + // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the + // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather than create one + // per append. + entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); + // Now wait until the region edit/sequence id is available. The 'entry' has an internal + // latch that is thrown when the region edit/sequence id is set.
Calling + // entry.getRegionSequenceId will cause us to block until the latch is thrown. The return is + // the region edit/sequence id, not the ring buffer txid. + try { + entry.getRegionSequenceId(); + } catch (InterruptedException e) { + throw convertInterruptedExceptionToIOException(e); + } } // doSync is set in tests. Usually we arrive in here via appendNoSync w/ the sync called after // all edits on a handler have been added. - // - // When we sync, we will sync to the current point, the txid of the last edit added. - // Since we are single writer, the next txid should be the just next one in sequence; - // do not explicitly specify it. Sequence id/txid is an implementation internal detail. - if (doSync) sync(); + if (sync) sync(sequence); return sequence; } @@ -1433,15 +1391,20 @@ class FSHLog implements HLog, Syncable { syncFuture.get(); return syncFuture.getSpan(); } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - IOException ioe = new InterruptedIOException(); - ioe.initCause(ie); - throw ioe; + LOG.warn("Interrupted", ie); + throw convertInterruptedExceptionToIOException(ie); } catch (ExecutionException e) { throw ensureIOException(e.getCause()); } } + private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { + Thread.currentThread().interrupt(); + IOException ioe = new InterruptedIOException(); + ioe.initCause(ie); + return ioe; + } + private SyncFuture getSyncFuture(final long sequence, Span span) { SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); if (syncFuture == null) { @@ -1876,7 +1839,7 @@ class FSHLog implements HLog, Syncable { attainSafePoint(sequence); this.syncFuturesCount = 0; } catch (Throwable t) { - LOG.error("UNEXPECTED!!!", t); + LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t); } } @@ -1915,18 +1878,17 @@ class FSHLog implements HLog, Syncable { * @throws Exception */ void append(final FSWALEntry entry) throws Exception { - // TODO: WORK ON MAKING THIS APPEND FASTER. OING WAY TOO MUCH WORK WITH CPs, PBing, etc. + // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. + atHeadOfRingBufferEventHandlerAppend(); long start = EnvironmentEdgeManager.currentTimeMillis(); byte [] encodedRegionName = entry.getKey().getEncodedRegionName(); + long regionSequenceId = HLog.NO_SEQUENCE_ID; try { // We are about to append this edit; update the region-scoped sequence number. Do it // here inside this single appending/writing thread. Events are ordered on the ringbuffer // so region sequenceids will also be in order. - long regionSequenceId = entry.getRegionSequenceIdReference().incrementAndGet(); - // Set the region-scoped sequence number back up into the key ("late-binding" -- - // setting before append). - entry.getKey().setLogSeqNum(regionSequenceId); + regionSequenceId = entry.stampRegionSequenceId(); // Coprocessor hook. if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit())) { @@ -1942,13 +1904,18 @@ class FSHLog implements HLog, Syncable { entry.getEdit()); } } - writer.append(entry); - assert highestUnsyncedSequence < entry.getSequence(); - highestUnsyncedSequence = entry.getSequence(); - Long lRegionSequenceId = Long.valueOf(regionSequenceId); - highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); - if (entry.isInMemstore()) { - oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); + // If empty, there is nothing to append.
Maybe empty when we are looking for a region + // sequence id only, a region edit/sequence id that is not associated with an actual edit. + // It has to go through all the rigmarole to be sure we have the right ordering. + if (!entry.getEdit().isEmpty()) { + writer.append(entry); + assert highestUnsyncedSequence < entry.getSequence(); + highestUnsyncedSequence = entry.getSequence(); + Long lRegionSequenceId = Long.valueOf(regionSequenceId); + highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); + if (entry.isInMemstore()) { + oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); + } } coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics. @@ -1972,6 +1939,14 @@ } } + /** + * Exposed for testing only. Use for tricks like halting the ring buffer appending. + */ + @VisibleForTesting + void atHeadOfRingBufferEventHandlerAppend() { + // Noop + } + private static IOException ensureIOException(final Throwable t) { return (t instanceof IOException)? (IOException)t: new IOException(t); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 0d65a54..75ee25a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.InterfaceAudience; @@ -25,10 +26,12 @@ import org.apache.hadoop.hbase.HTableDescriptor; /** * A WAL Entry for {@link FSHLog} implementation. Immutable. - * It is a subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as + * A subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as * region sequence id (we want to use this later, just before we write the WAL to ensure region * edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit - * hence marked 'transient' to underline this fact. + * hence marked 'transient' to underline this fact. It also adds a mechanism so we can wait on + * the assignment of the region sequence id. See {@link #stampRegionSequenceId()} and + * {@link #getRegionSequenceId()}. */ @InterfaceAudience.Private class FSWALEntry extends HLog.Entry { @@ -39,6 +42,9 @@ class FSWALEntry extends HLog.Entry { private final transient boolean inMemstore; private final transient HTableDescriptor htd; private final transient HRegionInfo hri; + // Latch that is set on creation and then is undone on the other side of the ring buffer by the + // consumer thread just after it sets the region edit/sequence id in here.
+ private final transient CountDownLatch latch = new CountDownLatch(1); FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit, final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, @@ -55,10 +61,6 @@ class FSWALEntry extends HLog.Entry { return "sequence=" + this.sequence + ", " + super.toString(); }; - AtomicLong getRegionSequenceIdReference() { - return this.regionSequenceIdReference; - } - boolean isInMemstore() { return this.inMemstore; } @@ -77,4 +79,27 @@ class FSWALEntry extends HLog.Entry { long getSequence() { return this.sequence; } + + /** + * Stamp this edit with a region edit/sequence id. + * Call when safe to do so: i.e. the context is such that the increment on the passed in + * {@link #regionSequenceIdReference} is guaranteed aligned w/ how appends are going into the + * WAL. This method works with {@link #getRegionSequenceId()}; that method will block until the + * stamping has happened if on construction our edit/sequence id is {@link HLog#NO_SEQUENCE_ID}. + * @return The region edit/sequence id we set for this edit. + * @see #getRegionSequenceId() + */ + long stampRegionSequenceId() { + long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); + getKey().setLogSeqNum(regionSequenceId); + // On creation, a latch was set. Count it down when sequence id is set. This will free + // up anyone blocked on {@link #getRegionSequenceId()} + this.latch.countDown(); + return regionSequenceId; + } + + long getRegionSequenceId() throws InterruptedException { + this.latch.await(); + return getKey().getLogSeqNum(); + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 0917c8b..03a642d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -49,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting; // TODO: Rename interface to WAL public interface HLog { Log LOG = LogFactory.getLog(HLog.class); + public static final long NO_SEQUENCE_ID = -1; /** File Extension used while splitting an HLog into regions (HBASE-2312) */ // TODO: this seems like an implementation detail that does not belong here. @@ -288,6 +289,9 @@ public interface HLog { * @param htd * @param sequenceId * @throws IOException + * @deprecated For tests only and even then, should use + * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)} + * and {@link #sync()} instead. */ @VisibleForTesting public void append(HRegionInfo info, TableName tableName, WALEdit edits, @@ -311,8 +315,9 @@ public interface HLog { /** * Append a set of edits to the WAL. WAL edits are keyed by (encoded) regionName, rowname, and - * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes. - * Call {@link #sync()} to flush/sync all outstanding edits/appends. + * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes BUT on return + * this edit must have its region edit/sequence id assigned else it messes up our unification + * of mvcc and sequenceid. * @param info * @param tableName * @param edits @@ -332,11 +337,39 @@ public interface HLog { * able to sync an explicit edit only (the current default implementation syncs up to the time * of the sync call syncing whatever is behind the sync).
* @throws IOException + * @deprecated Use {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, + * AtomicLong, boolean)} + * instead because you can get back the region edit/sequenceid; it is set into the passed in + * key. */ long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId, boolean isInMemstore, long nonceGroup, long nonce) throws IOException; + /** + * Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction + * completes BUT on return this edit must have its region edit/sequence id assigned + * else it messes up our unification of mvcc and sequenceid. On return key will + * have the region edit/sequence id filled in. + * @param info + * @param key Modified by this call; we add to it this edit's region edit/sequence id. + * @param edits Edits to append. MAY CONTAIN NO EDITS for the case where we want to get an edit + * sequence id that is after all currently appended edits. + * @param htd + * @param sequenceId A reference to the atomic long the info region is using as + * source of its incrementing edits sequence id. Inside this call we will increment it and + * attach the sequence id to the edit we apply to the WAL. + * @param inMemstore Always true except for the case where we are writing a compaction completion + * record into the WAL; in this case the entry is just so we can finish an unfinished compaction + * -- it is not an edit for memstore. + * @return Returns a 'transaction id' and key will have the region edit/sequence id + * in it. + * @throws IOException + */ + long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits, + AtomicLong sequenceId, boolean inMemstore) + throws IOException; + // TODO: Do we need all these versions of sync?
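A caller-side usage sketch of the new variant just declared (only identifiers introduced by this patch are used; `htd`, `info`, `edits`, and the region's `sequenceId` AtomicLong are assumed to be in hand):

    HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), htd.getTableName());
    long txid = log.appendNoSync(htd, info, key, edits, sequenceId, true);
    log.sync();                              // flush/sync the outstanding append
    long regionEditId = key.getLogSeqNum();  // region edit/sequence id stamped during the append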
void hsync() throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java index 85aaef8..435a3e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java @@ -47,13 +47,14 @@ public class HLogFactory { public static HLog createHLog(final FileSystem fs, final Path root, final String logName, final String oldLogName, final Configuration conf) throws IOException { - return new FSHLog(fs, root, logName, oldLogName, conf); + return new FSHLog(fs, root, logName, oldLogName, conf, null, true, null, false); } public static HLog createHLog(final FileSystem fs, final Path root, final String logName, final Configuration conf, final List listeners, final String prefix) throws IOException { - return new FSHLog(fs, root, logName, conf, listeners, prefix); + return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, + true, prefix, false); } public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 4563cf8..4f7b375 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -23,6 +23,7 @@ import java.io.DataOutput; import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -31,7 +32,6 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; -import com.google.protobuf.HBaseZeroCopyByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -43,11 +43,13 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import com.google.protobuf.HBaseZeroCopyByteString; /** * A Key for an entry in the change log. 
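The HLogKey hunks below add constructors that deliberately omit the sequence id (it defaults to HLog.NO_SEQUENCE_ID) so the ring buffer consumer can stamp it later. A JDK-only sketch (hypothetical names, not HBase API) of the stamp-then-read latch handshake that FSWALEntry above implements:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicLong;

    public class LateBindingSketch {
      static final long NO_SEQUENCE_ID = -1;

      static class Entry {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile long seqId = NO_SEQUENCE_ID;

        // Called by the single consumer thread only, so ids stay in append order.
        long stamp(AtomicLong regionSequenceId) {
          seqId = regionSequenceId.incrementAndGet();
          latch.countDown(); // free anyone blocked in get()
          return seqId;
        }

        // Called by the producing handler; blocks until the consumer has stamped us.
        long get() throws InterruptedException {
          latch.await();
          return seqId;
        }
      }

      public static void main(String[] args) throws Exception {
        AtomicLong regionSequenceId = new AtomicLong(0);
        Entry e = new Entry();
        new Thread(() -> e.stamp(regionSequenceId)).start(); // the "ring buffer consumer"
        System.out.println("region edit/sequence id = " + e.get());
      }
    }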
@@ -122,6 +124,7 @@ public class HLogKey implements WritableComparable<HLogKey> { private long nonceGroup = HConstants.NO_NONCE; private long nonce = HConstants.NO_NONCE; + static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>()); private CompressionContext compressionContext; @@ -139,10 +142,20 @@ public class HLogKey implements WritableComparable<HLogKey> { HConstants.NO_NONCE, HConstants.NO_NONCE); } + public HLogKey(final byte[] encodedRegionName, final TableName tablename) { + this(encodedRegionName, tablename, System.currentTimeMillis()); + } + + public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now) { + init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, + EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + /** * Create the log key for writing to somewhere. * We maintain the tablename mainly for debugging purposes. * A regionName is always a sub-table object. + *

Used by log splitting and snapshots. * * @param encodedRegionName Encoded name of the region as returned by * HRegionInfo#getEncodedNameAsBytes(). @@ -156,6 +169,41 @@ init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce); } + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename - name of table + * @param now Time at which this edit was written. + * @param clusterIds the clusters that have consumed the change (used in Replication) + */ + public HLogKey(final byte [] encodedRegionName, final TableName tablename, + final long now, List<UUID> clusterIds, long nonceGroup, long nonce) { + init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename - name of table + * @param nonceGroup + * @param nonce + */ + public HLogKey(final byte [] encodedRegionName, final TableName tablename, long nonceGroup, + long nonce) { + init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, + EnvironmentEdgeManager.currentTimeMillis(), EMPTY_UUIDS, nonceGroup, nonce); + } + protected void init(final byte [] encodedRegionName, final TableName tablename, long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) { this.logSeqNum = logSeqNum; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index 9825fee..fcb5610 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -20,10 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; -import java.util.ArrayList; import java.util.NavigableSet; import java.util.TreeSet; -import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -35,13 +33,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import com.google.protobuf.TextFormat; @@ -263,13 +260,10 @@ */ public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, final CompactionDescriptor c, AtomicLong
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 9825fee..fcb5610 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -20,10 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.NavigableSet;
 import java.util.TreeSet;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -35,13 +33,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 
 import com.google.protobuf.TextFormat;
@@ -263,13 +260,10 @@ public class HLogUtil {
    */
   public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
       final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
-    WALEdit e = WALEdit.createCompaction(c);
-    long now = EnvironmentEdgeManager.currentTimeMillis();
     TableName tn = TableName.valueOf(c.getTableName().toByteArray());
-    log.appendNoSync(info, tn, e, new ArrayList<UUID>(), now, htd, sequenceId, false,
-      HConstants.NO_NONCE, HConstants.NO_NONCE);
+    HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
+    log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false);
     log.sync();
-
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
index cc9ec22..de3f4f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
@@ -79,14 +79,6 @@ class RingBufferTruck {
   }
 
   /**
-   * return {@code true} when this truck is carrying a {@link Span},
-   * {@code false} otherwise.
-   */
-  boolean hasSpanPayload() {
-    return this.span != null;
-  }
-
-  /**
    * Unload the truck of its {@link FSWALEntry} payload. The internal refernce is released.
    */
   FSWALEntry unloadFSWALEntryPayload() {
@@ -105,7 +97,7 @@ class RingBufferTruck {
   }
 
   /**
-   * Unload the truck of its {@link Span} payload. The internal refernce is released.
+   * Unload the truck of its {@link Span} payload. The internal reference is released.
   */
   Span unloadSpanPayload() {
     Span ret = this.span;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
index 695d18a..ba45720 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
@@ -48,6 +48,9 @@ import org.htrace.Span;
  */
 @InterfaceAudience.Private
 class SyncFuture {
+  // Implementation notes: I tried using a CyclicBarrier here for the handler and sync threads
+  // to coordinate on, but it gave no obvious advantage and it introduced issues with the order
+  // in which events happen.
   private static final long NOT_DONE = 0;
 
   /**
@@ -187,4 +190,4 @@ class SyncFuture {
   synchronized Throwable getThrowable() {
     return this.throwable;
   }
-}
+}
\ No newline at end of file
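The RingBufferTruck hunk drops the now-unused hasSpanPayload() but keeps the unload pattern intact: hand back the payload and null the internal field so a recycled ring-buffer slot cannot pin the object. A standalone sketch of that pattern (an illustrative analogue, not the HBase class):

    // Illustrative analogue of RingBufferTruck's unload methods.
    final class Truck<T> {
      private T payload;

      void load(T payload) {
        this.payload = payload;
      }

      T unload() {
        T ret = this.payload;
        // Release the internal reference: the disruptor reuses slot objects,
        // so a retained reference would keep the payload alive indefinitely.
        this.payload = null;
        return ret;
      }
    }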
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 25c815b..920340c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -87,9 +87,11 @@ public interface WALActionsListener {
    * @param htd
    * @param logKey
    * @param logEdit
+   * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, HLogKey, WALEdit)}.
+   * It only exists to get the scope when replicating. Scope should be in the HLogKey so we
+   * would not need to pass in an htd.
    */
   void visitLogEntryBeforeWrite(
     HTableDescriptor htd, HLogKey logKey, WALEdit logEdit
   );
-
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 6e5d250..b10c4a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -80,13 +80,14 @@ public class WALEdit implements Writable, HeapSize {
   // TODO: Get rid of this; see HBASE-8457
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
-  static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
   static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
   private final int VERSION_2 = -1;
   private final boolean isReplay;
   private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(1);
+  public static final WALEdit EMPTY_WALEDIT = new WALEdit();
+
   // Only here for legacy writable deserialization
   @Deprecated
   private NavigableMap<byte[], Integer> scopes;
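EMPTY_WALEDIT gives callers a shared empty edit; ordinary edits are still assembled cell by cell. A short sketch of building a WALEdit from a Put, using the same calls the new TestHLog test below uses (assumes the post-patch hbase-server classes on the classpath):

    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WALEditExample {
      public static WALEdit toWALEdit(Put p) throws java.io.IOException {
        WALEdit edit = new WALEdit();
        CellScanner cs = p.cellScanner();
        while (cs.advance()) {
          // WALEdit still carries KeyValues, so convert each Cell.
          edit.add(KeyValueUtil.ensureKeyValue(cs.current()));
        }
        return edit;
      }

      public static void main(String[] args) throws java.io.IOException {
        Put p = new Put(Bytes.toBytes("row1"));
        p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        System.out.println(toWALEdit(p).getKeyValues().size()); // prints 1
      }
    }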
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index ae912fa..cad6a9e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -34,9 +34,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -4184,8 +4182,8 @@ public class TestHRegion {
     //verify append called or not
     verify(log, expectAppend ? times(1) : never())
-      .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List<UUID>)any(),
-        anyLong(), (HTableDescriptor)any(), (AtomicLong)any(), anyBoolean(), anyLong(), anyLong());
+      .appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
+        (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean());
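The verify stanza above is the migration in miniature: six arguments instead of ten, with the HLogKey now carrying what used to be loose parameters. Spelled out as a hypothetical helper (assumes a Mockito-mocked HLog):

    import static org.mockito.Matchers.any;
    import static org.mockito.Mockito.verify;

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.mockito.Mockito;

    class AppendNoSyncVerifier {
      // Hypothetical helper: assert exactly one append with the new signature.
      static void assertAppendedOnce(HLog log) throws java.io.IOException {
        verify(log, Mockito.times(1)).appendNoSync((HTableDescriptor) any(),
            (HRegionInfo) any(), (HLogKey) any(), (WALEdit) any(),
            (AtomicLong) any(), Mockito.anyBoolean());
      }
    }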
     // verify sync called or not
     if (expectSync || expectSyncFromLogSyncer) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 7467fe0..8737f93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -18,17 +18,26 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.BindException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -38,14 +47,31 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -137,6 +163,86 @@ public class TestHLog {
   }
 
   /**
+   * Tests that a flush is guaranteed to get a sequence id beyond the last edit appended. We do
+   * this by slowing appends in the background ring buffer thread while calling flush in the
+   * foreground. The addition of the sync over HRegion in flush should fix an issue where flush
+   * was returning before all of its appends had made it out to the WAL (HBASE-11109).
+   * @throws IOException
+   * @see HBASE-11109
+   */
+  @Test
+  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
+    String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
+    final TableName tableName = TableName.valueOf(testName);
+    final HRegionInfo hri = new HRegionInfo(tableName);
+    final byte[] rowName = tableName.getName();
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("f"));
+    HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
+      TEST_UTIL.getConfiguration(), htd);
+    HRegion.closeHRegion(r);
+    final int countPerFamily = 10;
+    final MutableBoolean goslow = new MutableBoolean(false);
+    // Bypass factory so I can subclass and doctor a method.
+    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
+        testName, conf) {
+      @Override
+      void atHeadOfRingBufferEventHandlerAppend() {
+        if (goslow.isTrue()) {
+          LOG.debug("Sleeping 100ms before append");
+          Threads.sleep(100);
+        }
+        super.atHeadOfRingBufferEventHandlerAppend();
+      }
+    };
+    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
+      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
+    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+    try {
+      List<Put> puts = null;
+      for (HColumnDescriptor hcd: htd.getFamilies()) {
+        puts =
+          TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
+      }
+
+      // Now assert edits made it in.
+      final Get g = new Get(rowName);
+      Result result = region.get(g);
+      assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+
+      // Construct a WALEdit and add it a few times to the WAL.
+      WALEdit edits = new WALEdit();
+      for (Put p: puts) {
+        CellScanner cs = p.cellScanner();
+        while (cs.advance()) {
+          edits.add(KeyValueUtil.ensureKeyValue(cs.current()));
+        }
+      }
+      // Add any old cluster id.
+      List<UUID> clusterIds = new ArrayList<UUID>();
+      clusterIds.add(UUID.randomUUID());
+      // Now make appends run slow.
+      goslow.setValue(true);
+      for (int i = 0; i < countPerFamily; i++) {
+        wal.appendNoSync(region.getRegionInfo(), tableName, edits,
+          clusterIds, System.currentTimeMillis(), htd, region.getSequenceId(), true, -1, -1);
+      }
+      region.flushcache();
+      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
+      long currentSequenceId = region.getSequenceId().get();
+      // Now release the appends
+      goslow.setValue(false);
+      synchronized (goslow) {
+        goslow.notifyAll();
+      }
+      assertTrue(currentSequenceId >= region.getSequenceId().get());
+    } finally {
+      region.close(true);
+      wal.close();
+    }
+  }
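The test's final assertion is HBASE-11109 restated: the sequence id observed at flush time may never lag edits that were appended before the flush. The invariant, pulled out as a hypothetical check (not part of the patch):

    import java.util.concurrent.atomic.AtomicLong;

    final class FlushSeqIdInvariant {
      // Hypothetical restatement: once flushcache() has returned, the region
      // sequence id cannot have run past the value read at flush time, because
      // flush now syncs all outstanding appends before returning.
      static void check(AtomicLong regionSequenceId, long seqIdAtFlush) {
        long current = regionSequenceId.get();
        if (seqIdAtFlush < current) {
          throw new AssertionError("flush returned before its appends were synced: "
              + seqIdAtFlush + " < " + current);
        }
      }
    }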
+
+  /**
   * Write to a log file with three concurrent threads and verifying all data is written.
   * @throws Exception
   */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 487ac63..936a2e0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -433,153 +433,156 @@ public class TestLogRolling {
     LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
     // When the hbase:meta table can be opened, the region servers are running
-    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
-
-    this.server = cluster.getRegionServer(0);
-    this.log = server.getWAL();
-
-    // Create the test table and open it
-    String tableName = getName();
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-
-    admin.createTable(desc);
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
-
-    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
-    this.log = server.getWAL();
-    final List<Path> paths = new ArrayList<Path>();
-    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
-    paths.add(((FSHLog) log).computeFilename());
-    log.registerWALActionsListener(new WALActionsListener() {
-      @Override
-      public void preLogRoll(Path oldFile, Path newFile) {
-        LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile);
-        preLogRolledCalled.add(new Integer(1));
-      }
-      @Override
-      public void postLogRoll(Path oldFile, Path newFile) {
-        paths.add(newFile);
+    HTable t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
+    try {
+      this.server = cluster.getRegionServer(0);
+      this.log = server.getWAL();
+
+      // Create the test table and open it
+      String tableName = getName();
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+      desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+
+      admin.createTable(desc);
+      HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+
+      server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
+      this.log = server.getWAL();
+      final List<Path> paths = new ArrayList<Path>();
+      final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
+      paths.add(((FSHLog) log).computeFilename());
+      log.registerWALActionsListener(new WALActionsListener() {
+        @Override
+        public void preLogRoll(Path oldFile, Path newFile) {
+          LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile);
+          preLogRolledCalled.add(new Integer(1));
+        }
+        @Override
+        public void postLogRoll(Path oldFile, Path newFile) {
+          paths.add(newFile);
+        }
+        @Override
+        public void preLogArchive(Path oldFile, Path newFile) {}
+        @Override
+        public void postLogArchive(Path oldFile, Path newFile) {}
+        @Override
+        public void logRollRequested() {}
+        @Override
+        public void logCloseRequested() {}
+        @Override
+        public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+            WALEdit logEdit) {}
+        @Override
+        public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
+            WALEdit logEdit) {}
+      });
+
+      assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
+      // don't run this test without append support (HDFS-200 & HDFS-142)
+      assertTrue("Need append support for this test", FSUtils
+        .isAppendSupported(TEST_UTIL.getConfiguration()));
+
+      writeData(table, 1002);
+
+      table.setAutoFlush(true, true);
+
+      long curTime = System.currentTimeMillis();
+      long oldFilenum = log.getFilenum();
+      assertTrue("Log should have a timestamp older than now",
+        curTime > oldFilenum && oldFilenum != -1);
+
+      assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
+
+      // roll all datanodes in the pipeline
+      dfsCluster.restartDataNodes();
+      Thread.sleep(1000);
+      dfsCluster.waitActive();
+      LOG.info("Data Nodes restarted");
+      validateData(table, 1002);
+
+      // this write should succeed, but trigger a log roll
+      writeData(table, 1003);
+      long newFilenum = log.getFilenum();
+
+      assertTrue("Missing datanode should've triggered a log roll",
+        newFilenum > oldFilenum && newFilenum > curTime);
+      validateData(table, 1003);
+
+      writeData(table, 1004);
+
+      // roll all datanode again
+      dfsCluster.restartDataNodes();
+      Thread.sleep(1000);
+      dfsCluster.waitActive();
+      LOG.info("Data Nodes restarted");
+      validateData(table, 1004);
+
+      // this write should succeed, but trigger a log roll
+      writeData(table, 1005);
+
+      // force a log roll to read back and verify previously written logs
+      log.rollWriter(true);
+      assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
+        preLogRolledCalled.size() >= 1);
+
+      // read back the data written
+      Set<String> loggedRows = new HashSet<String>();
+      FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
+      for (Path p : paths) {
+        LOG.debug("recovering lease for " + p);
+        fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p,
+          TEST_UTIL.getConfiguration(), null);
+
+        LOG.debug("Reading HLog "+FSUtils.getPath(p));
+        HLog.Reader reader = null;
+        try {
+          reader = HLogFactory.createReader(fs, p,
+            TEST_UTIL.getConfiguration());
+          HLog.Entry entry;
+          while ((entry = reader.next()) != null) {
+            LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());
+            for (KeyValue kv : entry.getEdit().getKeyValues()) {
+              loggedRows.add(Bytes.toStringBinary(kv.getRow()));
+            }
+          }
+        } catch (EOFException e) {
+          LOG.debug("EOF reading file "+FSUtils.getPath(p));
+        } finally {
+          if (reader != null) reader.close();
+        }
       }
-      @Override
-      public void preLogArchive(Path oldFile, Path newFile) {}
-      @Override
-      public void postLogArchive(Path oldFile, Path newFile) {}
-      @Override
-      public void logRollRequested() {}
-      @Override
-      public void logCloseRequested() {}
-      @Override
-      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
-        WALEdit logEdit) {}
-      @Override
-      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
-        WALEdit logEdit) {}
-    });
-
-    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
-    // don't run this test without append support (HDFS-200 & HDFS-142)
-    assertTrue("Need append support for this test", FSUtils
-      .isAppendSupported(TEST_UTIL.getConfiguration()));
-
-    writeData(table, 1002);
-
-    table.setAutoFlush(true, true);
-
-    long curTime = System.currentTimeMillis();
-    long oldFilenum = log.getFilenum();
-    assertTrue("Log should have a timestamp older than now",
-      curTime > oldFilenum && oldFilenum != -1);
-
-    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
-
-    // roll all datanodes in the pipeline
-    dfsCluster.restartDataNodes();
-    Thread.sleep(1000);
-    dfsCluster.waitActive();
-    LOG.info("Data Nodes restarted");
-    validateData(table, 1002);
-
-    // this write should succeed, but trigger a log roll
-    writeData(table, 1003);
-    long newFilenum = log.getFilenum();
-    assertTrue("Missing datanode should've triggered a log roll",
-      newFilenum > oldFilenum && newFilenum > curTime);
-    validateData(table, 1003);
-
-    writeData(table, 1004);
+      // verify the written rows are there
+      assertTrue(loggedRows.contains("row1002"));
+      assertTrue(loggedRows.contains("row1003"));
+      assertTrue(loggedRows.contains("row1004"));
+      assertTrue(loggedRows.contains("row1005"));
 
-    // roll all datanode again
-    dfsCluster.restartDataNodes();
-    Thread.sleep(1000);
-    dfsCluster.waitActive();
-    LOG.info("Data Nodes restarted");
-    validateData(table, 1004);
-
-    // this write should succeed, but trigger a log roll
-    writeData(table, 1005);
+      // flush all regions
+      List<HRegion> regions = new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
+      for (HRegion r: regions) {
+        r.flushcache();
+      }
 
-    // force a log roll to read back and verify previously written logs
-    log.rollWriter(true);
-    assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
-      preLogRolledCalled.size() >= 1);
-
-    // read back the data written
-    Set<String> loggedRows = new HashSet<String>();
-    FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
-    for (Path p : paths) {
-      LOG.debug("recovering lease for " + p);
-      fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);
-
-      LOG.debug("Reading HLog "+FSUtils.getPath(p));
-      HLog.Reader reader = null;
+      ResultScanner scanner = table.getScanner(new Scan());
       try {
-        reader = HLogFactory.createReader(fs, p,
-          TEST_UTIL.getConfiguration());
-        HLog.Entry entry;
-        while ((entry = reader.next()) != null) {
-          LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());
-          for (KeyValue kv : entry.getEdit().getKeyValues()) {
-            loggedRows.add(Bytes.toStringBinary(kv.getRow()));
-          }
+        for (int i=2; i<=5; i++) {
+          Result r = scanner.next();
+          assertNotNull(r);
+          assertFalse(r.isEmpty());
+          assertEquals("row100"+i, Bytes.toString(r.getRow()));
         }
-      } catch (EOFException e) {
-        LOG.debug("EOF reading file "+FSUtils.getPath(p));
       } finally {
-        if (reader != null) reader.close();
+        scanner.close();
       }
-    }
-
-    // verify the written rows are there
-    assertTrue(loggedRows.contains("row1002"));
-    assertTrue(loggedRows.contains("row1003"));
-    assertTrue(loggedRows.contains("row1004"));
-    assertTrue(loggedRows.contains("row1005"));
 
-    // flush all regions
-    List<HRegion> regions =
-      new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
-    for (HRegion r: regions) {
-      r.flushcache();
-    }
-
-    ResultScanner scanner = table.getScanner(new Scan());
-    try {
-      for (int i=2; i<=5; i++) {
-        Result r = scanner.next();
-        assertNotNull(r);
-        assertFalse(r.isEmpty());
-        assertEquals("row100"+i, Bytes.toString(r.getRow()));
+      // verify that no region servers aborted
+      for (JVMClusterUtil.RegionServerThread rsThread:
+          TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
+        assertFalse(rsThread.getRegionServer().isAborted());
       }
     } finally {
-      scanner.close();
-    }
-
-    // verify that no region servers aborted
-    for (JVMClusterUtil.RegionServerThread rsThread:
-      TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
-      assertFalse(rsThread.getRegionServer().isAborted());
+      if (t != null) t.close();
     }
   }
 
@@ -589,57 +592,62 @@
    */
   @Test
   public void testCompactionRecordDoesntBlockRolling() throws Exception {
-    // When the hbase:meta table can be opened, the region servers are running
-    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
-
-    String tableName = getName();
-    HTable table = createTestTable(tableName);
-    String tableName2 = tableName + "1";
-    HTable table2 = createTestTable(tableName2);
-
-    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
-    this.log = server.getWAL();
-    FSHLog fshLog = (FSHLog)log;
-    HRegion region = server.getOnlineRegions(table2.getName()).get(0);
-    Store s = region.getStore(HConstants.CATALOG_FAMILY);
+    HTable table = null;
+    HTable table2 = null;
 
-    //have to flush namespace to ensure it doesn't affect wall tests
-    admin.flush(TableName.NAMESPACE_TABLE_NAME.getName());
+    // When the hbase:meta table can be opened, the region servers are running
+    HTable t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
+    try {
+      String tableName = getName();
+      table = createTestTable(tableName);
+      String tableName2 = tableName + "1";
+      table2 = createTestTable(tableName2);
+
+      server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
+      this.log = server.getWAL();
+      FSHLog fshLog = (FSHLog)log;
+      HRegion region = server.getOnlineRegions(table2.getName()).get(0);
+      Store s = region.getStore(HConstants.CATALOG_FAMILY);
+
+      // have to flush namespace to ensure it doesn't affect WAL tests
+      admin.flush(TableName.NAMESPACE_TABLE_NAME.getName());
+
+      // Put some stuff into table2, to make sure we have some files to compact.
+      for (int i = 1; i <= 2; ++i) {
+        doPut(table2, i);
+        admin.flush(table2.getTableName());
+      }
+      doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
+      assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumRolledLogFiles());
+      assertEquals(2, s.getStorefilesCount());
 
-    // Put some stuff into table2, to make sure we have some files to compact.
-    for (int i = 1; i <= 2; ++i) {
-      doPut(table2, i);
+      // Roll the log and compact table2, to have compaction record in the 2nd WAL.
+      fshLog.rollWriter();
+      assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
       admin.flush(table2.getTableName());
+      region.compactStores();
+      // Wait for compaction in case flush triggered it before us.
+      Assert.assertNotNull(s);
+      for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
+        Threads.sleepWithoutInterrupt(200);
+      }
+      assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
+
+      // Write some value to the table so the WAL cannot be deleted until table is flushed.
+      doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
+      fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
+      assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
+
+      // Flush table to make latest WAL obsolete; write another record, and roll again.
+      admin.flush(table.getTableName());
+      doPut(table, 1);
+      fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
+      assertEquals("Should have 1 WAL at the end", 1, fshLog.getNumRolledLogFiles());
+    } finally {
+      if (t != null) t.close();
+      if (table != null) table.close();
+      if (table2 != null) table2.close();
    }
-    doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
-    assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumRolledLogFiles());
-    assertEquals(2, s.getStorefilesCount());
-
-    // Roll the log and compact table2, to have compaction record in the 2nd WAL.
-    fshLog.rollWriter();
-    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
-    admin.flush(table2.getTableName());
-    region.compactStores();
-    // Wait for compaction in case if flush triggered it before us.
-    Assert.assertNotNull(s);
-    for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
-      Threads.sleepWithoutInterrupt(200);
-    }
-    assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
-
-    // Write some value to the table so the WAL cannot be deleted until table is flushed.
-    doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
-    fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
-    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
-
-    // Flush table to make latest WAL obsolete; write another record, and roll again.
-    admin.flush(table.getTableName());
-    doPut(table, 1);
-    fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
-    assertEquals("Should have 1 WALs at the end", 1, fshLog.getNumRolledLogFiles());
-
-    table.close();
-    table2.close();
   }
 
   private void doPut(HTable table, int i) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 515aef0..6e51245 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -969,16 +969,19 @@ public class TestWALReplay {
     }
   }
 
-  private void addRegionEdits (final byte [] rowName, final byte [] family,
+  static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
     final int count, EnvironmentEdge ee, final HRegion r,
     final String qualifierPrefix) throws IOException {
+    List<Put> puts = new ArrayList<Put>();
     for (int j = 0; j < count; j++) {
       byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
       Put p = new Put(rowName);
       p.add(family, qualifier, ee.currentTimeMillis(), rowName);
       r.put(p);
+      puts.add(p);
     }
+    return puts;
   }
 
   /*
@@ -1031,6 +1034,4 @@ public class TestWALReplay {
     htd.addFamily(c);
     return htd;
   }
-
-}
-
+}
\ No newline at end of file