Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1148622) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -960,7 +960,9 @@ } finally { this.updatesLock.writeLock().unlock(); } - + if (null != wal) { + wal.setFlushInProgress(true); + } LOG.debug("Finished snapshotting, commencing flushing stores"); // Any failure from here on out will be catastrophic requiring server @@ -995,7 +997,10 @@ // We used to only catch IOEs but its possible that we'd get other // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch // all and sundry. - if (wal != null) wal.abortCacheFlush(); + if (wal != null) { + wal.abortCacheFlush(); + wal.setFlushInProgress(false); + } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + Bytes.toStringBinary(getRegionName())); dse.initCause(t); @@ -1013,8 +1018,8 @@ wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(), regionInfo.getTableDesc().getName(), completeSequenceId, this.getRegionInfo().isMetaRegion()); + wal.setFlushInProgress(false); } - // C. Finally notify anyone waiting on memstore to clear: // e.g. checkResources(). synchronized (this) { Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1148622) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -131,6 +132,7 @@ private final String prefix; private final Path oldLogDir; private boolean logRollRequested; + private AtomicBoolean isFlushInProgress = new AtomicBoolean(false); private static Class logWriterClass; @@ -182,6 +184,9 @@ */ private final ConcurrentSkipListMap lastSeqWritten = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + + private final ConcurrentSkipListMap seqWrittenWhileFlush = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); private volatile boolean closed = false; @@ -894,8 +899,13 @@ // memstore). When the cache is flushed, the entry for the // region being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. - this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), - Long.valueOf(seqNum)); + if (isFlushInProgress.get()) { + this.seqWrittenWhileFlush.putIfAbsent(regionInfo + .getEncodedNameAsBytes(), seqNum); + } else { + this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), + Long.valueOf(seqNum)); + } doWrite(regionInfo, logKey, logEdit); this.numEntries.incrementAndGet(); } @@ -948,8 +958,12 @@ // is greater than or equal to the value in lastSeqWritten. // Use encoded name. Its shorter, guaranteed unique and a subset of // actual name. - byte [] hriKey = info.getEncodedNameAsBytes(); - this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + byte[] hriKey = info.getEncodedNameAsBytes(); + if (isFlushInProgress.get()) { + this.seqWrittenWhileFlush.putIfAbsent(hriKey, seqNum); + } else { + this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + } HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); doWrite(info, logKey, edits); this.numEntries.incrementAndGet(); @@ -1180,6 +1194,11 @@ Long seq = this.lastSeqWritten.get(encodedRegionName); if (seq != null && logSeqId >= seq.longValue()) { this.lastSeqWritten.remove(encodedRegionName); + Long seqWhileFlush = this.seqWrittenWhileFlush.get(encodedRegionName); + if (null != seqWhileFlush) { + this.lastSeqWritten.putIfAbsent(encodedRegionName, seqWhileFlush); + this.seqWrittenWhileFlush.remove(encodedRegionName); + } } } // sync txn to file system @@ -1498,4 +1517,9 @@ System.exit(-1); } } + + + public void setFlushInProgress(boolean isFlushInProgress) { + this.isFlushInProgress.set(isFlushInProgress); + } }