### Eclipse Workspace Patch 1.0
#P apache-trunk
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1438356)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy)
@@ -725,7 +725,7 @@
    * @return Path The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  private Path flushCache(final long logCacheFlushId,
+  protected Path flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1438356)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy)
@@ -20,12 +20,16 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +37,18 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -47,8 +62,10 @@
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -529,6 +546,118 @@
   }
 
   /**
+   * Test that we can recover the data correctly after aborting a flush. The
+   * test first aborts a flush after writing some data, then writes more data
+   * and flushes again, and finally verifies that all the data is present.
+   * @throws IOException
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final String tableNameStr = "testReplayEditsAfterAbortingFlush";
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
+    final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableNameStr);
+    HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
+    region3.close();
+    region3.getLog().closeAndDelete();
+    // Create a WAL and a region whose stores can be made to throw an
+    // exception from flushCache() on demand, so that we can simulate an
+    // aborted flush and later verify that its edits are recovered.
+    HLog wal = createWAL(this.conf);
+    final AtomicBoolean exceptionThrownWhenFlushing = new AtomicBoolean(false);
+    HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
+        null) {
+      @Override
+      protected HStore instantiateHStore(Path tableDir, HColumnDescriptor c)
+          throws IOException {
+        return new HStore(tableDir, this, c, fs, conf) {
+          @Override
+          protected Path flushCache(final long logCacheFlushId,
+              SortedSet<KeyValue> snapshot,
+              TimeRangeTracker snapshotTimeRangeTracker,
+              AtomicLong flushedSize, MonitoredTask status) throws IOException {
+            if (exceptionThrownWhenFlushing.get()) {
+              throw new IOException("Simulated exception by tests");
+            }
+            return super.flushCache(logCacheFlushId, snapshot,
+                snapshotTimeRangeTracker, flushedSize, status);
+          }
+        };
+      }
+    };
+    long seqid = region.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal.setSequenceNumber(seqid);
+
+    int writtenRowCount = 10;
+    List<HColumnDescriptor> familys = new ArrayList<HColumnDescriptor>(
+        htd.getFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(familys.get(i % familys.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert edits made it in.
+    RegionScanner scanner = region.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+
+    // Let us flush the region; the injected exception should abort the flush.
+    exceptionThrownWhenFlushing.set(true);
+    try {
+      region.flushcache();
+      fail("Injected exception hasn't been thrown");
+    } catch (IOException t) {
+      LOG.info("Expected simulated exception when flushing region, " + t.getMessage());
+    }
+    // Write more data.
+    int moreRow = 10;
+    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(familys.get(i % familys.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+    writtenRowCount += moreRow;
+    // Call flush again; this time it should succeed.
+    exceptionThrownWhenFlushing.set(false);
+    region.flushcache();
+    region.close(true);
+    wal.close();
+
+    // Let us try to split and recover.
+    runWALSplit(this.conf);
+    HLog wal2 = createWAL(this.conf);
+    HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd,
+        null);
+    long seqid2 = region2.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal2.setSequenceNumber(seqid2);
+
+    scanner = region2.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+  }
+
+  private int getScannedCount(RegionScanner scanner) throws IOException {
+    int scannedCount = 0;
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    while (true) {
+      boolean existMore = scanner.next(results);
+      if (!results.isEmpty())
+        scannedCount++;
+      if (!existMore)
+        break;
+      results.clear();
+    }
+    return scannedCount;
+  }
+
+  /**
    * Create an HRegion with the result of a HLog split and test we only see the
    * good edits
    * @throws Exception
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (revision 1438356)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (working copy)
@@ -142,25 +142,37 @@
   void snapshot() {
     this.lock.writeLock().lock();
     try {
+      boolean resetKVSet = false;
       // If snapshot currently has entries, then flusher failed or didn't call
       // cleanup. Log a warning.
       if (!this.snapshot.isEmpty()) {
-        LOG.warn("Snapshot called again without clearing previous. " +
-          "Doing nothing. Another ongoing flush or did we fail last attempt?");
+        LOG.warn("Snapshot called again without clearing previous. " +
+            "Another ongoing flush or did we fail last attempt? " +
+            "Copying KeyValues from kvset to snapshot.");
+        if (!this.kvset.isEmpty()) {
+          for (KeyValue kv : this.kvset) {
+            this.snapshot.add(kv);
+            this.snapshotTimeRangeTracker.includeTimestamp(kv);
+          }
+          resetKVSet = true;
+        }
       } else {
         if (!this.kvset.isEmpty()) {
           this.snapshot = this.kvset;
-          this.kvset = new KeyValueSkipListSet(this.comparator);
           this.snapshotTimeRangeTracker = this.timeRangeTracker;
-          this.timeRangeTracker = new TimeRangeTracker();
-          // Reset heap to not include any keys
-          this.size.set(DEEP_OVERHEAD);
-          // Reset allocator so we get a fresh buffer for the new memstore
-          if (allocator != null) {
-            this.allocator = new MemStoreLAB(conf);
-          }
+          resetKVSet = true;
         }
       }
+      if (resetKVSet) {
+        this.kvset = new KeyValueSkipListSet(this.comparator);
+        this.timeRangeTracker = new TimeRangeTracker();
+        // Reset heap to not include any keys
+        this.size.set(DEEP_OVERHEAD);
+        // Reset allocator so we get a fresh buffer for the new memstore
+        if (allocator != null) {
+          this.allocator = new MemStoreLAB(conf);
+        }
+      }
     } finally {
       this.lock.writeLock().unlock();
     }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (revision 1438356)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (working copy)
@@ -450,7 +450,7 @@
         Bytes.toBytes(oldValue)));
 
     // snapshot the store.
-    this.store.snapshot();
+    StoreFlusher storeFlusher = prepareFlush(store, id++);
 
     // add other things:
     this.store.add(new KeyValue(row, family, qf2,
@@ -464,7 +464,7 @@
     assertTrue(ret > 0);
 
     // then flush.
-    flushStore(store, id++);
+    flushStoreAfterPrepare(storeFlusher);
     assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
     assertEquals(2, this.store.memstore.kvset.size());
@@ -561,7 +561,7 @@
         Bytes.toBytes(oldValue)));
 
     // snapshot the store.
-    this.store.snapshot();
+    StoreFlusher storeFlusher = prepareFlush(store, id++);
 
     // update during the snapshot, the exact same TS as the Put (lololol)
     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
@@ -570,7 +570,7 @@
     assertTrue(ret > 0);
 
     // then flush.
-    flushStore(store, id++);
+    flushStoreAfterPrepare(storeFlusher);
     assertEquals(1, this.store.getStorefiles().size());
     assertEquals(1, this.store.memstore.kvset.size());
@@ -720,7 +720,18 @@
     }
   }
 
+  private static StoreFlusher prepareFlush(HStore store, long id)
+      throws IOException {
+    StoreFlusher storeFlusher = store.getStoreFlusher(id);
+    storeFlusher.prepare();
+    return storeFlusher;
+  }
+
+  private static void flushStoreAfterPrepare(StoreFlusher storeFlusher)
+      throws IOException {
+    storeFlusher.flushCache(Mockito.mock(MonitoredTask.class));
+    storeFlusher.commit(Mockito.mock(MonitoredTask.class));
+  }
   private static void flushStore(HStore store, long id) throws IOException {
     StoreFlusher storeFlusher = store.getStoreFlusher(id);
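
Note (not part of the patch): the core behavioural change this patch makes to MemStore.snapshot() can be illustrated with a small self-contained model. The class below is a simplified stand-in written only for this note; SimpleMemStore, its String "KeyValues", and AbortedFlushExample are hypothetical names, not HBase classes. The point it demonstrates is the same as the patch: if a previous flush failed and left a non-empty snapshot, a second snapshot() call merges the current write set into the existing snapshot instead of doing nothing, so the retried flush still persists every edit.

import java.util.SortedSet;
import java.util.TreeSet;

/** Simplified stand-in for a memstore, used to illustrate retry-after-failed-flush. */
class SimpleMemStore {
  private SortedSet<String> kvset = new TreeSet<String>();    // live writes
  private SortedSet<String> snapshot = new TreeSet<String>(); // data staged for flush

  void add(String kv) {
    kvset.add(kv);
  }

  /**
   * Stage the current write set for flushing. If an earlier flush failed and
   * left a non-empty snapshot behind, merge the new writes into it; the old
   * behaviour was to log a warning and leave the new writes out of the
   * snapshot, which could lose them on the next flush attempt.
   */
  void snapshot() {
    if (!snapshot.isEmpty()) {
      snapshot.addAll(kvset);        // merge instead of dropping
    } else {
      snapshot = kvset;              // normal path: hand the write set over
    }
    kvset = new TreeSet<String>();   // start a fresh write set either way
  }

  /** Called after a successful flush to drop the flushed data. */
  void clearSnapshot() {
    snapshot = new TreeSet<String>();
  }

  SortedSet<String> getSnapshot() {
    return snapshot;
  }
}

public class AbortedFlushExample {
  public static void main(String[] args) {
    SimpleMemStore m = new SimpleMemStore();
    m.add("row1");
    m.snapshot();                              // flush attempt #1 is prepared...
    // ...but the flush itself fails, so clearSnapshot() is never called.
    m.add("row2");
    m.snapshot();                              // flush attempt #2
    // Both edits are still staged, so the retried flush persists everything.
    System.out.println(m.getSnapshot());       // prints [row1, row2]
  }
}

This mirrors the patch's resetKVSet flag: the write set is reset whenever its contents have been moved or copied into the snapshot, regardless of which branch was taken.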