Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1149129)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -960,61 +960,74 @@
     } finally {
       this.updatesLock.writeLock().unlock();
     }
-
-    LOG.debug("Finished snapshotting, commencing flushing stores");
-
-    // Any failure from here on out will be catastrophic requiring server
-    // restart so hlog content can be replayed and put back into the memstore.
-    // Otherwise, the snapshot content while backed up in the hlog, it will not
-    // be part of the current running servers state.
     boolean compactionRequested = false;
     try {
-      // A. Flush memstore to all the HStores.
-      // Keep running vector of all store files that includes both old and the
-      // just-made new flush store file.
+      // NOTE(review): the flag is raised only after updatesLock has been
+      // released above, so an edit appended in between is not recorded in
+      // seqWrittenWhileFlush -- confirm that window is harmless.
+      if (null != wal) {
+        wal.setFlushInProgress(true);
+      }
 
-      for (StoreFlusher flusher : storeFlushers) {
-        flusher.flushCache();
-      }
-      // Switch snapshot (in memstore) -> new hfile (thus causing
-      // all the store scanners to reset/reseek).
-      for (StoreFlusher flusher : storeFlushers) {
-        boolean needsCompaction = flusher.commit();
-        if (needsCompaction) {
-          compactionRequested = true;
+      LOG.debug("Finished snapshotting, commencing flushing stores");
+
+      // Any failure from here on out will be catastrophic requiring server
+      // restart so hlog content can be replayed and put back into the memstore.
+      // Otherwise the snapshot content, while backed up in the hlog, will not
+      // be part of the current running server's state.
+      try {
+        // A. Flush memstore to all the HStores.
+        // Keep running vector of all store files that includes both old and the
+        // just-made new flush store file.
+
+        for (StoreFlusher flusher : storeFlushers) {
+          flusher.flushCache();
         }
+        // Switch snapshot (in memstore) -> new hfile (thus causing
+        // all the store scanners to reset/reseek).
+        for (StoreFlusher flusher : storeFlushers) {
+          boolean needsCompaction = flusher.commit();
+          if (needsCompaction) {
+            compactionRequested = true;
+          }
+        }
+        storeFlushers.clear();
+
+        // Set down the memstore size by amount of flush.
+        this.memstoreSize.addAndGet(-currentMemStoreSize);
+      } catch (Throwable t) {
+        // An exception here means that the snapshot was not persisted.
+        // The hlog needs to be replayed so its content is restored to memstore.
+        // Currently, only a server restart will do this.
+        // We used to only catch IOEs but its possible that we'd get other
+        // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
+        // all and sundry.
+        if (wal != null) {
+          wal.abortCacheFlush();
+        }
+        DroppedSnapshotException dse = new DroppedSnapshotException("region: "
+            + Bytes.toStringBinary(getRegionName()));
+        dse.initCause(t);
+        throw dse;
       }
-      storeFlushers.clear();
 
-      // Set down the memstore size by amount of flush.
-      this.memstoreSize.addAndGet(-currentMemStoreSize);
-    } catch (Throwable t) {
-      // An exception here means that the snapshot was not persisted.
-      // The hlog needs to be replayed so its content is restored to memstore.
-      // Currently, only a server restart will do this.
-      // We used to only catch IOEs but its possible that we'd get other
-      // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
-      // all and sundry.
-      if (wal != null) wal.abortCacheFlush();
-      DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
-        Bytes.toStringBinary(getRegionName()));
-      dse.initCause(t);
-      throw dse;
-    }
+      // If we get to here, the HStores have been written. If we get an
+      // error in completeCacheFlush it will release the lock it is holding
 
-    // If we get to here, the HStores have been written. If we get an
-    // error in completeCacheFlush it will release the lock it is holding
-
-    // B. Write a FLUSHCACHE-COMPLETE message to the log.
-    // This tells future readers that the HStores were emitted correctly,
-    // and that all updates to the log for this regionName that have lower
-    // log-sequence-ids can be safely ignored.
-    if (wal != null) {
-      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
-        regionInfo.getTableDesc().getName(), completeSequenceId,
-        this.getRegionInfo().isMetaRegion());
+      // B. Write a FLUSHCACHE-COMPLETE message to the log.
+      // This tells future readers that the HStores were emitted correctly,
+      // and that all updates to the log for this regionName that have lower
+      // log-sequence-ids can be safely ignored.
+      if (wal != null) {
+        wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
+            regionInfo.getTableDesc().getName(), completeSequenceId,
+            this.getRegionInfo().isMetaRegion());
+      }
+    } finally {
+      if (wal != null) {
+        wal.setFlushInProgress(false);
+      }
     }
