### Eclipse Workspace Patch 1.0
#P apache-trunk
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (revision 1440984)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (working copy)
@@ -737,7 +737,7 @@
    * @return Path The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  private Path flushCache(final long logCacheFlushId,
+  protected Path flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,
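
Note on the HStore.java hunk above: widening flushCache() from private to protected is what enables the fault injection in the test below, because an anonymous HStore subclass can then override the method and fail on demand. A minimal, self-contained sketch of that pattern follows (generic names; BaseStore and FaultyStore are invented for illustration, not the HBase API):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    // A protected hook lets a test subclass inject a failure at the exact
    // point where data would otherwise be persisted.
    class BaseStore {
      protected void flushCache() throws IOException {
        // real flush work would happen here
      }
    }

    class FaultyStore extends BaseStore {
      final AtomicBoolean failFlushes = new AtomicBoolean(false);

      @Override
      protected void flushCache() throws IOException {
        if (failFlushes.get()) {
          throw new IOException("Simulated exception by tests");
        }
        super.flushCache();
      }
    }

The AtomicBoolean switch mirrors throwExceptionWhenFlushing in the test: the failure can be turned on for one flush attempt and back off for the next.
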
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (revision 1440984)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (working copy)
@@ -20,12 +20,18 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +39,18 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -47,8 +64,11 @@
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -529,6 +549,129 @@
   }
 
   /**
+   * Test that we can recover the data correctly after an aborted flush. The
+   * test first aborts a flush after writing some data, then writes more data
+   * and flushes again, and at last verifies that all edits survive WAL
+   * replay.
+   * @throws IOException
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final String tableNameStr = "testReplayEditsAfterAbortingFlush";
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
+    final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableNameStr);
+    HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
+    region3.close();
+    region3.getLog().closeAndDelete();
+    // Create a custom WAL and a region whose store flush can be made to fail
+    // on demand via throwExceptionWhenFlushing.
+    HLog wal = createWAL(this.conf);
+    final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+    Mockito.doReturn(false).when(rsServices).isStopped();
+    HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
+        rsServices) {
+      @Override
+      protected HStore instantiateHStore(Path tableDir, HColumnDescriptor c)
+          throws IOException {
+        return new HStore(tableDir, this, c, fs, conf) {
+          @Override
+          protected Path flushCache(final long logCacheFlushId,
+              SortedSet<KeyValue> snapshot,
+              TimeRangeTracker snapshotTimeRangeTracker,
+              AtomicLong flushedSize, MonitoredTask status) throws IOException {
+            if (throwExceptionWhenFlushing.get()) {
+              throw new IOException("Simulated exception by tests");
+            }
+            return super.flushCache(logCacheFlushId, snapshot,
+                snapshotTimeRangeTracker, flushedSize, status);
+          }
+        };
+      }
+    };
+    long seqid = region.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal.setSequenceNumber(seqid);
+
+    int writtenRowCount = 10;
+    List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
+        htd.getFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert the edits made it in.
+    RegionScanner scanner = region.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+
+    // Flush the region; the injected exception should abort the flush.
+    throwExceptionWhenFlushing.set(true);
+    try {
+      region.flushcache();
+      fail("Injected exception hasn't been thrown");
+    } catch (Throwable t) {
+      LOG.info("Expected simulated exception when flushing region: "
+          + t.getMessage());
+      // Simulate the server aborting after the failed flush.
+      Mockito.doReturn(true).when(rsServices).isStopped();
+    }
+    // Write more data.
+    int moreRow = 10;
+    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+    writtenRowCount += moreRow;
+    // Call flush again; it should now be refused because the server is
+    // stopped.
+    throwExceptionWhenFlushing.set(false);
+    try {
+      region.flushcache();
+    } catch (IOException t) {
+      LOG.info("Expected exception when flushing region because server is stopped: "
+          + t.getMessage());
+    }
+
+    region.close(true);
+    wal.close();
+
+    // Now split the log and recover the region from it.
+    runWALSplit(this.conf);
+    HLog wal2 = createWAL(this.conf);
+    Mockito.doReturn(false).when(rsServices).isStopped();
+    HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd,
+        rsServices);
+    long seqid2 = region2.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal2.setSequenceNumber(seqid2);
+
+    scanner = region2.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+  }
+
+  private int getScannedCount(RegionScanner scanner) throws IOException {
+    int scannedCount = 0;
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    while (true) {
+      boolean existMore = scanner.next(results);
+      if (!results.isEmpty())
+        scannedCount++;
+      if (!existMore)
+        break;
+      results.clear();
+    }
+    return scannedCount;
+  }
+
+  /**
    * Create an HRegion with the result of a HLog split and test we only see the
    * good edits
    * @throws Exception
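
Note on the Mockito usage in the test above: a single RegionServerServices mock is re-stubbed in mid-test so that isStopped() flips from false to true after the injected flush failure; that is how the test simulates a server abort without running a real cluster. Mockito honors the most recent stubbing, as this standalone sketch shows (the Services interface here is invented for illustration, not an HBase type):

    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    public class RestubbingSketch {
      interface Services {
        boolean isStopped();
      }

      public static void main(String[] args) {
        Services rs = mock(Services.class);
        doReturn(false).when(rs).isStopped();
        System.out.println(rs.isStopped()); // false: healthy server
        // Later stubbings replace earlier ones, so a test can flip the
        // mock's behavior mid-run.
        doReturn(true).when(rs).isStopped();
        System.out.println(rs.isStopped()); // true: server now "stopped"
      }
    }
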
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1440984)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -1520,6 +1520,10 @@
   protected boolean internalFlushcache(
       final HLog wal, final long myseqid, MonitoredTask status)
       throws IOException {
+    if (this.rsServices != null && this.rsServices.isStopped()) {
+      // Don't flush while the server is aborting; it's unsafe.
+      throw new IOException("Aborting flush because server is stopped...");
+    }
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
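
Note on the HRegion.java hunk above: refusing to flush on a stopping server is what actually protects the data. Every edit is written to the WAL before it reaches the memstore, so edits that are never flushed remain recoverable by log splitting and replay, whereas a partial flush on a dying server could drop the memstore snapshot without having persisted it. The following toy model illustrates that write-ahead invariant (plain Java with invented names, not HBase code):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Toy model: because writes go to the durable WAL first, a refused
    // flush loses nothing; crash recovery replays the WAL.
    public class AbortedFlushModel {
      static final List<String> wal = new ArrayList<String>();      // durable
      static final List<String> memstore = new ArrayList<String>(); // volatile
      static final List<String> hfiles = new ArrayList<String>();   // durable

      static void put(String edit) {
        wal.add(edit); // write-ahead: recoverable from this point on
        memstore.add(edit);
      }

      static void flush(boolean serverStopped) throws IOException {
        if (serverStopped) {
          // the patched guard: never flush while aborting
          throw new IOException("Aborting flush because server is stopped...");
        }
        hfiles.addAll(memstore);
        memstore.clear();
      }

      public static void main(String[] args) {
        put("row-1");
        try {
          flush(true); // aborted flush: nothing moved, nothing dropped
        } catch (IOException expected) {
          // expected; the edit is still in the WAL
        }
        // Crash recovery: replaying the WAL restores every edit.
        List<String> recovered = new ArrayList<String>(wal);
        System.out.println("recovered " + recovered.size() + " edit(s)"); // 1
      }
    }
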