Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1149128)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -1108,61 +1108,73 @@
     } finally {
       this.updatesLock.writeLock().unlock();
     }
-    status.setStatus("Flushing stores");
-
-    LOG.debug("Finished snapshotting, commencing flushing stores");
-
-    // Any failure from here on out will be catastrophic requiring server
-    // restart so hlog content can be replayed and put back into the memstore.
-    // Otherwise, the snapshot content while backed up in the hlog, it will not
-    // be part of the current running servers state.
     boolean compactionRequested = false;
     try {
-      // A. Flush memstore to all the HStores.
-      // Keep running vector of all store files that includes both old and the
-      // just-made new flush store file.
+      if (null != wal) {
+        wal.setFlushInProgress(true);
+      }
+      status.setStatus("Flushing stores");
 
-      for (StoreFlusher flusher : storeFlushers) {
-        flusher.flushCache(status);
-      }
-      // Switch snapshot (in memstore) -> new hfile (thus causing
-      // all the store scanners to reset/reseek).
-      for (StoreFlusher flusher : storeFlushers) {
-        boolean needsCompaction = flusher.commit();
-        if (needsCompaction) {
-          compactionRequested = true;
+      LOG.debug("Finished snapshotting, commencing flushing stores");
+
+      // Any failure from here on out will be catastrophic requiring server
+      // restart so hlog content can be replayed and put back into the memstore.
+      // Otherwise, the snapshot content while backed up in the hlog, it will
+      // not
+      // be part of the current running servers state.
+      try {
+        // A. Flush memstore to all the HStores.
+        // Keep running vector of all store files that includes both old and the
+        // just-made new flush store file.
+
+        for (StoreFlusher flusher : storeFlushers) {
+          flusher.flushCache(status);
         }
+        // Switch snapshot (in memstore) -> new hfile (thus causing
+        // all the store scanners to reset/reseek).
+        for (StoreFlusher flusher : storeFlushers) {
+          boolean needsCompaction = flusher.commit();
+          if (needsCompaction) {
+            compactionRequested = true;
+          }
+        }
+        storeFlushers.clear();
+
+        // Set down the memstore size by amount of flush.
+        this.addAndGetGlobalMemstoreSize(-currentMemStoreSize);
+      } catch (Throwable t) {
+        // An exception here means that the snapshot was not persisted.
+        // The hlog needs to be replayed so its content is restored to memstore.
+        // Currently, only a server restart will do this.
+        // We used to only catch IOEs but its possible that we'd get other
+        // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
+        // all and sundry.
+        if (wal != null) {
+          wal.abortCacheFlush();
+        }
+        DroppedSnapshotException dse = new DroppedSnapshotException("region: "
+            + Bytes.toStringBinary(getRegionName()));
+        dse.initCause(t);
+        status.abort("Flush failed: " + StringUtils.stringifyException(t));
+        throw dse;
       }
-      storeFlushers.clear();
-
-      // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-currentMemStoreSize);
-    } catch (Throwable t) {
-      // An exception here means that the snapshot was not persisted.
-      // The hlog needs to be replayed so its content is restored to memstore.
-      // Currently, only a server restart will do this.
-      // We used to only catch IOEs but its possible that we'd get other
-      // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
-      // all and sundry.
-      if (wal != null) wal.abortCacheFlush();
-      DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
-          Bytes.toStringBinary(getRegionName()));
-      dse.initCause(t);
-      status.abort("Flush failed: " + StringUtils.stringifyException(t));
-      throw dse;
-    }
 
+      // If we get to here, the HStores have been written. If we get an
+      // error in completeCacheFlush it will release the lock it is holding
-    // If we get to here, the HStores have been written. If we get an
-    // error in completeCacheFlush it will release the lock it is holding -
 
-    // B. Write a FLUSHCACHE-COMPLETE message to the log.
-    // This tells future readers that the HStores were emitted correctly,
-    // and that all updates to the log for this regionName that have lower
-    // log-sequence-ids can be safely ignored.
-    if (wal != null) {
-      wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
-        regionInfo.getTableName(), completeSequenceId,
-        this.getRegionInfo().isMetaRegion());
+      // B. Write a FLUSHCACHE-COMPLETE message to the log.
+      // This tells future readers that the HStores were emitted correctly,
+      // and that all updates to the log for this regionName that have lower
+      // log-sequence-ids can be safely ignored.
+      if (wal != null) {
+        wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
+            regionInfo.getTableName(), completeSequenceId, this.getRegionInfo()
+                .isMetaRegion());
+      }
+    } finally {
+      if (wal != null) {
+        wal.setFlushInProgress(false);
+      }
     }
 
     // C. Finally notify anyone waiting on memstore to clear:
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1149128)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -39,6 +39,7 @@
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -126,6 +127,7 @@
   private final String prefix;
   private final Path oldLogDir;
   private boolean logRollRequested;
+  private AtomicBoolean isFlushInProgress = new AtomicBoolean(false);
 
   private static Class<? extends Writer> logWriterClass;
 
@@ -176,6 +178,9 @@
    */
   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
+
+  private final ConcurrentSkipListMap<byte [], Long> seqWrittenWhileFlush =
+      new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
 
   private volatile boolean closed = false;
 
@@ -862,8 +867,7 @@
       // memstore). When the cache is flushed, the entry for the
       // region being flushed is removed if the sequence number of the flush
       // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
-        Long.valueOf(seqNum));
+      addToLastSequenceWritten(regionInfo.getEncodedNameAsBytes(), seqNum);
       doWrite(regionInfo, logKey, logEdit, htd);
       this.numEntries.incrementAndGet();
     }
@@ -917,7 +921,7 @@
       // Use encoded name. Its shorter, guaranteed unique and a subset of
       // actual name.
       byte [] hriKey = info.getEncodedNameAsBytes();
-      this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
+      addToLastSequenceWritten(hriKey, seqNum);
       HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
       doWrite(info, logKey, edits, htd);
       this.numEntries.incrementAndGet();
@@ -931,6 +935,14 @@
     }
   }
 
+  private void addToLastSequenceWritten(byte[] hriKey, long seqNum) {
+    if (isFlushInProgress.get()) {
+      this.seqWrittenWhileFlush.putIfAbsent(hriKey, Long.valueOf(seqNum));
+    } else {
+      this.lastSeqWritten.putIfAbsent(hriKey, Long.valueOf(seqNum));
+    }
+  }
+
   /**
    * This thread is responsible to call syncFs and buffer up the writers while
    * it happens.
@@ -1163,6 +1175,11 @@
         Long seq = this.lastSeqWritten.get(encodedRegionName);
         if (seq != null && logSeqId >= seq.longValue()) {
           this.lastSeqWritten.remove(encodedRegionName);
+          Long seqWhileFlush = this.seqWrittenWhileFlush.get(encodedRegionName);
+          if (null != seqWhileFlush) {
+            this.lastSeqWritten.putIfAbsent(encodedRegionName, seqWhileFlush);
+            this.seqWrittenWhileFlush.remove(encodedRegionName);
+          }
         }
       }
       // sync txn to file system
@@ -1463,4 +1480,8 @@
       System.exit(-1);
     }
   }
+
+  public void setFlushInProgress(boolean isFlushInProgress) {
+    this.isFlushInProgress.set(isFlushInProgress);
+  }
 }
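Not part of the patch itself: the sketch below is a minimal, self-contained model of the bookkeeping the HLog hunks above introduce, using a String region key instead of byte[] so it compiles and runs on its own. The class and method names (SequenceBookkeeping, recordEdit, completeFlush) are illustrative only and are not HBase APIs; in the patch the equivalent logic lives in HLog.append() / addToLastSequenceWritten() and completeCacheFlush(), gated by HRegion calling setFlushInProgress() around the store flush.

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a stripped-down model of the two-map bookkeeping above.
public class SequenceBookkeeping {
  private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
  // Oldest outstanding edit sequence id per region (normal path).
  private final ConcurrentSkipListMap<String, Long> lastSeqWritten =
      new ConcurrentSkipListMap<String, Long>();
  // Edits that arrive while a flush is running are parked here so the
  // flush cannot clear their sequence ids out of lastSeqWritten.
  private final ConcurrentSkipListMap<String, Long> seqWrittenWhileFlush =
      new ConcurrentSkipListMap<String, Long>();

  public void setFlushInProgress(boolean inProgress) {
    flushInProgress.set(inProgress);
  }

  // Mirrors addToLastSequenceWritten(): route the edit's sequence id to the
  // side map while a flush is running, otherwise to the main map.
  public void recordEdit(String region, long seqNum) {
    if (flushInProgress.get()) {
      seqWrittenWhileFlush.putIfAbsent(region, seqNum);
    } else {
      lastSeqWritten.putIfAbsent(region, seqNum);
    }
  }

  // Mirrors the completeCacheFlush() change: drop the flushed entry and
  // promote any sequence id that was parked during the flush.
  public void completeFlush(String region, long flushSeqId) {
    Long seq = lastSeqWritten.get(region);
    if (seq != null && flushSeqId >= seq.longValue()) {
      lastSeqWritten.remove(region);
      Long parked = seqWrittenWhileFlush.remove(region);
      if (parked != null) {
        lastSeqWritten.putIfAbsent(region, parked);
      }
    }
  }

  public static void main(String[] args) {
    SequenceBookkeeping book = new SequenceBookkeeping();
    book.recordEdit("region-1", 10);    // edit before the flush
    book.setFlushInProgress(true);
    book.recordEdit("region-1", 12);    // edit racing with the flush
    book.completeFlush("region-1", 11); // flush covered sequence ids <= 11
    book.setFlushInProgress(false);
    // Sequence id 12 survives, so the racing edit is still counted as unflushed.
    System.out.println(book.lastSeqWritten); // {region-1=12}
  }
}

The side map matters because an edit that lands between the memstore snapshot and completeCacheFlush() would otherwise have its oldest-sequence-id entry removed when the flush clears lastSeqWritten, letting the log treat that still-unflushed edit as safe to discard.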