Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1149129) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -960,6 +960,9 @@ } finally { this.updatesLock.writeLock().unlock(); } + if (null != wal) { + wal.setFlushInProgress(true); + } LOG.debug("Finished snapshotting, commencing flushing stores"); @@ -995,7 +998,10 @@ // We used to only catch IOEs but its possible that we'd get other // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch // all and sundry. - if (wal != null) wal.abortCacheFlush(); + if (wal != null) { + wal.abortCacheFlush(); + wal.setFlushInProgress(false); + } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + Bytes.toStringBinary(getRegionName())); dse.initCause(t); @@ -1013,6 +1019,7 @@ wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(), regionInfo.getTableDesc().getName(), completeSequenceId, this.getRegionInfo().isMetaRegion()); + wal.setFlushInProgress(false); } // C. Finally notify anyone waiting on memstore to clear: Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1149449) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -39,6 +39,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -130,8 +131,8 @@ private final String prefix; private final Path oldLogDir; private boolean logRollRequested; + private AtomicBoolean isFlushInProgress = new AtomicBoolean(false); - private static Class logWriterClass; private static Class logReaderClass; @@ -181,6 +182,9 @@ */ private final ConcurrentSkipListMap lastSeqWritten = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + + private final ConcurrentSkipListMap seqWrittenWhileFlush = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); private volatile boolean closed = false; @@ -866,8 +870,13 @@ // memstore). When the cache is flushed, the entry for the // region being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. - this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), - Long.valueOf(seqNum)); + if (isFlushInProgress.get()) { + this.seqWrittenWhileFlush.putIfAbsent(regionInfo + .getEncodedNameAsBytes(), seqNum); + } else { + this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), + Long.valueOf(seqNum)); + } doWrite(regionInfo, logKey, logEdit); this.numEntries.incrementAndGet(); } @@ -920,8 +929,12 @@ // is greater than or equal to the value in lastSeqWritten. // Use encoded name. Its shorter, guaranteed unique and a subset of // actual name. - byte [] hriKey = info.getEncodedNameAsBytes(); - this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + byte[] hriKey = info.getEncodedNameAsBytes(); + if (isFlushInProgress.get()) { + this.seqWrittenWhileFlush.putIfAbsent(hriKey, seqNum); + } else { + this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + } HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); doWrite(info, logKey, edits); this.numEntries.incrementAndGet(); @@ -1153,6 +1166,11 @@ Long seq = this.lastSeqWritten.get(encodedRegionName); if (seq != null && logSeqId >= seq.longValue()) { this.lastSeqWritten.remove(encodedRegionName); + Long seqWhileFlush = this.seqWrittenWhileFlush.get(encodedRegionName); + if (null != seqWhileFlush) { + this.lastSeqWritten.putIfAbsent(encodedRegionName, seqWhileFlush); + this.seqWrittenWhileFlush.remove(encodedRegionName); + } } } // sync txn to file system @@ -1471,4 +1489,8 @@ System.exit(-1); } } + + public void setFlushInProgress(boolean isFlushInProgress) { + this.isFlushInProgress.set(isFlushInProgress); + } }