diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 90976e2..a6aff82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -208,12 +208,30 @@ public class HRegion implements HeapSize { // , Writable{ */ final AtomicBoolean closing = new AtomicBoolean(false); - protected volatile long completeSequenceId = -1L; + /** + * The sequence id of the last flush on this region. Used when doing some rough calculations on + * whether it is time to flush or not. + */ + protected volatile long lastFlushSeqId = -1L; /** - * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1, - * as a marker that the region hasn't opened yet. Once it is opened, it is set to - * {@link #openSeqNum}. + * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog + * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1. + * Its default value is -1. This default is used as a marker to indicate that the region hasn't + * opened yet. Once it is opened, it is set to the derived {@link #openSeqNum}, the largest + * sequence id of all hfiles opened under this Region. + * + *

Control of this sequence is handed off to the WAL/HLog implementation. It is responsible + * for tagging edits with the correct sequence id since it is responsible for getting the + * edits into the WAL files. It controls updating the sequence id value. That is well and good + * except around flush where the flushing thread wants to get the sequence id that is just at the + * boundary of what is in an hfile and what is not yet flushed. To attain the sequence id at this + * boundary, we have to do some fancy dancing in the below flush method blocking updates to this + * region, flushing any region edits that might be in the underlying WAL/HLog system and then + * taking a sequence id reading. In other words, special care must be taken updating this + * sequence id (and reading the sequence id, it may not be what you think it is). Ideally the + * WAL/HLog system would 'own' this sequence. TODO. See the + * FSHLog#RingBufferEventHandler#append method for more. */ private final AtomicLong sequenceId = new AtomicLong(-1L); @@ -390,7 +408,7 @@ public class HRegion implements HeapSize { // , Writable{ /** * Objects from this class are created when flushing to describe all the different states that * that method ends up in. The Result enum describes those states. The sequence id should only - * be specified if the flush was successful, and the failure message should only be speficied + * be specified if the flush was successful, and the failure message should only be specified * if it didn't flush. 
*/ public static class FlushResult { @@ -741,7 +759,7 @@ public class HRegion implements HeapSize { // , Writable{ this.closing.set(false); this.closed.set(false); - this.completeSequenceId = nextSeqid; + this.lastFlushSeqId = nextSeqid; if (coprocessorHost != null) { status.setStatus("Running coprocessor post-open hooks"); coprocessorHost.postOpen(); @@ -1602,7 +1620,8 @@ public class HRegion implements HeapSize { // , Writable{ * Should the memstore be flushed now */ boolean shouldFlush() { - if(this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) { + // This is a rough measure. + if (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get()) { return true; } if (flushCheckInterval <= 0) { //disabled @@ -1625,34 +1644,16 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * Flush the memstore. - * - * Flushing the memstore is a little tricky. We have a lot of updates in the - * memstore, all of which have also been written to the log. We need to - * write those updates in the memstore out to disk, while being able to - * process reads/writes as much as possible during the flush operation. Also, - * the log has to state clearly the point in time at which the memstore was - * flushed. (That way, during recovery, we know when we can rely on the - * on-disk flushed structures and when we have to recover the memstore from - * the log.) - * - *

So, we have a three-step process: - * - *

- *

This method is protected, but can be accessed via several public - * routes. - * - *

This method may block for some time. + * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the + * memstore, all of which have also been written to the log. We need to write those updates in the + * memstore out to disk, while being able to process reads/writes as much as possible during the + * flush operation. + *

This method may block for some time. Every time you call it, we up the region's + * sequence id even if we don't flush; i.e. the returned sequence id will be at least one larger + * than the last edit applied to this region. The returned id does not refer to an actual edit. + * The returned id can be used for, say, installing a bulk loaded file just ahead of the last hfile + * that was the result of this flush, etc. * @param status - * * @return object describing the flush's state * * @throws IOException general io exceptions @@ -1666,10 +1667,9 @@ public class HRegion implements HeapSize { // , Writable{ /** * @param wal Null if we're NOT to go via hlog/wal. - * @param myseqid The seqid to use if wal is null writing out - * flush file. + * @param myseqid The seqid to use if wal is null writing out flush file. * @param status - * @return true if the region needs compacting + * @return object describing the flush's state * @throws IOException * @see #internalFlushcache(MonitoredTask) */ @@ -1681,49 +1681,69 @@ public class HRegion implements HeapSize { // , Writable{ throw new IOException("Aborting flush because server is abortted..."); } final long startTime = EnvironmentEdgeManager.currentTimeMillis(); - // Clear flush flag. - // If nothing to flush, return and avoid logging start/stop flush. + // If nothing to flush, return, but we need to safely update the region sequence id if (this.memstoreSize.get() <= 0) { - return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"); + // Take an update lock because am about to change the sequence id. + this.updatesLock.writeLock().lock(); + try { + if (this.memstoreSize.get() <= 0) { + // Presume that if there are still no edits in the memstore, then there are no edits for + // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out + // edits in the WAL system.
Up the sequence number so the resulting flush id is for + // sure just beyond the last flushed region edit (useful as a marker when bulk loading, + // etc.) + return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, + this.sequenceId.incrementAndGet(), "Nothing to flush"); + } + } finally { + this.updatesLock.writeLock().unlock(); + } } if (LOG.isDebugEnabled()) { LOG.debug("Started memstore flush for " + this + ", current region memstore size " + - StringUtils.humanReadableInt(this.memstoreSize.get()) + + StringUtils.byteDesc(this.memstoreSize.get()) + ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid)); } - // Stop updates while we snapshot the memstore of all stores. We only have - // to do this for a moment. Its quick. The subsequent sequence id that - // goes into the HLog after we've flushed all these snapshots also goes - // into the info file that sits beside the flushed files. - // We also set the memstore size to zero here before we allow updates - // again so its value will represent the size of the updates received - // during the flush + // Stop updates while we snapshot the memstore of all of these regions' stores. We only have + // to do this for a moment. It is quick. 
We also set the memstore size to zero here before we + // allow updates again so its value will represent the size of the updates received + // during flush MultiVersionConsistencyControl.WriteEntry w = null; - // We have to take a write lock during snapshot, or else a write could - // end up in both snapshot and memstore (makes it difficult to do atomic - // rows then) + // We have to take an update lock during snapshot, or else a write could end up in both snapshot + // and memstore (makes it difficult to do atomic rows then) status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); long totalFlushableSize = 0; - status.setStatus("Preparing to flush by snapshotting stores"); + status.setStatus("Preparing to flush by snapshotting stores in " + + getRegionInfo().getEncodedName()); List storeFlushCtxs = new ArrayList(stores.size()); long flushSeqId = -1L; try { // Record the mvcc for all transactions in progress. w = mvcc.beginMemstoreInsert(); mvcc.advanceMemstore(w); - // check if it is not closing. + // Be sure there are no region edits still outstanding for this region in the WAL system. Do + // this by adding in a sync and waiting on its completion. This will chase out all of this + // region's outstanding edits (because we have the update lock at this moment). It is safe to + // then do the below, jigger maps of outstanding edits and take the current sequence id as + // the 'highest' sequence id at this current point. This sync under the update lock is going + // to put a road bump in our write throughput for this region (other regions can progress + // with their writing while this is going on but there will be a slight stall for this region). if (wal != null) { + wal.sync(); if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + // This should never happen. 
String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); } + // Get a sequence id that we can use to denote the flush. It will be one beyond the last + // edit that made it into the hfile. flushSeqId = this.sequenceId.incrementAndGet(); } else { // use the provided sequence Id as WAL is not being used for this flush. @@ -1735,7 +1755,7 @@ public class HRegion implements HeapSize { // , Writable{ storeFlushCtxs.add(s.createFlushContext(flushSeqId)); } - // prepare flush (take a snapshot) + // Prepare flush (take a snapshot) for (StoreFlushContext flush : storeFlushCtxs) { flush.prepare(); } @@ -1747,12 +1767,6 @@ public class HRegion implements HeapSize { // , Writable{ status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); - // sync unflushed WAL changes when deferred log sync is enabled - // see HBASE-8208 for details - if (wal != null && !shouldSyncLog()) { - wal.sync(); - } - // wait for all in-progress transactions to commit to HLog before // we can start the flush. This prevents // uncommitted transactions from being written into HFiles. @@ -1816,8 +1830,8 @@ public class HRegion implements HeapSize { // , Writable{ // Record latest flush time this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis(); - // Update the last flushed sequence id for region - completeSequenceId = flushSeqId; + // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog. + this.lastFlushSeqId = flushSeqId; // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources(). 
@@ -1828,9 +1842,9 @@ public class HRegion implements HeapSize { // , Writable{ long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; long memstoresize = this.memstoreSize.get(); String msg = "Finished memstore flush of ~" + - StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize + + StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize + ", currentsize=" + - StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize + + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId + ", compaction requested=" + compactionRequested + ((wal == null)? "; wal=null": ""); @@ -2516,7 +2530,7 @@ public class HRegion implements HeapSize { // , Writable{ } // txid should always increase, so having the one from the last call is ok. txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(), - walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true, + walEdit, m.getClusterIds(), now, htableDescriptor, getSequenceId(), true, currentNonceGroup, currentNonce); hasWalAppends = true; walEdit = new WALEdit(isInReplay); @@ -2541,7 +2555,7 @@ public class HRegion implements HeapSize { // , Writable{ Mutation mutation = batchOp.getMutation(firstIndex); if (walEdit.size() > 0) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), - walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId, + walEdit, mutation.getClusterIds(), now, this.htableDescriptor, getSequenceId(), true, currentNonceGroup, currentNonce); hasWalAppends = true; } @@ -3643,13 +3657,15 @@ public class HRegion implements HeapSize { // , Writable{ long seqId = -1; // We need to assign a sequential ID that's in between two memstores in order to preserve // the guarantee that all the edits lower than the highest sequential ID from all the - // HFiles are flushed on disk. See HBASE-10958. 
+ // HFiles are flushed on disk. See HBASE-10958. The sequence id returned when we flush is + // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is + // a sequence id that we can be sure is beyond the last hfile written). if (assignSeqId) { FlushResult fs = this.flushcache(); if (fs.isFlushSucceeded()) { seqId = fs.flushSequenceId; } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { - seqId = this.sequenceId.incrementAndGet(); + seqId = fs.flushSequenceId; } else { throw new IOException("Could not bulk load with an assigned sequential ID because the " + "flush didn't run. Reason for not flushing: " + fs.failureReason); @@ -4938,7 +4954,7 @@ public class HRegion implements HeapSize { // , Writable{ if (!walEdit.isEmpty()) { txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now, - this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce); + this.htableDescriptor, getSequenceId(), true, nonceGroup, nonce); } // 8. Release region lock if (locked) { @@ -5181,7 +5197,7 @@ public class HRegion implements HeapSize { // , Writable{ // as a Put. txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdits, new ArrayList(), - EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId, + EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, getSequenceId(), true, nonceGroup, nonce); } else { recordMutationWithoutWal(append.getFamilyCellMap()); @@ -5374,8 +5390,8 @@ public class HRegion implements HeapSize { // , Writable{ // as a Put. 
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(), walEdits, new ArrayList(), - EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId, - true, nonceGroup, nonce); + EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, + getSequenceId(), true, nonceGroup, nonce); } else { recordMutationWithoutWal(increment.getFamilyCellMap()); } @@ -6048,8 +6064,10 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * @return sequenceId. + * Do not change this sequence id. See {@link #sequenceId} comment. + * @return sequenceId */ + @VisibleForTesting public AtomicLong getSequenceId() { return this.sequenceId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4e379b6..e98f47d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1241,7 +1241,7 @@ public class HRegionServer extends HasThread implements .setWriteRequestsCount((int) r.writeRequestsCount.get()) .setTotalCompactingKVs(totalCompactingKVs) .setCurrentCompactedKVs(currentCompactedKVs) - .setCompleteSequenceId(r.completeSequenceId); + .setCompleteSequenceId(r.lastFlushSeqId); return regionLoad.build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index f52937e..7f34d5c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1915,7 +1915,8 @@ class FSHLog implements HLog, Syncable { * @throws Exception */ void append(final FSWALEntry entry) throws Exception { - // TODO: WORK ON MAKING THIS APPEND FASTER. 
OING WAY TOO MUCH WORK WITH CPs, PBing, etc. + // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. + atHeadOfRingBufferEventHandlerAppend(); long start = EnvironmentEdgeManager.currentTimeMillis(); byte [] encodedRegionName = entry.getKey().getEncodedRegionName(); @@ -1972,6 +1973,14 @@ class FSHLog implements HLog, Syncable { } } + /** + * Exposed for testing only. Use for tricks like halting the ring buffer appending. + */ + @VisibleForTesting + void atHeadOfRingBufferEventHandlerAppend() { + // Noop + } + private static IOException ensureIOException(final Throwable t) { return (t instanceof IOException)? (IOException)t: new IOException(t); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index ae912fa..c26f767 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -67,6 +68,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.DroppedSnapshotException; @@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import 
org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; @@ -113,6 +116,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 7467fe0..3731ab0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -23,12 +23,15 @@ import static org.junit.Assert.*; import java.io.IOException; import java.lang.reflect.Method; import java.net.BindException; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -39,11 +42,18 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hdfs.DFSClient; @@ -137,6 +147,86 @@ public class TestHLog { } /** + * Test flush for sure has a sequence id that is beyond the last edit appended. We do this + * by slowing appends in the background ring buffer thread while in foreground we call + * flush. The addition of the sync over HRegion in flush should fix an issue where flush was + * returning before all of its appends had made it out to the WAL (HBASE-11109). + * @throws IOException + * @see HBASE-11109 + */ + @Test + public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException { + String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile"; + final TableName tableName = TableName.valueOf(testName); + final HRegionInfo hri = new HRegionInfo(tableName); + final byte[] rowName = tableName.getName(); + final HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor("f")); + HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(), + TEST_UTIL.getConfiguration(), htd); + HRegion.closeHRegion(r); + final int countPerFamily = 10; + final MutableBoolean goslow = new MutableBoolean(false); + // Bypass factory so I can subclass and doctor a method. 
+ FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(), + testName, conf) { + @Override + void atHeadOfRingBufferEventHandlerAppend() { + if (goslow.isTrue()) { + Threads.sleep(100); + LOG.debug("Sleeping before appending 100ms"); + } + super.atHeadOfRingBufferEventHandlerAppend(); + } + }; + HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(), + TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal); + EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); + try { + List puts = null; + for (HColumnDescriptor hcd: htd.getFamilies()) { + puts = + TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x"); + } + + // Now assert edits made it in. + final Get g = new Get(rowName); + Result result = region.get(g); + assertEquals(countPerFamily * htd.getFamilies().size(), result.size()); + + // Construct a WALEdit and add it a few times to the WAL. + WALEdit edits = new WALEdit(); + for (Put p: puts) { + CellScanner cs = p.cellScanner(); + while (cs.advance()) { + edits.add(KeyValueUtil.ensureKeyValue(cs.current())); + } + } + // Add any old cluster id. + List clusterIds = new ArrayList(); + clusterIds.add(UUID.randomUUID()); + // Now make appends run slow. + goslow.setValue(true); + for (int i = 0; i < countPerFamily; i++) { + wal.appendNoSync(region.getRegionInfo(), tableName, edits, + clusterIds, System.currentTimeMillis(), htd, region.getSequenceId(), true, -1, -1); + } + region.flushcache(); + // FlushResult.flushSequenceId is not visible here so go get the current sequence id. + long currentSequenceId = region.getSequenceId().get(); + // Now release the appends + goslow.setValue(false); + synchronized (goslow) { + goslow.notifyAll(); + } + assertTrue(currentSequenceId >= region.getSequenceId().get()); + } finally { + region.close(true); + wal.close(); + } + } + + /** * Write to a log file with three concurrent threads and verifying all data is written. 
* @throws Exception */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 515aef0..6e51245 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -969,16 +969,19 @@ public class TestWALReplay { } } - private void addRegionEdits (final byte [] rowName, final byte [] family, + static List addRegionEdits (final byte [] rowName, final byte [] family, final int count, EnvironmentEdge ee, final HRegion r, final String qualifierPrefix) throws IOException { + List puts = new ArrayList(); for (int j = 0; j < count; j++) { byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); Put p = new Put(rowName); p.add(family, qualifier, ee.currentTimeMillis(), rowName); r.put(p); + puts.add(p); } + return puts; } /* @@ -1031,6 +1034,4 @@ public class TestWALReplay { htd.addFamily(c); return htd; } - -} - +} \ No newline at end of file