Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1150283) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -39,6 +39,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -130,8 +131,8 @@ private final String prefix; private final Path oldLogDir; private boolean logRollRequested; + private AtomicBoolean isFlushInProgress = new AtomicBoolean(false); - private static Class logWriterClass; private static Class logReaderClass; @@ -181,6 +182,9 @@ */ private final ConcurrentSkipListMap lastSeqWritten = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + + private final ConcurrentSkipListMap seqWrittenWhileFlush = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); private volatile boolean closed = false; @@ -866,8 +870,7 @@ // memstore). When the cache is flushed, the entry for the // region being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. - this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), - Long.valueOf(seqNum)); + addToLastSequenceWritten(regionInfo.getEncodedNameAsBytes(), seqNum); doWrite(regionInfo, logKey, logEdit); this.numEntries.incrementAndGet(); } @@ -921,7 +924,7 @@ // Use encoded name. Its shorter, guaranteed unique and a subset of // actual name. byte [] hriKey = info.getEncodedNameAsBytes(); - this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + addToLastSequenceWritten(hriKey, seqNum); HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); doWrite(info, logKey, edits); this.numEntries.incrementAndGet(); @@ -935,6 +938,14 @@ } } + private void addToLastSequenceWritten(byte[] hriKey, long seqNum) { + if (isFlushInProgress.get()) { + this.seqWrittenWhileFlush.putIfAbsent(hriKey, Long.valueOf(seqNum)); + } else { + this.lastSeqWritten.putIfAbsent(hriKey, Long.valueOf(seqNum)); + } + } + /** * This thread is responsible to call syncFs and buffer up the writers while * it happens. @@ -1121,6 +1132,7 @@ */ public long startCacheFlush() { this.cacheFlushLock.lock(); + this.isFlushInProgress.set(true); return obtainSeqNum(); } @@ -1152,13 +1164,20 @@ this.numEntries.incrementAndGet(); Long seq = this.lastSeqWritten.get(encodedRegionName); if (seq != null && logSeqId >= seq.longValue()) { - this.lastSeqWritten.remove(encodedRegionName); + Long seqWhileFlush = this.seqWrittenWhileFlush + .remove(encodedRegionName); + if (null != seqWhileFlush) { + this.lastSeqWritten.putIfAbsent(encodedRegionName, seqWhileFlush); + } else { + this.lastSeqWritten.remove(encodedRegionName); + } } } // sync txn to file system this.sync(); } finally { + this.isFlushInProgress.set(false); this.cacheFlushLock.unlock(); } } @@ -1178,6 +1197,8 @@ * by the failure gets restored to the memstore. */ public void abortCacheFlush() { + this.seqWrittenWhileFlush.clear(); + this.isFlushInProgress.set(false); this.cacheFlushLock.unlock(); } @@ -1471,4 +1492,5 @@ System.exit(-1); } } + }