Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1150558) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -39,6 +39,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -126,6 +127,7 @@ private final String prefix; private final Path oldLogDir; private boolean logRollRequested; + private AtomicBoolean isFlushInProgress = new AtomicBoolean(false); private static Class logWriterClass; @@ -176,6 +178,9 @@ */ private final ConcurrentSkipListMap lastSeqWritten = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + + private final ConcurrentSkipListMap seqWrittenWhileFlush = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); private volatile boolean closed = false; @@ -862,8 +867,7 @@ // memstore). When the cache is flushed, the entry for the // region being flushed is removed if the sequence number of the flush // is greater than or equal to the value in lastSeqWritten. - this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), - Long.valueOf(seqNum)); + addToLastSequenceWritten(regionInfo.getEncodedNameAsBytes(), seqNum); doWrite(regionInfo, logKey, logEdit, htd); this.numEntries.incrementAndGet(); } @@ -917,7 +921,7 @@ // Use encoded name. Its shorter, guaranteed unique and a subset of // actual name. byte [] hriKey = info.getEncodedNameAsBytes(); - this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + addToLastSequenceWritten(hriKey, seqNum); HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); doWrite(info, logKey, edits, htd); this.numEntries.incrementAndGet(); @@ -931,6 +935,14 @@ } } + private void addToLastSequenceWritten(byte[] hriKey, long seqNum) { + if (isFlushInProgress.get()) { + this.seqWrittenWhileFlush.putIfAbsent(hriKey, Long.valueOf(seqNum)); + } else { + this.lastSeqWritten.putIfAbsent(hriKey, Long.valueOf(seqNum)); + } + } + /** * This thread is responsible to call syncFs and buffer up the writers while * it happens. @@ -1131,6 +1143,7 @@ */ public long startCacheFlush() { this.cacheFlushLock.lock(); + this.isFlushInProgress.set(true); return obtainSeqNum(); } @@ -1163,12 +1176,18 @@ Long seq = this.lastSeqWritten.get(encodedRegionName); if (seq != null && logSeqId >= seq.longValue()) { this.lastSeqWritten.remove(encodedRegionName); + Long seqWhileFlush = this.seqWrittenWhileFlush.get(encodedRegionName); + if (null != seqWhileFlush) { + this.lastSeqWritten.putIfAbsent(encodedRegionName, seqWhileFlush); + this.seqWrittenWhileFlush.remove(encodedRegionName); + } } } // sync txn to file system this.sync(); } finally { + this.isFlushInProgress.set(false); this.cacheFlushLock.unlock(); } } @@ -1188,6 +1207,7 @@ * by the failure gets restored to the memstore. */ public void abortCacheFlush() { + this.isFlushInProgress.set(false); this.cacheFlushLock.unlock(); } @@ -1463,4 +1483,5 @@ System.exit(-1); } } + }