diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 90976e2..576ff84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -208,12 +208,23 @@ public class HRegion implements HeapSize { // , Writable{
*/
final AtomicBoolean closing = new AtomicBoolean(false);
- protected volatile long completeSequenceId = -1L;
+ /**
+ * The sequence id of the last flush on this region. Used when doing rough calculations on
+ * whether it is time to flush or not.
+ */
+ protected volatile long lastFlushSeqId = -1L;
/**
- * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1,
- * as a marker that the region hasn't opened yet. Once it is opened, it is set to
- * {@link #openSeqNum}.
+ * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
+ * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
+ * Its default value is {@link HLog#NO_SEQUENCE_ID}. This default is used as a marker to indicate
+ * that the region hasn't opened yet. Once it is opened, it is set to the derived
+ * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
+ *
+ * Control of this sequence is handed off to the WAL/HLog implementation. It is responsible
+ * for tagging edits with the correct sequence id since it is responsible for getting the
+ * edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT
+ * OUTSIDE OF THE WAL. The value you get will not be what you think it is.
*/
private final AtomicLong sequenceId = new AtomicLong(-1L);
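The contract above is worth making concrete: callers never bump this counter themselves; they hand the AtomicLong to a WAL append and read the assigned id back out of the HLogKey once the append has been processed. A minimal sketch of that pattern, assuming an HLog `wal` in scope and using the HLogKey constructor and appendNoSync overload added later in this patch (an empty WALEdit yields a marker id not tied to any real edit, as getNextSequenceId below does):

    HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable());
    // The WAL's single appending thread increments sequenceId and stamps the key;
    // appendNoSync does not return until that stamping has happened.
    wal.appendNoSync(getRegionInfo(), key, new WALEdit(), getTableDesc(), this.sequenceId, false);
    long assignedId = key.getLogSeqNum();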
@@ -390,7 +401,7 @@ public class HRegion implements HeapSize { // , Writable{
/**
* Objects from this class are created when flushing to describe all the different states that
* that method ends up in. The Result enum describes those states. The sequence id should only
- * be specified if the flush was successful, and the failure message should only be speficied
+ * be specified if the flush was successful, and the failure message should only be specified
* if it didn't flush.
*/
public static class FlushResult {
@@ -741,7 +752,7 @@ public class HRegion implements HeapSize { // , Writable{
this.closing.set(false);
this.closed.set(false);
- this.completeSequenceId = nextSeqid;
+ this.lastFlushSeqId = nextSeqid;
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-open hooks");
coprocessorHost.postOpen();
@@ -1602,7 +1613,8 @@ public class HRegion implements HeapSize { // , Writable{
* Should the memstore be flushed now
*/
boolean shouldFlush() {
- if(this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) {
+ // This is a rough measure.
+ if (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get()) {
return true;
}
if (flushCheckInterval <= 0) { //disabled
@@ -1625,34 +1637,16 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
- * Flush the memstore.
- *
- * Flushing the memstore is a little tricky. We have a lot of updates in the
- * memstore, all of which have also been written to the log. We need to
- * write those updates in the memstore out to disk, while being able to
- * process reads/writes as much as possible during the flush operation. Also,
- * the log has to state clearly the point in time at which the memstore was
- * flushed. (That way, during recovery, we know when we can rely on the
- * on-disk flushed structures and when we have to recover the memstore from
- * the log.)
- *
- * So, we have a three-step process:
- *
- * - A. Flush the memstore to the on-disk stores, noting the current
- * sequence ID for the log.
- *
- * - B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
- * ID that was current at the time of memstore-flush.
- *
- * - C. Get rid of the memstore structures that are now redundant, as
- * they've been flushed to the on-disk HStores.
- *
- * This method is protected, but can be accessed via several public
- * routes.
- *
- * This method may block for some time.
+ * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
+ * memstore, all of which have also been written to the log. We need to write those updates in the
+ * memstore out to disk, while being able to process reads/writes as much as possible during the
+ * flush operation.
+ *
+ * This method may block for some time. Every time you call it, we up the region's sequence id
+ * even if we don't flush; i.e. the returned sequence id will be at least one larger than the
+ * last edit applied to this region. The returned id does not refer to an actual edit. The
+ * returned id can be used for, say, installing a bulk loaded file just ahead of the last hfile
+ * that was the result of this flush, etc.
* @param status
- *
* @return object describing the flush's state
*
* @throws IOException general io exceptions
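The bulk-load hunk later in this patch is the canonical consumer of this guarantee; condensed, and assuming the FlushResult shape shown earlier in this diff, the caller side reads:

    FlushResult fs = this.flushcache();
    long seqId;
    if (fs.isFlushSucceeded() || fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
      // Either way flushcache() handed back an id beyond every edit in existing hfiles.
      seqId = fs.flushSequenceId;
    } else {
      throw new IOException("Could not bulk load; flush didn't run: " + fs.failureReason);
    }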
@@ -1666,10 +1660,9 @@ public class HRegion implements HeapSize { // , Writable{
/**
* @param wal Null if we're NOT to go via hlog/wal.
- * @param myseqid The seqid to use if wal is null writing out
- * flush file.
+ * @param myseqid The seqid to use if wal is null writing out flush file.
* @param status
- * @return true if the region needs compacting
+ * @return object describing the flush's state
* @throws IOException
* @see #internalFlushcache(MonitoredTask)
*/
@@ -1681,50 +1674,63 @@ public class HRegion implements HeapSize { // , Writable{
throw new IOException("Aborting flush because server is abortted...");
}
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
- // Clear flush flag.
- // If nothing to flush, return and avoid logging start/stop flush.
+ // If nothing to flush, return, but we need to safely update the region sequence id
if (this.memstoreSize.get() <= 0) {
- return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
+ // Take an update lock because we are about to change the sequence id.
+ this.updatesLock.writeLock().lock();
+ try {
+ if (this.memstoreSize.get() <= 0) {
+ // Presume that if there are still no edits in the memstore, then there are no edits for
+ // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out
+ // edits in the WAL system. Up the sequence number so the resulting flush id is for
+ // sure just beyond the last appended region edit (useful as a marker when bulk loading,
+ // etc.)
+ return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+ getNextSequenceId(wal, startTime), "Nothing to flush");
+ }
+ } finally {
+ this.updatesLock.writeLock().unlock();
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug("Started memstore flush for " + this +
", current region memstore size " +
- StringUtils.humanReadableInt(this.memstoreSize.get()) +
+ StringUtils.byteDesc(this.memstoreSize.get()) +
((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
}
- // Stop updates while we snapshot the memstore of all stores. We only have
- // to do this for a moment. Its quick. The subsequent sequence id that
- // goes into the HLog after we've flushed all these snapshots also goes
- // into the info file that sits beside the flushed files.
- // We also set the memstore size to zero here before we allow updates
- // again so its value will represent the size of the updates received
- // during the flush
+ // Stop updates while we snapshot the memstore of all of this region's stores. We only have
+ // to do this for a moment. It is quick. We also set the memstore size to zero here before we
+ // allow updates again so its value will represent the size of the updates received
+ // during flush
MultiVersionConsistencyControl.WriteEntry w = null;
- // We have to take a write lock during snapshot, or else a write could
- // end up in both snapshot and memstore (makes it difficult to do atomic
- // rows then)
+ // We have to take an update lock during snapshot, or else a write could end up in both snapshot
+ // and memstore (makes it difficult to do atomic rows then)
status.setStatus("Obtaining lock to block concurrent updates");
// block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
long totalFlushableSize = 0;
- status.setStatus("Preparing to flush by snapshotting stores");
+ status.setStatus("Preparing to flush by snapshotting stores in " +
+ getRegionInfo().getEncodedName());
List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
long flushSeqId = -1L;
try {
// Record the mvcc for all transactions in progress.
w = mvcc.beginMemstoreInsert();
mvcc.advanceMemstore(w);
- // check if it is not closing.
if (wal != null) {
if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
+ // This should never happen.
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
status.setStatus(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
- flushSeqId = this.sequenceId.incrementAndGet();
+ // Get a sequence id that we can use to denote the flush. It will be one beyond the last
+ // edit that made it into the hfile (the below does not add an edit, it just asks the
+ // WAL system to return the next sequence id).
+ flushSeqId = getNextSequenceId(wal, startTime);
} else {
// use the provided sequence Id as WAL is not being used for this flush.
flushSeqId = myseqid;
@@ -1735,7 +1741,7 @@ public class HRegion implements HeapSize { // , Writable{
storeFlushCtxs.add(s.createFlushContext(flushSeqId));
}
- // prepare flush (take a snapshot)
+ // Prepare flush (take a snapshot)
for (StoreFlushContext flush : storeFlushCtxs) {
flush.prepare();
}
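The empty-memstore branch above is a check/lock/re-check: a cheap unlocked test, then the updates write lock to exclude writers, then a re-test before the sequence id moves. A self-contained sketch of that shape (class and field names hypothetical; the real patch gets the id from the WAL via getNextSequenceId rather than a bare increment):

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class EmptyFlushSketch {
      private final AtomicLong memstoreSize = new AtomicLong(0);
      private final AtomicLong sequenceId = new AtomicLong(0);
      private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();

      long flushIfEmpty() {
        if (memstoreSize.get() <= 0) {             // cheap first check, no lock held
          updatesLock.writeLock().lock();          // exclude concurrent writers
          try {
            if (memstoreSize.get() <= 0) {         // re-check now that writers are blocked
              return sequenceId.incrementAndGet(); // id strictly beyond the last applied edit
            }
          } finally {
            updatesLock.writeLock().unlock();
          }
        }
        return -1;                                 // memstore not empty; do a real flush
      }
    }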
@@ -1747,12 +1753,6 @@ public class HRegion implements HeapSize { // , Writable{
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
- // sync unflushed WAL changes when deferred log sync is enabled
- // see HBASE-8208 for details
- if (wal != null && !shouldSyncLog()) {
- wal.sync();
- }
-
// wait for all in-progress transactions to commit to HLog before
// we can start the flush. This prevents
// uncommitted transactions from being written into HFiles.
@@ -1816,8 +1816,8 @@ public class HRegion implements HeapSize { // , Writable{
// Record latest flush time
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
- // Update the last flushed sequence id for region
- completeSequenceId = flushSeqId;
+ // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHLog.
+ this.lastFlushSeqId = flushSeqId;
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
@@ -1828,9 +1828,9 @@ public class HRegion implements HeapSize { // , Writable{
long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
long memstoresize = this.memstoreSize.get();
String msg = "Finished memstore flush of ~" +
- StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
+ StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
", currentsize=" +
- StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
+ StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
" for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
", compaction requested=" + compactionRequested +
((wal == null)? "; wal=null": "");
@@ -1842,6 +1842,20 @@ public class HRegion implements HeapSize { // , Writable{
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
}
+ /**
+ * Method to safely get the next sequence number.
+ * @param wal
+ * @param now
+ * @return Next sequence number unassociated with any actual edit.
+ * @throws IOException
+ */
+ private long getNextSequenceId(final HLog wal, final long now) throws IOException {
+ HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable());
+ WALEdit edits = new WALEdit();
+ wal.appendNoSync(getRegionInfo(), key, edits, getTableDesc(), this.sequenceId, false);
+ return key.getLogSeqNum();
+ }
+
//////////////////////////////////////////////////////////////////////////////
// get() methods for client use.
//////////////////////////////////////////////////////////////////////////////
@@ -2516,7 +2530,7 @@ public class HRegion implements HeapSize { // , Writable{
}
// txid should always increase, so having the one from the last call is ok.
txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
- walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true,
+ walEdit, m.getClusterIds(), now, htableDescriptor, getSequenceId(), true,
currentNonceGroup, currentNonce);
hasWalAppends = true;
walEdit = new WALEdit(isInReplay);
@@ -2541,7 +2555,7 @@ public class HRegion implements HeapSize { // , Writable{
Mutation mutation = batchOp.getMutation(firstIndex);
if (walEdit.size() > 0) {
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
- walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId,
+ walEdit, mutation.getClusterIds(), now, this.htableDescriptor, getSequenceId(),
true, currentNonceGroup, currentNonce);
hasWalAppends = true;
}
@@ -3643,13 +3657,15 @@ public class HRegion implements HeapSize { // , Writable{
long seqId = -1;
// We need to assign a sequential ID that's in between two memstores in order to preserve
// the guarantee that all the edits lower than the highest sequential ID from all the
- // HFiles are flushed on disk. See HBASE-10958.
+ // HFiles are flushed on disk. See HBASE-10958. The sequence id returned when we flush is
+ // guaranteed to be one beyond the last edit in the file made when we flushed (or, if nothing
+ // to flush, it is a sequence id that we can be sure is beyond the last hfile written).
if (assignSeqId) {
FlushResult fs = this.flushcache();
if (fs.isFlushSucceeded()) {
seqId = fs.flushSequenceId;
} else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
- seqId = this.sequenceId.incrementAndGet();
+ seqId = fs.flushSequenceId;
} else {
throw new IOException("Could not bulk load with an assigned sequential ID because the " +
"flush didn't run. Reason for not flushing: " + fs.failureReason);
@@ -4938,7 +4954,7 @@ public class HRegion implements HeapSize { // , Writable{
if (!walEdit.isEmpty()) {
txid = this.log.appendNoSync(this.getRegionInfo(),
this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
- this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
+ this.htableDescriptor, getSequenceId(), true, nonceGroup, nonce);
}
// 8. Release region lock
if (locked) {
@@ -5179,10 +5195,10 @@ public class HRegion implements HeapSize { // , Writable{
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
- txid = this.log.appendNoSync(this.getRegionInfo(),
- this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
- EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
- true, nonceGroup, nonce);
+ HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), nonceGroup, nonce);
+ txid = this.log.appendNoSync(getRegionInfo(), key, walEdits, this.htableDescriptor,
+ this.sequenceId, true);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
@@ -5374,8 +5390,8 @@ public class HRegion implements HeapSize { // , Writable{
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(),
this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
- EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
- true, nonceGroup, nonce);
+ EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor,
+ getSequenceId(), true, nonceGroup, nonce);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
}
@@ -6048,8 +6064,10 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
- * @return sequenceId.
+ * Do not change this sequence id. See {@link #sequenceId} comment.
+ * @return sequenceId
*/
+ @VisibleForTesting
public AtomicLong getSequenceId() {
return this.sequenceId;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4e379b6..e98f47d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1241,7 +1241,7 @@ public class HRegionServer extends HasThread implements
.setWriteRequestsCount((int) r.writeRequestsCount.get())
.setTotalCompactingKVs(totalCompactingKVs)
.setCurrentCompactedKVs(currentCompactedKVs)
- .setCompleteSequenceId(r.completeSequenceId);
+ .setCompleteSequenceId(r.lastFlushSeqId);
return regionLoad.build();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index f52937e..4909d8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -138,9 +138,7 @@ class FSHLog implements HLog, Syncable {
static final Log LOG = LogFactory.getLog(FSHLog.class);
/**
- * Disruptor is a fancy ring buffer. This disruptor/ring buffer is used to take edits and sync
- * calls from the Handlers and passes them to the append and sync executors with minimal
- * contention.
+ * The nexus at which all incoming handlers meet. Does appends and syncs with an ordering.
*/
private final Disruptor<RingBufferTruck> disruptor;
@@ -909,7 +907,7 @@ class FSHLog implements HLog, Syncable {
long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
- ", filesize=" + StringUtils.humanReadableInt(oldFileLen) + "; new WAL " +
+ ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
FSUtils.getPath(newPath));
} else {
LOG.info("New WAL " + FSUtils.getPath(newPath));
@@ -1088,99 +1086,88 @@ class FSHLog implements HLog, Syncable {
}
}
- /**
- * @param now
- * @param encodedRegionName Encoded name of the region as returned by
- * HRegionInfo#getEncodedNameAsBytes().
- * @param tableName
- * @param clusterIds that have consumed the change
- * @return New log key.
- */
- protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
- long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
- return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
- }
-
@Override
@VisibleForTesting
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, AtomicLong sequenceId)
throws IOException {
- append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, true, sequenceId,
- HConstants.NO_NONCE, HConstants.NO_NONCE);
+ HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName);
+ append(logKey, edits, htd, info, true, true, sequenceId);
}
@Override
- public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
+ public long appendNoSync(final HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
- boolean isInMemstore, long nonceGroup, long nonce) throws IOException {
- return append(info, tableName, edits, clusterIds, now, htd, false, isInMemstore, sequenceId,
- nonceGroup, nonce);
+ boolean inMemstore, long nonceGroup, long nonce) throws IOException {
+ HLogKey logKey =
+ new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce);
+ return append(logKey, edits, htd, info, false, inMemstore, sequenceId);
+ }
+
+ @Override
+ public long appendNoSync(final HRegionInfo info, final HLogKey key, final WALEdit edits,
+ final HTableDescriptor htd, final AtomicLong sequenceId, final boolean inMemstore)
+ throws IOException {
+ return append(key, edits, htd, info, false, inMemstore, sequenceId);
}
/**
* Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
* log-sequence-id.
- *
- * Later, if we sort by these keys, we obtain all the relevant edits for a given key-range of the
- * HRegion (TODO). Any edits that do not have a matching COMPLETE_CACHEFLUSH message can be
- * discarded.
- *
- * Logs cannot be restarted once closed, or once the HLog process dies. Each time the HLog
- * starts, it must create a new log. This means that other systems should process the log
- * appropriately upon each startup (and prior to initializing HLog).
- *
- * Synchronized prevents appends during the completion of a cache flush or for the duration of a
- * log roll.
- *
- * @param info
- * @param tableName
+ * @param key
* @param edits
- * @param clusterIds that have consumed the change (for replication)
- * @param now
- * @param htd
- * @param doSync shall we sync after we call the append?
+ * @param htd This comes in here just so it is available on a pre append for replications. Get
+ * rid of it. It is kinda crazy this comes in here when we have tablename and regioninfo.
+ * Replication gets its scope from the HTD.
+ * @param hri region info
+ * @param sync shall we sync after we call the append?
* @param inMemstore
- * @param sequenceId of the region.
- * @param nonceGroup
- * @param nonce
+ * @param sequenceId The region sequence id reference.
* @return txid of this transaction or if nothing to do, the last txid
* @throws IOException
*/
- private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
- final long now, HTableDescriptor htd, boolean doSync, boolean inMemstore,
- AtomicLong sequenceId, long nonceGroup, long nonce)
+ private long append(final HLogKey key, WALEdit edits, HTableDescriptor htd,
+ final HRegionInfo hri, boolean sync,
+ boolean inMemstore, AtomicLong sequenceId)
throws IOException {
if (!this.enabled || edits.isEmpty()) return this.highestUnsyncedSequence;
if (this.closed) throw new IOException("Cannot append; log is closed");
+ // Make a trace scope for the append. It is closed on other side of the ring buffer by the
+ // single consuming thread. Don't have to worry about it.
TraceScope scope = Trace.startSpan("FSHLog.append");
- // Make a key but do not set the WALEdit by region sequence id now -- set it to -1 for now --
- // and then later just before we write it out to the DFS stream, then set the sequence id;
- // late-binding.
- HLogKey logKey =
- makeKey(info.getEncodedNameAsBytes(), tableName, -1, now, clusterIds, nonceGroup, nonce);
// This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
- // all the stuff to make a key and then below to append the edit, we need to carry htd, info,
+ // all this to make a key and then below to append the edit, we need to carry htd, info,
// etc. all over the ring buffer.
+ FSWALEntry entry = null;
long sequence = this.disruptor.getRingBuffer().next();
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
- FSWALEntry entry =
- new FSWALEntry(sequence, logKey, edits, sequenceId, inMemstore, htd, info);
+ entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
+ // Wait until the entry gets its region edit/sequence id assigned. Makes keeping mvcc and
+ // sequence id aligned easier to reason about.
+ waitUntilRegionSequenceIdAssigned(entry);
}
// doSync is set in tests. Usually we arrive in here via appendNoSync w/ the sync called after
// all edits on a handler have been added.
- //
- // When we sync, we will sync to the current point, the txid of the last edit added.
- // Since we are single writer, the next txid should be the just next one in sequence;
- // do not explicitly specify it. Sequence id/txid is an implementation internal detail.
- if (doSync) sync();
+ if (sync) sync(sequence);
return sequence;
}
+ private void waitUntilRegionSequenceIdAssigned(final FSWALEntry entry) throws IOException {
+ // Now that we have published to the ring buffer, halt the current thread until we get an answer back.
+ try {
+ entry.getRegionSequenceId();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ IOException ioe = new InterruptedIOException();
+ ioe.initCause(ie);
+ throw ioe;
+ }
+ }
+
/**
* Thread that runs the hdfs sync call. This call takes a while to complete. This is the longest
* pole adding edits to the WAL and this must complete to be sure all edits persisted. We run
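Handlers are expected to batch against this append: several appendNoSync calls, then one sync on the last returned txid, as the comment in append() above notes. A hedged sketch of that caller pattern (variable names assumed in scope; sync(long) is one of the several sync variants the HLog interface carries):

    long txid = 0;
    for (WALEdit edit : editsToWrite) {   // e.g. one edit per mutation in a batch
      HLogKey key = new HLogKey(hri.getEncodedNameAsBytes(), tableName);
      txid = wal.appendNoSync(hri, key, edit, htd, region.getSequenceId(), true);
    }
    wal.sync(txid);                       // one sync persists every append up to txid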
@@ -1915,18 +1902,17 @@ class FSHLog implements HLog, Syncable {
* @throws Exception
*/
void append(final FSWALEntry entry) throws Exception {
- // TODO: WORK ON MAKING THIS APPEND FASTER. OING WAY TOO MUCH WORK WITH CPs, PBing, etc.
+ // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
+ atHeadOfRingBufferEventHandlerAppend();
long start = EnvironmentEdgeManager.currentTimeMillis();
byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
+ long regionSequenceId = HLog.NO_SEQUENCE_ID;
try {
// We are about to append this edit; update the region-scoped sequence number. Do it
// here inside this single appending/writing thread. Events are ordered on the ringbuffer
// so region sequenceids will also be in order.
- long regionSequenceId = entry.getRegionSequenceIdReference().incrementAndGet();
- // Set the region-scoped sequence number back up into the key ("late-binding" --
- // setting before append).
- entry.getKey().setLogSeqNum(regionSequenceId);
+ entry.stampRegionSequenceId();
// Coprocessor hook.
if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
entry.getEdit())) {
@@ -1938,8 +1924,7 @@ class FSHLog implements HLog, Syncable {
if (!listeners.isEmpty()) {
for (WALActionsListener i: listeners) {
// TODO: Why does listener take a table description and CPs take a regioninfo? Fix.
- i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
- entry.getEdit());
+ i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(), entry.getEdit());
}
}
writer.append(entry);
@@ -1972,6 +1957,14 @@ class FSHLog implements HLog, Syncable {
}
}
+ /**
+ * Exposed for testing only. Use for tricks like halting the ring buffer appending.
+ */
+ @VisibleForTesting
+ void atHeadOfRingBufferEventHandlerAppend() {
+ // Noop
+ }
+
private static IOException ensureIOException(final Throwable t) {
return (t instanceof IOException)? (IOException)t: new IOException(t);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 0d65a54..20edb49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -25,10 +25,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
/**
* A WAL Entry for {@link FSHLog} implementation. Immutable.
- * It is a subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as
+ * A subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as
* region sequence id (we want to use this later, just before we write the WAL to ensure region
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
- * hence marked 'transient' to underline this fact.
+ * hence marked 'transient' to underline this fact. It also adds a mechanism so we can wait on
+ * the assignment of the region sequence id. See {@link #stampRegionSequenceId()} and
+ * {@link #getRegionSequenceId()}.
*/
@InterfaceAudience.Private
class FSWALEntry extends HLog.Entry {
@@ -55,10 +57,6 @@ class FSWALEntry extends HLog.Entry {
return "sequence=" + this.sequence + ", " + super.toString();
};
- AtomicLong getRegionSequenceIdReference() {
- return this.regionSequenceIdReference;
- }
-
boolean isInMemstore() {
return this.inMemstore;
}
@@ -77,4 +75,28 @@ class FSWALEntry extends HLog.Entry {
long getSequence() {
return this.sequence;
}
+
+ /**
+ * Stamp this edit with a region edit/sequence id.
+ * Call when safe to do so: i.e. the context is such that the increment on the
+ * {@link #regionSequenceIdReference} set on construction is guaranteed aligned w/ how appends
+ * are going into the WAL. This method works with {@link #getRegionSequenceId()}, which will
+ * block waiting on this method if on initialization our edit/sequence id is
+ * {@link HLog#NO_SEQUENCE_ID}.
+ * @return The region edit/sequence id we set for this edit.
+ */
+ synchronized long stampRegionSequenceId() {
+ long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
+ getKey().setLogSeqNum(regionSequenceId);
+ notify();
+ return regionSequenceId;
+ }
+
+ synchronized long getRegionSequenceId() throws InterruptedException {
+ long regionSequenceId = HLog.NO_SEQUENCE_ID;
+ while (true) {
+ regionSequenceId = getKey().getLogSeqNum();
+ if (regionSequenceId != HLog.NO_SEQUENCE_ID) return regionSequenceId;
+ wait(1000);
+ }
+ }
}
\ No newline at end of file
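stampRegionSequenceId() and getRegionSequenceId() above form a one-shot handshake: the ring-buffer consumer stamps the key and notifies; the appending handler loops until the key's id is no longer NO_SEQUENCE_ID. The same shape as a self-contained sketch (class and method names hypothetical):

    import java.util.concurrent.atomic.AtomicLong;

    class SequenceIdHandshake {
      static final long NO_SEQUENCE_ID = -1;
      private long id = NO_SEQUENCE_ID;

      // Consumer side: assign the id and wake any waiter.
      synchronized long stamp(AtomicLong source) {
        id = source.incrementAndGet();
        notifyAll();
        return id;
      }

      // Handler side: block until stamped; wake periodically to re-check,
      // as getRegionSequenceId() does with its wait(1000).
      synchronized long await() throws InterruptedException {
        while (id == NO_SEQUENCE_ID) {
          wait(1000);
        }
        return id;
      }
    }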
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 0917c8b..0f2e210 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -49,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting;
// TODO: Rename interface to WAL
public interface HLog {
Log LOG = LogFactory.getLog(HLog.class);
+ public static final long NO_SEQUENCE_ID = -1;
/** File Extension used while splitting an HLog into regions (HBASE-2312) */
// TODO: this seems like an implementation detail that does not belong here.
@@ -311,8 +312,9 @@ public interface HLog {
/**
* Append a set of edits to the WAL. WAL edits are keyed by (encoded) regionName, rowname, and
- * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes.
- * Call {@link #sync()} to flush/sync all outstanding edits/appends.
+ * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes BUT on return
+ * this edit must have its region edit/sequence id assigned else it messes up our unification
+ * of mvcc and sequenceid.
* @param info
* @param tableName
* @param edits
@@ -332,11 +334,43 @@ public interface HLog {
* able to sync an explicit edit only (the current default implementation syncs up to the time
* of the sync call syncing whatever is behind the sync).
* @throws IOException
+ * @deprecated Use {@link #appendNoSync(HRegionInfo, HLogKey, WALEdit, HTableDescriptor, AtomicLong, boolean)}
+ * instead because you can get back the region edit/sequenceid; it is set into the passed in
+ * key.
*/
long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
boolean isInMemstore, long nonceGroup, long nonce) throws IOException;
+ /**
+ * Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction
+ * completes BUT on return this edit must have its region edit/sequence id assigned
+ * else it messes up our unification of mvcc and sequenceid. On return key will
+ * have the region edit/sequence id filled in.
+ * @param info
+ * @param key
+ * @param edits
+ * @param htd
+ * @param sequenceId A reference to the atomic long the info region is using as
+ * source of its incrementing edits sequence id. Inside this call we will increment it and
+ * attach the sequence to the edit we apply to the WAL.
+ * @param inMemstore Always true except for case where we are writing a compaction completion
+ * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
+ * -- it is not an edit for memstore.
+ * @return Returns a 'transaction id'. Do not use. This is an internal implementation detail and
+ * cannot be respected in all implementations; i.e. the append/sync machine may or may not be
+ * able to sync an explicit edit only (the current default implementation syncs up to the time
+ * of the sync call syncing whatever is behind the sync).
+ * @throws IOException
+ */
+ long appendNoSync(HRegionInfo info, HLogKey key, WALEdit edits,
+ HTableDescriptor htd, AtomicLong sequenceId, boolean inMemstore)
+ throws IOException;
+
// TODO: Do we need all these versions of sync?
void hsync() throws IOException;
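The practical difference between the two overloads: the deprecated one never tells the caller which region sequence id its edit got, while the new one stamps it into the key the caller supplied. A hedged sketch (variables assumed in scope):

    // Deprecated overload: the assigned region sequence id is lost to the caller.
    long txid = wal.appendNoSync(hri, tableName, edits, clusterIds, now, htd,
      region.getSequenceId(), true, nonceGroup, nonce);

    // New overload: on return the key carries the region edit/sequence id.
    HLogKey key = new HLogKey(hri.getEncodedNameAsBytes(), tableName, nonceGroup, nonce);
    long txid2 = wal.appendNoSync(hri, key, edits, htd, region.getSequenceId(), true);
    long regionSeqId = key.getLogSeqNum();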
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 4563cf8..a74469e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -31,7 +32,6 @@ import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
-import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -43,11 +43,13 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
/**
* A Key for an entry in the change log.
@@ -122,6 +124,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
+ static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
private CompressionContext compressionContext;
@@ -139,6 +142,11 @@ public class HLogKey implements WritableComparable {
HConstants.NO_NONCE, HConstants.NO_NONCE);
}
+ public HLogKey(final byte[] encodedRegionName, final TableName tablename) {
+ init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, System.currentTimeMillis(),
+ EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
@@ -156,6 +164,41 @@ public class HLogKey implements WritableComparable {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
}
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * HRegionInfo#getEncodedNameAsBytes().
+ * @param tablename - name of table
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
+ */
+ public HLogKey(final byte [] encodedRegionName, final TableName tablename,
+ final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
+ init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * HRegionInfo#getEncodedNameAsBytes().
+ * @param tablename - name of table
+ */
+ public HLogKey(final byte [] encodedRegionName, final TableName tablename, long nonceGroup,
+ long nonce) {
+ init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID,
+ EnvironmentEdgeManager.currentTimeMillis(), EMPTY_UUIDS, nonceGroup, nonce);
+ }
+
protected void init(final byte [] encodedRegionName, final TableName tablename,
long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
this.logSeqNum = logSeqNum;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
index cc9ec22..de3f4f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
@@ -79,14 +79,6 @@ class RingBufferTruck {
}
/**
- * return {@code true} when this truck is carrying a {@link Span},
- * {@code false} otherwise.
- */
- boolean hasSpanPayload() {
- return this.span != null;
- }
-
- /**
* Unload the truck of its {@link FSWALEntry} payload. The internal reference is released.
*/
FSWALEntry unloadFSWALEntryPayload() {
@@ -105,7 +97,7 @@ class RingBufferTruck {
}
/**
- * Unload the truck of its {@link Span} payload. The internal refernce is released.
+ * Unload the truck of its {@link Span} payload. The internal reference is released.
*/
Span unloadSpanPayload() {
Span ret = this.span;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index ae912fa..c26f767 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -67,6 +68,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException;
@@ -75,6 +77,7 @@ import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -113,6 +116,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 7467fe0..8737f93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -18,17 +18,26 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -38,14 +47,31 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.Coprocessor;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -137,6 +163,86 @@ public class TestHLog {
}
/**
+ * Test flush for sure has a sequence id that is beyond the last edit appended. We do this
+ * by slowing appends in the background ring buffer thread while in foreground we call
+ * flush. The addition of the sync over HRegion in flush should fix an issue where flush was
+ * returning before all of its appends had made it out to the WAL (HBASE-11109).
+ * @throws IOException
+ * @see HBASE-11109
+ */
+ @Test
+ public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
+ String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
+ final TableName tableName = TableName.valueOf(testName);
+ final HRegionInfo hri = new HRegionInfo(tableName);
+ final byte[] rowName = tableName.getName();
+ final HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor("f"));
+ HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
+ TEST_UTIL.getConfiguration(), htd);
+ HRegion.closeHRegion(r);
+ final int countPerFamily = 10;
+ final MutableBoolean goslow = new MutableBoolean(false);
+ // Bypass factory so I can subclass and doctor a method.
+ FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
+ testName, conf) {
+ @Override
+ void atHeadOfRingBufferEventHandlerAppend() {
+ if (goslow.isTrue()) {
+ LOG.debug("Sleeping 100ms before appending");
+ Threads.sleep(100);
+ }
+ super.atHeadOfRingBufferEventHandlerAppend();
+ }
+ };
+ HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
+ TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
+ EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+ try {
+ List<Put> puts = null;
+ for (HColumnDescriptor hcd: htd.getFamilies()) {
+ puts =
+ TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
+ }
+
+ // Now assert edits made it in.
+ final Get g = new Get(rowName);
+ Result result = region.get(g);
+ assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+
+ // Construct a WALEdit and add it a few times to the WAL.
+ WALEdit edits = new WALEdit();
+ for (Put p: puts) {
+ CellScanner cs = p.cellScanner();
+ while (cs.advance()) {
+ edits.add(KeyValueUtil.ensureKeyValue(cs.current()));
+ }
+ }
+ // Add any old cluster id.
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ clusterIds.add(UUID.randomUUID());
+ // Now make appends run slow.
+ goslow.setValue(true);
+ for (int i = 0; i < countPerFamily; i++) {
+ wal.appendNoSync(region.getRegionInfo(), tableName, edits,
+ clusterIds, System.currentTimeMillis(), htd, region.getSequenceId(), true, -1, -1);
+ }
+ region.flushcache();
+ // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
+ long currentSequenceId = region.getSequenceId().get();
+ // Now release the appends
+ goslow.setValue(false);
+ synchronized (goslow) {
+ goslow.notifyAll();
+ }
+ assertTrue(currentSequenceId >= region.getSequenceId().get());
+ } finally {
+ region.close(true);
+ wal.close();
+ }
+ }
+
+ /**
* Write to a log file with three concurrent threads and verifying all data is written.
* @throws Exception
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 487ac63..eccf0b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -433,153 +433,155 @@ public class TestLogRolling {
LOG.info("Replication=" +
fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
// When the hbase:meta table can be opened, the region servers are running
- new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
-
- this.server = cluster.getRegionServer(0);
- this.log = server.getWAL();
-
- // Create the test table and open it
- String tableName = getName();
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
- desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-
- admin.createTable(desc);
- HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
-
- server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
- this.log = server.getWAL();
- final List<Path> paths = new ArrayList<Path>();
- final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
- paths.add(((FSHLog) log).computeFilename());
- log.registerWALActionsListener(new WALActionsListener() {
- @Override
- public void preLogRoll(Path oldFile, Path newFile) {
- LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile);
- preLogRolledCalled.add(new Integer(1));
- }
- @Override
- public void postLogRoll(Path oldFile, Path newFile) {
- paths.add(newFile);
+ HTable t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
+ try {
+ this.server = cluster.getRegionServer(0);
+ this.log = server.getWAL();
+
+ // Create the test table and open it
+ String tableName = getName();
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+ desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+
+ admin.createTable(desc);
+ HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+
+ server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
+ this.log = server.getWAL();
+ final List<Path> paths = new ArrayList<Path>();
+ final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
+ paths.add(((FSHLog) log).computeFilename());
+ log.registerWALActionsListener(new WALActionsListener() {
+ @Override
+ public void preLogRoll(Path oldFile, Path newFile) {
+ LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile);
+ preLogRolledCalled.add(new Integer(1));
+ }
+ @Override
+ public void postLogRoll(Path oldFile, Path newFile) {
+ paths.add(newFile);
+ }
+ @Override
+ public void preLogArchive(Path oldFile, Path newFile) {}
+ @Override
+ public void postLogArchive(Path oldFile, Path newFile) {}
+ @Override
+ public void logRollRequested() {}
+ @Override
+ public void logCloseRequested() {}
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+ WALEdit logEdit) {}
+ @Override
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
+ WALEdit logEdit) {}
+ });
+
+ assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
+ // don't run this test without append support (HDFS-200 & HDFS-142)
+ assertTrue("Need append support for this test", FSUtils
+ .isAppendSupported(TEST_UTIL.getConfiguration()));
+
+ writeData(table, 1002);
+
+ table.setAutoFlush(true, true);
+
+ long curTime = System.currentTimeMillis();
+ long oldFilenum = log.getFilenum();
+ assertTrue("Log should have a timestamp older than now",
+ curTime > oldFilenum && oldFilenum != -1);
+
+ assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
+
+ // roll all datanodes in the pipeline
+ dfsCluster.restartDataNodes();
+ Thread.sleep(1000);
+ dfsCluster.waitActive();
+ LOG.info("Data Nodes restarted");
+ validateData(table, 1002);
+
+ // this write should succeed, but trigger a log roll
+ writeData(table, 1003);
+ long newFilenum = log.getFilenum();
+
+ assertTrue("Missing datanode should've triggered a log roll",
+ newFilenum > oldFilenum && newFilenum > curTime);
+ validateData(table, 1003);
+
+ writeData(table, 1004);
+
+ // roll all datanode again
+ dfsCluster.restartDataNodes();
+ Thread.sleep(1000);
+ dfsCluster.waitActive();
+ LOG.info("Data Nodes restarted");
+ validateData(table, 1004);
+
+ // this write should succeed, but trigger a log roll
+ writeData(table, 1005);
+
+ // force a log roll to read back and verify previously written logs
+ log.rollWriter(true);
+ assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
+ preLogRolledCalled.size() >= 1);
+
+ // read back the data written
+ Set<String> loggedRows = new HashSet<String>();
+ FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
+ for (Path p : paths) {
+ LOG.debug("recovering lease for " + p);
+ fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);
+
+ LOG.debug("Reading HLog "+FSUtils.getPath(p));
+ HLog.Reader reader = null;
+ try {
+ reader = HLogFactory.createReader(fs, p,
+ TEST_UTIL.getConfiguration());
+ HLog.Entry entry;
+ while ((entry = reader.next()) != null) {
+ LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());
+ for (KeyValue kv : entry.getEdit().getKeyValues()) {
+ loggedRows.add(Bytes.toStringBinary(kv.getRow()));
+ }
+ }
+ } catch (EOFException e) {
+ LOG.debug("EOF reading file "+FSUtils.getPath(p));
+ } finally {
+ if (reader != null) reader.close();
+ }
}
- @Override
- public void preLogArchive(Path oldFile, Path newFile) {}
- @Override
- public void postLogArchive(Path oldFile, Path newFile) {}
- @Override
- public void logRollRequested() {}
- @Override
- public void logCloseRequested() {}
- @Override
- public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
- WALEdit logEdit) {}
- @Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
- WALEdit logEdit) {}
- });
-
- assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
- // don't run this test without append support (HDFS-200 & HDFS-142)
- assertTrue("Need append support for this test", FSUtils
- .isAppendSupported(TEST_UTIL.getConfiguration()));
-
- writeData(table, 1002);
-
- table.setAutoFlush(true, true);
- long curTime = System.currentTimeMillis();
- long oldFilenum = log.getFilenum();
- assertTrue("Log should have a timestamp older than now",
- curTime > oldFilenum && oldFilenum != -1);
-
- assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
+ // verify the written rows are there
+ assertTrue(loggedRows.contains("row1002"));
+ assertTrue(loggedRows.contains("row1003"));
+ assertTrue(loggedRows.contains("row1004"));
+ assertTrue(loggedRows.contains("row1005"));
- // roll all datanodes in the pipeline
- dfsCluster.restartDataNodes();
- Thread.sleep(1000);
- dfsCluster.waitActive();
- LOG.info("Data Nodes restarted");
- validateData(table, 1002);
-
- // this write should succeed, but trigger a log roll
- writeData(table, 1003);
- long newFilenum = log.getFilenum();
-
- assertTrue("Missing datanode should've triggered a log roll",
- newFilenum > oldFilenum && newFilenum > curTime);
- validateData(table, 1003);
-
- writeData(table, 1004);
-
- // roll all datanode again
- dfsCluster.restartDataNodes();
- Thread.sleep(1000);
- dfsCluster.waitActive();
- LOG.info("Data Nodes restarted");
- validateData(table, 1004);
-
- // this write should succeed, but trigger a log roll
- writeData(table, 1005);
+ // flush all regions
+ List<HRegion> regions = new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
+ for (HRegion r: regions) {
+ r.flushcache();
+ }
- // force a log roll to read back and verify previously written logs
- log.rollWriter(true);
- assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
- preLogRolledCalled.size() >= 1);
-
- // read back the data written
- Set<String> loggedRows = new HashSet<String>();
- FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
- for (Path p : paths) {
- LOG.debug("recovering lease for " + p);
- fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);
-
- LOG.debug("Reading HLog "+FSUtils.getPath(p));
- HLog.Reader reader = null;
+ ResultScanner scanner = table.getScanner(new Scan());
try {
- reader = HLogFactory.createReader(fs, p,
- TEST_UTIL.getConfiguration());
- HLog.Entry entry;
- while ((entry = reader.next()) != null) {
- LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());
- for (KeyValue kv : entry.getEdit().getKeyValues()) {
- loggedRows.add(Bytes.toStringBinary(kv.getRow()));
- }
+ for (int i=2; i<=5; i++) {
+ Result r = scanner.next();
+ assertNotNull(r);
+ assertFalse(r.isEmpty());
+ assertEquals("row100"+i, Bytes.toString(r.getRow()));
}
- } catch (EOFException e) {
- LOG.debug("EOF reading file "+FSUtils.getPath(p));
} finally {
- if (reader != null) reader.close();
+ scanner.close();
}
- }
-
- // verify the written rows are there
- assertTrue(loggedRows.contains("row1002"));
- assertTrue(loggedRows.contains("row1003"));
- assertTrue(loggedRows.contains("row1004"));
- assertTrue(loggedRows.contains("row1005"));
-
- // flush all regions
- List<HRegion> regions =
- new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
- for (HRegion r: regions) {
- r.flushcache();
- }
- ResultScanner scanner = table.getScanner(new Scan());
- try {
- for (int i=2; i<=5; i++) {
- Result r = scanner.next();
- assertNotNull(r);
- assertFalse(r.isEmpty());
- assertEquals("row100"+i, Bytes.toString(r.getRow()));
+ // verify that no region servers aborted
+ for (JVMClusterUtil.RegionServerThread rsThread:
+ TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
+ assertFalse(rsThread.getRegionServer().isAborted());
}
} finally {
- scanner.close();
- }
-
- // verify that no region servers aborted
- for (JVMClusterUtil.RegionServerThread rsThread:
- TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
- assertFalse(rsThread.getRegionServer().isAborted());
+ if (t != null) t.close();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 515aef0..6e51245 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -969,16 +969,19 @@ public class TestWALReplay {
}
}
- private void addRegionEdits (final byte [] rowName, final byte [] family,
+ static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
final int count, EnvironmentEdge ee, final HRegion r,
final String qualifierPrefix)
throws IOException {
+ List<Put> puts = new ArrayList<Put>();
for (int j = 0; j < count; j++) {
byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
Put p = new Put(rowName);
p.add(family, qualifier, ee.currentTimeMillis(), rowName);
r.put(p);
+ puts.add(p);
}
+ return puts;
}
/*
@@ -1031,6 +1034,4 @@ public class TestWALReplay {
htd.addFamily(c);
return htd;
}
-
-}
-
+}
\ No newline at end of file