-
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
     synchronized (this) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1149449)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -39,6 +39,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -130,8 +131,9 @@
   private final String prefix;
   private final Path oldLogDir;
   private boolean logRollRequested;
+  // Raised by HRegion for the duration of a memstore flush.
+  private final AtomicBoolean isFlushInProgress = new AtomicBoolean(false);
 
-
   private static Class logWriterClass;
   private static Class logReaderClass;
 
@@ -181,6 +183,14 @@
    */
   private final ConcurrentSkipListMap lastSeqWritten =
     new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR);
+
+  /**
+   * For each region (keyed by encoded name), the first sequence number
+   * appended while {@link #isFlushInProgress} was set. completeCacheFlush
+   * copies the flushed region's entry back into lastSeqWritten.
+   */
+  private final ConcurrentSkipListMap<byte [], Long> seqWrittenWhileFlush =
+    new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
 
   private volatile boolean closed = false;
 
@@ -866,8 +876,7 @@
       // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
-        Long.valueOf(seqNum));
+      addToLastSequenceWritten(regionInfo.getEncodedNameAsBytes(), seqNum);
       doWrite(regionInfo, logKey, logEdit);
       this.numEntries.incrementAndGet();
     }
@@ -921,7 +930,7 @@
       // Use encoded name. Its shorter, guaranteed unique and a subset of
       // actual name.
       byte [] hriKey = info.getEncodedNameAsBytes();
-      this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
+      addToLastSequenceWritten(hriKey, seqNum);
       HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
       doWrite(info, logKey, edits);
       this.numEntries.incrementAndGet();
@@ -935,6 +944,29 @@
     }
   }
 
+  /**
+   * Records the sequence number of an edit being appended for a region.
+   * Both maps are updated with putIfAbsent, so only the first sequence
+   * number seen since an entry was last removed is kept.
+   *
+   * <p>lastSeqWritten is updated whether or not a flush is in progress. The
+   * in-progress flag is a single HLog-wide boolean, while completeCacheFlush
+   * copies back only the flushed region's entry from seqWrittenWhileFlush;
+   * diverting edits to seqWrittenWhileFlush alone would leave edits of other
+   * regions, and edits appended after completeCacheFlush but before the flag
+   * is cleared, missing from lastSeqWritten.
+   *
+   * @param hriKey encoded name of the region the edit belongs to
+   * @param seqNum sequence number assigned to the edit
+   */
+  private void addToLastSequenceWritten(byte[] hriKey, long seqNum) {
+    Long seq = Long.valueOf(seqNum);
+    if (isFlushInProgress.get()) {
+      this.seqWrittenWhileFlush.putIfAbsent(hriKey, seq);
+    }
+    this.lastSeqWritten.putIfAbsent(hriKey, seq);
+  }
+
   /**
    * This thread is responsible to call syncFs and buffer up the writers while
    * it happens.
@@ -1153,6 +1185,13 @@
         Long seq = this.lastSeqWritten.get(encodedRegionName);
         if (seq != null && logSeqId >= seq.longValue()) {
           this.lastSeqWritten.remove(encodedRegionName);
+          // Move this region's entry, if any, from seqWrittenWhileFlush back
+          // into lastSeqWritten so it stays tracked after the removal above.
+          Long seqWhileFlush =
+            this.seqWrittenWhileFlush.remove(encodedRegionName);
+          if (null != seqWhileFlush) {
+            this.lastSeqWritten.putIfAbsent(encodedRegionName, seqWhileFlush);
+          }
         }
       }
       // sync txn to file system
@@ -1471,4 +1510,14 @@
       System.exit(-1);
     }
   }
+
+  /**
+   * Marks whether a memstore flush is in progress. While set, appended
+   * edits are additionally recorded in seqWrittenWhileFlush.
+   *
+   * @param isFlushInProgress true to raise the flag, false to clear it
+   */
+  public void setFlushInProgress(boolean isFlushInProgress) {
+    this.isFlushInProgress.set(isFlushInProgress);
+  }
 }