Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1161363) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -946,7 +946,8 @@ final long currentMemStoreSize = this.memstoreSize.get(); List storeFlushers = new ArrayList(stores.size()); try { - sequenceId = (wal == null)? myseqid: wal.startCacheFlush(); + sequenceId = (wal == null)? myseqid : + wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes()); completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); for (Store s : stores.values()) { @@ -995,7 +996,9 @@ // We used to only catch IOEs but its possible that we'd get other // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch // all and sundry. - if (wal != null) wal.abortCacheFlush(); + if (wal != null){ + wal.abortCacheFlush(this.regionInfo.getEncodedNameAsBytes()); + } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + Bytes.toStringBinary(getRegionName())); dse.initCause(t); Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1161363) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -1147,6 +1147,18 @@ return outputfiles.size(); } + private byte[] getSnapshotName(byte[] encodedRegionName) { + byte snp[] = new byte[encodedRegionName.length + 3]; + // an encoded region name has only hex digits. 
s, n or p are not hex + // and therefore snapshot-names will never collide with + // encoded-region-names + snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p'; + for (int i = 0; i < encodedRegionName.length; i++) { + snp[i+3] = encodedRegionName[i]; + } + return snp; + } + /** * By acquiring a log sequence ID, we can allow log messages to continue while * we flush the cache. @@ -1155,13 +1167,45 @@ * completion of a cache-flush. Otherwise the log-seq-id for the flush will * not appear in the correct logfile. * - * @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long, boolean)} - * (byte[], byte[], long)} + * Ensuring that flushes and log-rolls don't happen concurrently also allows + * us to temporarily put a log-seq-number in lastSeqWritten against the region + * being flushed that might not be the earliest in-memory log-seq-number for + * that region. By the time the flush is completed or aborted and before the + * cacheFlushLock is released it is ensured that lastSeqWritten again has the + * oldest in-memory edit's lsn for the region that was being flushed. + * + * In this method, by removing the entry in lastSeqWritten for the region + * being flushed we ensure that the next edit inserted in this region will be + * correctly recorded in {@link #append(HRegionInfo, HLogKey, WALEdit)}. The + * lsn of the earliest in-memory lsn - which is now in the memstore snapshot - + * is saved temporarily in the lastSeqWritten map while the flush is active. + * + * @return sequence ID to pass + * {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[], + * byte[], long)} * @see #completeCacheFlush(byte[], byte[], long, boolean) * @see #abortCacheFlush() */ - public long startCacheFlush() { + public long startCacheFlush(final byte[] encodedRegionName) { this.cacheFlushLock.lock(); + Long seq = this.lastSeqWritten.remove(encodedRegionName); + // seq is the lsn of the oldest edit associated with this region. 
If a + // snapshot already exists - because the last flush failed - then seq will + // be the lsn of the oldest edit in the snapshot + if (seq != null) { + // keeping the earliest sequence number of the snapshot in + // lastSeqWritten maintains the correctness of + // getOldestOutstandingSeqNum(). But it doesn't matter really because + // everything is being done inside of cacheFlush lock. + Long oldseq = + lastSeqWritten.put(getSnapshotName(encodedRegionName), seq); + if (oldseq != null) { + LOG.error("Logic Error Snapshot seq id from earlier flush still" + + " present! for region " + Bytes.toString(encodedRegionName) + + " overwritten oldseq=" + oldseq + " with new seq=" + seq); + Runtime.getRuntime().halt(1); + } + } return obtainSeqNum(); } @@ -1191,15 +1235,15 @@ writeTime += System.currentTimeMillis() - now; writeOps++; this.numEntries.incrementAndGet(); - Long seq = this.lastSeqWritten.get(encodedRegionName); - if (seq != null && logSeqId >= seq.longValue()) { - this.lastSeqWritten.remove(encodedRegionName); - } } // sync txn to file system this.sync(); } finally { + // updateLock not needed for removing snapshot's entry + // Cleaning up of lastSeqWritten is in the finally clause because we + // don't want to confuse getOldestOutstandingSeqNum() + this.lastSeqWritten.remove(getSnapshotName(encodedRegionName)); this.cacheFlushLock.unlock(); } } @@ -1218,7 +1262,25 @@ * currently is a restart of the regionserver so the snapshot content dropped * by the failure gets restored to the memstore. 
*/ - public void abortCacheFlush() { + public void abortCacheFlush(byte[] encodedRegionName) { + Long snapshot_seq = + this.lastSeqWritten.remove(getSnapshotName(encodedRegionName)); + if (snapshot_seq != null) { + // updateLock not necessary because we are racing against + // lastSeqWritten.putIfAbsent() in append() and we will always win + // before releasing cacheFlushLock make sure that the region's entry in + // lastSeqWritten points to the earliest edit in the region + Long current_memstore_earliest_seq = + this.lastSeqWritten.put(encodedRegionName, snapshot_seq); + if (current_memstore_earliest_seq != null && + (current_memstore_earliest_seq.longValue() <= + snapshot_seq.longValue())) { + LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) + + " acquired edits out of order current memstore seq=" + + current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq); + Runtime.getRuntime().halt(1); + } + } this.cacheFlushLock.unlock(); } Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (revision 1161363) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (working copy) @@ -457,7 +457,7 @@ HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName), row,Bytes.toBytes(Bytes.toString(row) + "1"), false); log.append(info, tableName, cols, System.currentTimeMillis()); - long logSeqId = log.startCacheFlush(); + long logSeqId = log.startCacheFlush(info.getEncodedNameAsBytes()); log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId, info.isMetaRegion()); log.close(); Path filename = log.computeFilename(); @@ -525,7 +525,7 @@ HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); log.append(hri, tableName, cols, System.currentTimeMillis()); - long logSeqId = 
log.startCacheFlush(); + long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false); log.close(); Path filename = log.computeFilename(); @@ -633,7 +633,7 @@ // Flush the first region, we expect to see the first two files getting // archived - long seqId = log.startCacheFlush(); + long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false); log.rollWriter(); assertEquals(2, log.getNumLogFiles()); @@ -641,7 +641,7 @@ // Flush the second region, which removes all the remaining output files // since the oldest was completely flushed and the two others only contain // flush information - seqId = log.startCacheFlush(); + seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes()); log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false); log.rollWriter(); assertEquals(0, log.getNumLogFiles()); Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1161363) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy) @@ -352,7 +352,7 @@ } // Add a cache flush, shouldn't have any effect - long logSeqId = wal.startCacheFlush(); + long logSeqId = wal.startCacheFlush(regionName); wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion()); // Add an edit to another family, should be skipped.