diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 90976e2..ecdd042 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -214,6 +214,13 @@ public class HRegion implements HeapSize { // , Writable{ * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1, * as a marker that the region hasn't opened yet. Once it is opened, it is set to * {@link #openSeqNum}. + * + * You cannot just increment this value and think you have the 'highest' sequence id EVEN IF YOU + * HOLD THE UPDATE lock. The increment of this sequenceId is done in a background thread that is + * servicing a ring buffer of queued, appended edits. It may still be processing (and updating + * this sequenceid behind your back) when you do your increment thinking you have the highest + * sequence id; even if you've blocked updates (the ring buffer may still have outstanding edits + * it is working on). See FSHLog#RingBufferEventHandler#append method. */ private final AtomicLong sequenceId = new AtomicLong(-1L); @@ -1684,12 +1691,23 @@ public class HRegion implements HeapSize { // , Writable{ // Clear flush flag. // If nothing to flush, return and avoid logging start/stop flush. if (this.memstoreSize.get() <= 0) { - return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"); + // Check under an update lock to be sure it empty. + this.updatesLock.writeLock().lock(); + try { + if (this.memstoreSize.get() <= 0) { + // Up the sequence number in case we are being called by a bulk load looking + // for a sequence id to safely use. See bulk load code calling in here. + return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, + this.sequenceId.incrementAndGet(), "Nothing to flush"); + } + } finally { + this.updatesLock.writeLock().unlock(); + } } if (LOG.isDebugEnabled()) { LOG.debug("Started memstore flush for " + this + ", current region memstore size " + - StringUtils.humanReadableInt(this.memstoreSize.get()) + + StringUtils.byteDesc(this.memstoreSize.get()) + ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid)); } @@ -1716,8 +1734,16 @@ public class HRegion implements HeapSize { // , Writable{ // Record the mvcc for all transactions in progress. w = mvcc.beginMemstoreInsert(); mvcc.advanceMemstore(w); - // check if it is not closing. + // Chase all of this regions edits out of the region buffer by putting in a sync and + // waiting on it to complete. When it completes, we know there are no more edits for this + // region hanging out in the ring buffer (because we have the update lock at this moment). + // It is safe to then do the below, jigger maps of outstanding edits and take the current + // sequence id as the 'highest' sequence id at this current point. This sync under the update + // lock is going to put a road bump in our write throughtput for this region (other regions + // can progress with their writing while this is going on but there will be slight stall for + // this region). if (wal != null) { + wal.sync(); if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; @@ -1735,7 +1761,7 @@ public class HRegion implements HeapSize { // , Writable{ storeFlushCtxs.add(s.createFlushContext(flushSeqId)); } - // prepare flush (take a snapshot) + // Prepare flush (take a snapshot) for (StoreFlushContext flush : storeFlushCtxs) { flush.prepare(); } @@ -1747,12 +1773,6 @@ public class HRegion implements HeapSize { // , Writable{ status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); - // sync unflushed WAL changes when deferred log sync is enabled - // see HBASE-8208 for details - if (wal != null && !shouldSyncLog()) { - wal.sync(); - } - // wait for all in-progress transactions to commit to HLog before // we can start the flush. This prevents // uncommitted transactions from being written into HFiles. @@ -1828,9 +1848,9 @@ public class HRegion implements HeapSize { // , Writable{ long time = EnvironmentEdgeManager.currentTimeMillis() - startTime; long memstoresize = this.memstoreSize.get(); String msg = "Finished memstore flush of ~" + - StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize + + StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize + ", currentsize=" + - StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize + + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId + ", compaction requested=" + compactionRequested + ((wal == null)? "; wal=null": ""); @@ -3649,7 +3669,7 @@ public class HRegion implements HeapSize { // , Writable{ if (fs.isFlushSucceeded()) { seqId = fs.flushSequenceId; } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { - seqId = this.sequenceId.incrementAndGet(); + seqId = fs.flushSequenceId; } else { throw new IOException("Could not bulk load with an assigned sequential ID because the " + "flush didn't run. Reason for not flushing: " + fs.failureReason); @@ -6048,9 +6068,11 @@ public class HRegion implements HeapSize { // , Writable{ } /** - * @return sequenceId. + * Do not change this sequence id. The id you get will not mean what you think it means. + * See {@link #sequenceId} comment. + * @return sequenceId */ - public AtomicLong getSequenceId() { + AtomicLong getSequenceId() { return this.sequenceId; }