diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 59dcf3c..a70f123 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1030,22 +1030,25 @@ public class HRegion implements HeapSize { // , Writable{
     }
 
     status.setStatus("Disabling compacts and flushes for region");
-    boolean wasFlushing;
     synchronized (writestate) {
       // Disable compacting and flushing by background threads for this
       // region.
       writestate.writesEnabled = false;
-      wasFlushing = writestate.flushing;
       LOG.debug("Closing " + this + ": disabling compactions & flushes");
       waitForFlushesAndCompactions();
     }
     // If we were not just flushing, is it worth doing a preflush...one
     // that will clear out of the bulk of the memstore before we put up
     // the close flag?
-    if (!abort && !wasFlushing && worthPreFlushing()) {
+    if (!abort && worthPreFlushing()) {
       status.setStatus("Pre-flushing region before close");
       LOG.info("Running close preflush of " + this.getRegionNameAsString());
-      internalFlushcache(status);
+      try {
+        internalFlushcache(status);
+      } catch (IOException ioe) {
+        // Failed to flush the region. Keep going.
+        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
+      }
     }
 
     this.closing.set(true);
@@ -1061,7 +1064,22 @@ public class HRegion implements HeapSize { // , Writable{
       LOG.debug("Updates disabled for region " + this);
       // Don't flush the cache if we are aborting
       if (!abort) {
-        internalFlushcache(status);
+        int flushCount = 0;
+        while (this.getMemstoreSize().get() > 0) {
+          try {
+            if (flushCount++ > 0) {
+              LOG.info("Running extra flush (carrying snapshot?) " + this);
+            }
+            internalFlushcache(status);
+          } catch (IOException ioe) {
+            status.setStatus("Failed flush " + this + ", putting online again");
+            synchronized (writestate) {
+              writestate.writesEnabled = true;
+            }
+            // Have to throw to upper layers.  I can't abort server from here.
+            throw ioe;
+          }
+        }
       }
 
       Map<byte[], List<StoreFile>> result =
@@ -1075,6 +1093,7 @@ public class HRegion implements HeapSize { // , Writable{
         // close each store in parallel
         for (final Store store : stores.values()) {
+          assert store.getFlushableSize() == 0;
           completionService
               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                 @Override
@@ -1104,7 +1123,7 @@ public class HRegion implements HeapSize { // , Writable{
         }
       }
       this.closed.set(true);
-
+      if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor post-close hooks");
         this.coprocessorHost.postClose(abort);
@@ -1600,7 +1619,7 @@ public class HRegion implements HeapSize { // , Writable{
     status.setStatus("Obtaining lock to block concurrent updates");
     // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
-    long flushsize = this.memstoreSize.get();
+    long totalFlushableSize = 0;
     status.setStatus("Preparing to flush by snapshotting stores");
     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
     long flushSeqId = -1L;
@@ -1622,6 +1641,7 @@ public class HRegion implements HeapSize { // , Writable{
       }
 
       for (Store s : stores.values()) {
+        totalFlushableSize += s.getFlushableSize();
         storeFlushCtxs.add(s.createFlushContext(flushSeqId));
       }
 
@@ -1633,7 +1653,7 @@ public class HRegion implements HeapSize { // , Writable{
       this.updatesLock.writeLock().unlock();
     }
     String s = "Finished memstore snapshotting " + this +
-      ", syncing WAL and waiting on mvcc, flushsize=" + flushsize;
+      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
     status.setStatus(s);
     if (LOG.isTraceEnabled()) LOG.trace(s);
 
@@ -1680,7 +1700,7 @@ public class HRegion implements HeapSize { // , Writable{
       storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-flushsize);
+      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
       // The hlog needs to be replayed so its content is restored to memstore.
@@ -1718,7 +1738,7 @@ public class HRegion implements HeapSize { // , Writable{
     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     long memstoresize = this.memstoreSize.get();
     String msg = "Finished memstore flush of ~" +
-      StringUtils.humanReadableInt(flushsize) + "/" + flushsize +
+      StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
       ", currentsize=" +
       StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
@@ -1726,7 +1746,7 @@ public class HRegion implements HeapSize { // , Writable{
       ((wal == null)? "; wal=null": "");
     LOG.info(msg);
     status.setStatus(msg);
-    this.recentFlushes.add(new Pair<Long,Long>(time/1000, flushsize));
+    this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
 
     return compactionRequested;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f7dfb17..fec63ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -368,10 +368,16 @@ public class HStore implements Store {
 
   @Override
   public long getMemstoreFlushSize() {
+    // TODO: Why is this in here? The flushsize of the region rather than the store?  St.Ack
     return this.region.memstoreFlushSize;
   }
 
   @Override
+  public long getFlushableSize() {
+    return this.memstore.getFlushableSize();
+  }
+
+  @Override
   public long getCompactionCheckMultiplier() {
     return this.compactionCheckMultiplier;
   }
@@ -801,7 +807,7 @@ public class HStore implements Store {
           }
         }
       } catch (IOException e) {
-        LOG.warn("Failed flushing store file, retring num=" + i, e);
+        LOG.warn("Failed flushing store file, retrying num=" + i, e);
         lastException = e;
       }
       if (lastException != null && i < (flushRetriesNumber - 1)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 8da61fd..b3005a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -86,6 +86,7 @@ public class MemStore implements HeapSize {
 
   // Used to track own heapSize
   final AtomicLong size;
+  private volatile long snapshotSize;
 
   // Used to track when to flush
   volatile long timeOfOldestEdit = Long.MAX_VALUE;
@@ -117,6 +118,7 @@ public class MemStore implements HeapSize {
     timeRangeTracker = new TimeRangeTracker();
     snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
+    this.snapshotSize = 0;
     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
       this.chunkPool = MemStoreChunkPool.getPool(conf);
       this.allocator = new MemStoreLAB(conf, chunkPool);
@@ -148,6 +150,7 @@ public class MemStore implements HeapSize {
         "Doing nothing. Another ongoing flush or did we fail last attempt?");
     } else {
       if (!this.kvset.isEmpty()) {
+        this.snapshotSize = keySize();
         this.snapshot = this.kvset;
         this.kvset = new KeyValueSkipListSet(this.comparator);
         this.snapshotTimeRangeTracker = this.timeRangeTracker;
@@ -177,6 +180,18 @@ public class MemStore implements HeapSize {
   }
 
   /**
+   * On flush, how much memory we will clear.
+   * Flush will first clear out the data in snapshot if any (It will take a second flush
+   * invocation to clear the current Cell set). If snapshot is empty, current
+   * Cell set will be flushed.
+   *
+   * @return size of data that is going to be flushed
+   */
+  long getFlushableSize() {
+    return this.snapshotSize > 0 ? this.snapshotSize : keySize();
+  }
+
+  /**
    * The passed snapshot was successfully persisted; it can be let go.
    * @param ss The snapshot to clean out.
    * @throws UnexpectedException
@@ -195,6 +210,7 @@ public class MemStore implements HeapSize {
       this.snapshot = new KeyValueSkipListSet(this.comparator);
       this.snapshotTimeRangeTracker = new TimeRangeTracker();
     }
+    this.snapshotSize = 0;
     if (this.snapshotAllocator != null) {
       tmpAllocator = this.snapshotAllocator;
       this.snapshotAllocator = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 798979b..8923769 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -254,6 +254,14 @@ public interface Store extends HeapSize, StoreConfigInformation {
    */
   long getMemStoreSize();
 
+  /**
+   * @return The amount of memory we could flush from this memstore; usually this is equal to
+   * {@link #getMemStoreSize()} unless we are carrying snapshots and then it will be the size of
+   * outstanding snapshots.
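+   * (A store is carrying snapshots when an earlier flush attempt failed and its snapshot has
+   * not yet been successfully flushed and cleared.)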
+   */
+  long getFlushableSize();
+
   HColumnDescriptor getFamily();
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
index 768b6cf..5de00d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
@@ -33,6 +33,8 @@ public interface StoreConfigInformation {
   /**
    * @return Gets the Memstore flush size for the region that this store works with.
    */
+  // TODO: Why is this in here? It should be in Store and it should return the Store flush size,
+  // not the Regions.  St.Ack
   long getMemstoreFlushSize();
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index e518074..a5adf34 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -110,12 +112,14 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
+import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -135,7 +139,7 @@ import org.mockito.Mockito;
 import com.google.common.collect.Lists;
 
 /**
- * Basic stand-alone testing of HRegion.
+ * Basic stand-alone testing of HRegion.  No clusters!
  *
  * A lot of the meta information for an HRegion now lives inside other HRegions
  * or in the HBaseMaster, so only basic testing is possible.
@@ -149,12 +153,13 @@ public class TestHRegion {
   @Rule public TestName name = new TestName();
 
   private static final String COLUMN_FAMILY = "MyCF";
+  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
 
   HRegion region = null;
   private static HBaseTestingUtility TEST_UTIL; // do not run unit tests in parallel
-  public static Configuration conf ;
+  public Configuration conf ;
   private String DIR;
-  private static FileSystem fs;
+  private FileSystem fs;
   private final int MAX_VERSIONS = 2;
 
   // Test names
@@ -173,7 +178,7 @@ public class TestHRegion {
 
   @Before
   public void setup() throws IOException {
-    this.TEST_UTIL = HBaseTestingUtility.createLocalHTU();
+    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     this.fs = TEST_UTIL.getTestFileSystem();
     this.conf = TEST_UTIL.getConfiguration();
     this.DIR = TEST_UTIL.getDataTestDir("TestHRegion").toString();
@@ -191,12 +196,166 @@ public class TestHRegion {
   String getName() {
     return name.getMethodName();
   }
-
-  // ////////////////////////////////////////////////////////////////////////////
-  // New tests that doesn't spin up a mini cluster but rather just test the
-  // individual code pieces in the HRegion. Putting files locally in
-  // /tmp/testtable
-  // ////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test for Bug 2 of HBASE-10466.
+   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
+   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
+   * flush is skipped and only the second flush takes place. However, two flushes are required in
+   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
+   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
+   * flushes for region close."
+   * @throws IOException
+   */
+  @Test
+  public void testCloseCarryingSnapshot() throws IOException {
+    HRegion region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
+    Store store = region.getStore(COLUMN_FAMILY_BYTES);
+    // Get some random bytes.
+    byte [] value = Bytes.toBytes(name.getMethodName());
+    // Make a random put against our cf.
+    Put put = new Put(value);
+    put.add(COLUMN_FAMILY_BYTES, null, value);
+    // First put something in current memstore, which will be in snapshot after flusher.prepare()
+    region.put(put);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
+    storeFlushCtx.prepare();
+    // Second put something in current memstore
+    put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
+    region.put(put);
+    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
+    region.close();
+    assertEquals(0, region.getMemstoreSize().get());
+  }
+
+  /**
+   * Test we do not lose data if we fail a flush and then close.
+   * Part of HBase-10466.  Tests the following from the issue description:
+   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
+   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
+   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
+   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
+   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
If region flush could not proceed for a couple cycles, the size + * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize + * much smaller than expected. In extreme case, if the error accumulates to even bigger than + * HRegion's memstore size limit, any further flush is skipped because flush does not do anything + * if memstoreSize is not larger than 0." + * @throws Exception + */ + @Test + public void testFlushSizeAccounting() throws Exception { + final Configuration conf = TEST_UTIL.getConfiguration(); + // Only retry once. + conf.setInt("hbase.hstore.flush.retries.number", 1); + final User user = User.createUserForTesting(conf, this.name.getMethodName(), + new String[]{"foo"}); + // Inject our faulty LocalFileSystem + conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class); + user.runAs(new PrivilegedExceptionAction() { + public Object run() throws Exception { + // Make sure it worked (above is sensitive to caching details in hadoop core) + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals(FaultyFileSystem.class, fs.getClass()); + FaultyFileSystem ffs = (FaultyFileSystem)fs; + HRegion region = null; + try { + // Initialize region + region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES); + long size = region.getMemstoreSize().get(); + Assert.assertEquals(0, size); + // Put one item into memstore. Measure the size of one item in memstore. + Put p1 = new Put(row); + p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null)); + region.put(p1); + final long sizeOfOnePut = region.getMemstoreSize().get(); + // Fail a flush which means the current memstore will hang out as memstore 'snapshot'. + try { + LOG.info("Flushing"); + region.flushcache(); + Assert.fail("Didn't bubble up IOE!"); + } catch (DroppedSnapshotException dse) { + // What we are expecting + } + // Make it so all writes succeed from here on out + ffs.fault.set(false); + // Check sizes. Should still be the one entry. + Assert.assertEquals(sizeOfOnePut, region.getMemstoreSize().get()); + // Now add two entries so that on this next flush that fails, we can see if we + // subtract the right amount, the snapshot size only. + Put p2 = new Put(row); + p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null)); + p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null)); + region.put(p2); + Assert.assertEquals(sizeOfOnePut * 3, region.getMemstoreSize().get()); + // Do a successful flush. It will clear the snapshot only. Thats how flushes work. + // If already a snapshot, we clear it else we move the memstore to be snapshot and flush + // it + region.flushcache(); + // Make sure our memory accounting is right. + Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize().get()); + } finally { + if (region != null) { + region.close(); + Assert.assertEquals(0, region.getMemstoreSize().get()); + } + FileSystem.closeAllForUGI(user.getUGI()); + } + return null; + } + }); + } + + @Test + public void testCloseWithFailingFlush() throws Exception { + final Configuration conf = TEST_UTIL.getConfiguration(); + // Only retry once. 
+    conf.setInt("hbase.hstore.flush.retries.number", 1);
+    final User user = User.createUserForTesting(conf, this.name.getMethodName(),
+      new String[]{"foo"});
+    // Inject our faulty LocalFileSystem
+    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        // Make sure it worked (above is sensitive to caching details in hadoop core)
+        FileSystem fs = FileSystem.get(conf);
+        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
+        FaultyFileSystem ffs = (FaultyFileSystem)fs;
+        HRegion region = null;
+        try {
+          // Initialize region
+          region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
+          long size = region.getMemstoreSize().get();
+          Assert.assertEquals(0, size);
+          // Put one item into memstore.  Measure the size of one item in memstore.
+          Put p1 = new Put(row);
+          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
+          region.put(p1);
+          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
+          Store store = region.getStore(COLUMN_FAMILY_BYTES);
+          StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
+          storeFlushCtx.prepare();
+          // Now add two entries to the foreground memstore.
+          Put p2 = new Put(row);
+          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
+          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
+          region.put(p2);
+          // Now try close on top of a failing flush.
+          region.close();
+          fail();
+        } catch (DroppedSnapshotException dse) {
+          // Expected
+        } finally {
+          if (region != null) {
+            Assert.assertEquals(0, region.getMemstoreSize().get());
+          }
+          FileSystem.closeAllForUGI(user.getUGI());
+        }
+        return null;
+      }
+    });
+  }
 
   @Test
   public void testCompactionAffectedByScanners() throws Exception {
@@ -479,8 +637,8 @@ public class TestHRegion {
     writer.close();
 
     // close the region now, and reopen again
-    HTableDescriptor htd = region.getTableDesc();
-    HRegionInfo info = region.getRegionInfo();
+    region.getTableDesc();
+    region.getRegionInfo();
     region.close();
     region = HRegion.openHRegion(region, null);
 
@@ -1186,7 +1344,7 @@ public class TestHRegion {
     Put put = new Put(row2);
     put.add(fam1, qual1, value1);
     try {
-      boolean res = region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
+      region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
           new BinaryComparator(value2), put, false);
       fail();
     } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
@@ -2446,19 +2604,6 @@ public class TestHRegion {
     }
   }
 
-  private void assertICV(byte[] row, byte[] familiy, byte[] qualifier, int amount)
-      throws IOException {
-    // run a get and see?
-    Get get = new Get(row);
-    get.addColumn(familiy, qualifier);
-    Result result = region.get(get);
-    assertEquals(1, result.size());
-
-    Cell kv = result.rawCells()[0];
-    int r = Bytes.toInt(CellUtil.cloneValue(kv));
-    assertEquals(amount, r);
-  }
-
   @Test
   public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
     byte[] row1 = Bytes.toBytes("row1");
@@ -3297,7 +3442,6 @@ public class TestHRegion {
   // ////////////////////////////////////////////////////////////////////////////
   @Test
   public void testBloomFilterSize() throws IOException {
-    byte[] row1 = Bytes.toBytes("row1");
     byte[] fam1 = Bytes.toBytes("fam1");
     byte[] qf1 = Bytes.toBytes("col");
     byte[] val1 = Bytes.toBytes("value1");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 793b839..f80813e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -29,8 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentSkipListSet;
-
-import junit.framework.TestCase;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -75,15 +74,22 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.util.Progressable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 /**
  * Test class for the Store
  */
 @Category(MediumTests.class)
-public class TestStore extends TestCase {
+public class TestStore {
   public static final Log LOG = LogFactory.getLog(TestStore.class);
+  @Rule public TestName name = new TestName();
 
   HStore store;
   byte [] table = Bytes.toBytes("table");
@@ -115,7 +121,7 @@ public class TestStore {
    * Setup
    * @throws IOException
    */
-  @Override
+  @Before
   public void setUp() throws IOException {
     qualifiers.add(qf1);
     qualifiers.add(qf3);
@@ -149,7 +155,7 @@ public class TestStore {
   }
 
   @SuppressWarnings("deprecation")
-  private void init(String methodName, Configuration conf, HTableDescriptor htd,
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
       HColumnDescriptor hcd) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
@@ -167,12 +173,74 @@ public class TestStore {
 
     HRegion region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
     store = new HStore(region, hcd, conf);
+    return store;
+  }
+
+  /**
+   * Test we do not lose data if we fail a flush and then close.
+   * Part of HBase-10466
+   * @throws Exception
+   */
+  @Test
+  public void testFlushSizeAccounting() throws Exception {
+    LOG.info("Setting up a faulty file system that cannot write in " +
+      this.name.getMethodName());
+    final Configuration conf = HBaseConfiguration.create();
+    // Only retry once.
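+    // "hbase.hstore.flush.retries.number" defaults to ten; a single try lets the injected
+    // fault bubble up on the first flush.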
+    conf.setInt("hbase.hstore.flush.retries.number", 1);
+    User user = User.createUserForTesting(conf, this.name.getMethodName(),
+      new String[]{"foo"});
+    // Inject our faulty LocalFileSystem
+    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
+    user.runAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws Exception {
+        // Make sure it worked (above is sensitive to caching details in hadoop core)
+        FileSystem fs = FileSystem.get(conf);
+        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
+        FaultyFileSystem ffs = (FaultyFileSystem)fs;
+
+        // Initialize region
+        init(name.getMethodName(), conf);
+
+        long size = store.memstore.getFlushableSize();
+        Assert.assertEquals(0, size);
+        LOG.info("Adding some data");
+        long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
+        size = store.memstore.getFlushableSize();
+        Assert.assertEquals(kvSize, size);
+        // Flush.  Bug #1 from HBASE-10466.  Make sure size calculation on failed flush is right.
+        try {
+          LOG.info("Flushing");
+          flushStore(store, id++);
+          Assert.fail("Didn't bubble up IOE!");
+        } catch (IOException ioe) {
+          Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
+        }
+        size = store.memstore.getFlushableSize();
+        Assert.assertEquals(kvSize, size);
+        store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
+        // Even though we add a new kv, we expect the flushable size to be 'same' since we have
+        // not yet cleared the snapshot -- the above flush failed.
+        Assert.assertEquals(kvSize, size);
+        ffs.fault.set(false);
+        flushStore(store, id++);
+        size = store.memstore.getFlushableSize();
+        // Size should be the foreground kv size.
+        Assert.assertEquals(kvSize, size);
+        flushStore(store, id++);
+        size = store.memstore.getFlushableSize();
+        Assert.assertEquals(0, size);
+        return null;
+      }
+    });
   }
 
   /**
    * Verify that compression and data block encoding are respected by the
    * Store.createWriterInTmp() method, used on store flush.
    */
+  @Test
   public void testCreateWriter() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     FileSystem fs = FileSystem.get(conf);
@@ -180,7 +247,7 @@ public class TestStore {
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setCompressionType(Compression.Algorithm.GZ);
     hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
-    init(getName(), conf, hcd);
+    init(name.getMethodName(), conf, hcd);
 
     // Test createWriterInTmp()
     StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
@@ -193,11 +260,12 @@ public class TestStore {
 
     // Verify that compression and encoding settings are respected
     HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
-    assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
-    assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
+    Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
+    Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
     reader.close();
   }
 
+  @Test
   public void testDeleteExpiredStoreFiles() throws Exception {
     int storeFileNum = 4;
     int ttl = 4;
@@ -209,7 +277,7 @@ public class TestStore {
     conf.setBoolean("hbase.store.delete.expired.storefile", true);
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setTimeToLive(ttl);
-    init(getName(), conf, hcd);
+    init(name.getMethodName(), conf, hcd);
 
     long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
     long timeStamp;
@@ -226,7 +294,7 @@ public class TestStore {
     }
 
     // Verify the total number of store files
-    assertEquals(storeFileNum, this.store.getStorefiles().size());
+    Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
 
     // Each compaction request will find one expired store file and delete it
     // by the compaction.
@@ -237,27 +305,28 @@ public class TestStore {
       // the first is expired normally.
      // If not the first compaction, there is another empty store file,
       List<StoreFile> files = new ArrayList<StoreFile>(cr.getFiles());
-      assertEquals(Math.min(i, 2), cr.getFiles().size());
+      Assert.assertEquals(Math.min(i, 2), cr.getFiles().size());
       for (int j = 0; j < files.size(); j++) {
-        assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
+        Assert.assertTrue(files.get(j).getReader().getMaxTimestamp() < (edge
            .currentTimeMillis() - this.store.getScanInfo().getTtl()));
       }
       // Verify that the expired store file is compacted to an empty store file.
       // Default compaction policy creates just one and only one compacted file.
       StoreFile compactedFile = this.store.compact(compaction).get(0);
       // It is an empty store file.
-      assertEquals(0, compactedFile.getReader().getEntries());
+      Assert.assertEquals(0, compactedFile.getReader().getEntries());
 
       // Let the next store file expired.
       edge.incrementTime(sleepTime);
     }
   }
 
+  @Test
   public void testLowestModificationTime() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     FileSystem fs = FileSystem.get(conf);
     // Initialize region
-    init(getName(), conf);
+    init(name.getMethodName(), conf);
     int storeFileNum = 4;
     for (int i = 1; i <= storeFileNum; i++) {
@@ -270,13 +339,13 @@ public class TestStore {
     // after flush; check the lowest time stamp
     long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
     long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
-    assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
+    Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
 
     // after compact; check the lowest time stamp
     store.compact(store.requestCompaction());
     lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
-    assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
+    Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
   }
 
   private static long getLowestTimeStampFromFS(FileSystem fs,
@@ -311,8 +380,9 @@ public class TestStore {
    * Test for hbase-1686.
    * @throws IOException
    */
+  @Test
   public void testEmptyStoreFile() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
     // Write a store file.
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
@@ -335,20 +405,21 @@ public class TestStore {
     this.store.close();
 
     // Reopen it... should pick up two files
     this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
-    assertEquals(2, this.store.getStorefilesCount());
+    Assert.assertEquals(2, this.store.getStorefilesCount());
 
     result = HBaseTestingUtility.getFromStoreFile(store,
         get.getRow(),
         qualifiers);
-    assertEquals(1, result.size());
+    Assert.assertEquals(1, result.size());
   }
 
   /**
    * Getting data from memstore only
    * @throws IOException
    */
+  @Test
   public void testGet_FromMemStoreOnly() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
 
     //Put data in memstore
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
@@ -370,8 +441,9 @@ public class TestStore {
    * Getting data from files only
    * @throws IOException
    */
+  @Test
   public void testGet_FromFilesOnly() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
 
     //Put data in memstore
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
@@ -408,8 +480,9 @@ public class TestStore {
    * Getting data from memstore and files
    * @throws IOException
   */
+  @Test
   public void testGet_FromMemStoreAndFiles() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
 
     //Put data in memstore
     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
@@ -441,14 +514,14 @@ public class TestStore {
   private void flush(int storeFilessize) throws IOException{
     this.store.snapshot();
     flushStore(store, id++);
-    assertEquals(storeFilessize, this.store.getStorefiles().size());
-    assertEquals(0, this.store.memstore.kvset.size());
+    Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
+    Assert.assertEquals(0, this.store.memstore.kvset.size());
   }
 
   private void assertCheck() {
-    assertEquals(expected.size(), result.size());
+    Assert.assertEquals(expected.size(), result.size());
     for(int i=0; i<expected.size(); i++) {
-      assertEquals(expected.get(i), result.get(i));
+      Assert.assertEquals(expected.get(i), result.get(i));
     }
   }
 
@@ -461,8 +534,9 @@ public class TestStore {
   //////////////////////////////////////////////////////////////////////////////
   // IncrementColumnValue tests
   //////////////////////////////////////////////////////////////////////////////
+  @Test
   public void testIncrementColumnValue_UpdatingInPlace() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
     //Put data in memstore
     long oldValue = 1L;
     long newValue = 3L;
@@ -480,13 +554,13 @@ public class TestStore {
     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
 
     // memstore should have grown by some amount.
-    assertTrue(ret > 0);
+    Assert.assertTrue(ret > 0);
 
     // then flush.
     flushStore(store, id++);
-    assertEquals(1, this.store.getStorefiles().size());
+    Assert.assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
-    assertEquals(2, this.store.memstore.kvset.size());
+    Assert.assertEquals(2, this.store.memstore.kvset.size());
 
     // how many key/values for this row are there?
     Get get = new Get(row);
@@ -495,25 +569,25 @@ public class TestStore {
     List<Cell> results = new ArrayList<Cell>();
     results = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertEquals(2, results.size());
+    Assert.assertEquals(2, results.size());
 
     long ts1 = results.get(0).getTimestamp();
     long ts2 = results.get(1).getTimestamp();
-    assertTrue(ts1 > ts2);
+    Assert.assertTrue(ts1 > ts2);
 
-    assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
+    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
+    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
   }
 
-  @Override
-  protected void tearDown() throws Exception {
-    super.tearDown();
+  @After
+  public void tearDown() throws Exception {
     EnvironmentEdgeManagerTestHelper.reset();
   }
 
+  @Test
   public void testICV_negMemstoreSize() throws IOException {
-    init(this.getName());
+    init(this.name.getMethodName());
 
     long time = 100;
     ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
@@ -549,9 +623,9 @@ public class TestStore {
     if (ret != 0) System.out.println("ret: " + ret);
     if (ret2 != 0) System.out.println("ret2: " + ret2);
 
-    assertTrue("ret: " + ret, ret >= 0);
+    Assert.assertTrue("ret: " + ret, ret >= 0);
     size += ret;
-    assertTrue("ret2: " + ret2, ret2 >= 0);
+    Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
     size += ret2;
 
@@ -565,13 +639,14 @@ public class TestStore {
       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
       computedSize += kvsize;
     }
-    assertEquals(computedSize, size);
+    Assert.assertEquals(computedSize, size);
   }
 
+  @Test
   public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
     ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
     EnvironmentEdgeManagerTestHelper.injectEdge(mee);
-    init(this.getName());
+    init(this.name.getMethodName());
 
     long oldValue = 1L;
     long newValue = 3L;
@@ -586,12 +661,12 @@ public class TestStore {
     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
 
     // memstore should have grown by some amount.
-    assertTrue(ret > 0);
+    Assert.assertTrue(ret > 0);
 
     // then flush.
     flushStore(store, id++);
-    assertEquals(1, this.store.getStorefiles().size());
-    assertEquals(1, this.store.memstore.kvset.size());
+    Assert.assertEquals(1, this.store.getStorefiles().size());
+    Assert.assertEquals(1, this.store.memstore.kvset.size());
 
     // now increment again:
     newValue += 1;
@@ -611,30 +686,31 @@ public class TestStore {
     List<Cell> results = new ArrayList<Cell>();
     results = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertEquals(2, results.size());
+    Assert.assertEquals(2, results.size());
 
     long ts1 = results.get(0).getTimestamp();
     long ts2 = results.get(1).getTimestamp();
-    assertTrue(ts1 > ts2);
-    assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
+    Assert.assertTrue(ts1 > ts2);
+    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
+    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
 
     mee.setValue(2); // time goes up slightly
     newValue += 1;
     this.store.updateColumnValue(row, family, qf1, newValue);
 
     results = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertEquals(2, results.size());
+    Assert.assertEquals(2, results.size());
 
     ts1 = results.get(0).getTimestamp();
     ts2 = results.get(1).getTimestamp();
-    assertTrue(ts1 > ts2);
-    assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
-    assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
+    Assert.assertTrue(ts1 > ts2);
+    Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
+    Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
   }
 
+  @Test
   public void testHandleErrorsInFlush() throws Exception {
     LOG.info("Setting up a faulty file system that cannot write");
 
@@ -648,10 +724,10 @@ public class TestStore {
       public Object run() throws Exception {
         // Make sure it worked (above is sensitive to caching details in hadoop core)
         FileSystem fs = FileSystem.get(conf);
-        assertEquals(FaultyFileSystem.class, fs.getClass());
+        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
 
         // Initialize region
-        init(getName(), conf);
+        init(name.getMethodName(), conf);
 
         LOG.info("Adding some data");
         store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
@@ -662,30 +738,36 @@ public class TestStore {
 
         Collection<StoreFileInfo> files =
          store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
-        assertEquals(0, files != null ? files.size() : 0);
+        Assert.assertEquals(0, files != null ? files.size() : 0);
 
         //flush
         try {
           LOG.info("Flushing");
           flush(1);
-          fail("Didn't bubble up IOE!");
+          Assert.fail("Didn't bubble up IOE!");
        } catch (IOException ioe) {
-          assertTrue(ioe.getMessage().contains("Fault injected"));
+          Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
        }
 
         LOG.info("After failed flush, we should still have no files!");
         files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
-        assertEquals(0, files != null ? files.size() : 0);
+        Assert.assertEquals(0, files != null ? files.size() : 0);
         return null;
       }
     });
+    FileSystem.closeAllForUGI(user.getUGI());
   }
-
+
+  /**
+   * Faulty file system that will fail if you write past its fault position the FIRST TIME
+   * only; thereafter it will succeed.  Used by {@link TestHRegion} too.
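+   * Reset the {@code fault} flag to false to let subsequent writes go through.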
+   */
   static class FaultyFileSystem extends FilterFileSystem {
     List<SoftReference<FSDataOutputStream>> outStreams =
       new ArrayList<SoftReference<FSDataOutputStream>>();
     private long faultPos = 200;
+    AtomicBoolean fault = new AtomicBoolean(true);
 
     public FaultyFileSystem() {
       super(new LocalFileSystem());
@@ -694,7 +775,7 @@ public class TestStore {
 
     @Override
     public FSDataOutputStream create(Path p) throws IOException {
-      return new FaultyOutputStream(super.create(p), faultPos);
+      return new FaultyOutputStream(super.create(p), faultPos, fault);
     }
 
     @Override
@@ -702,7 +783,7 @@ public class TestStore {
         boolean overwrite, int bufferSize, short replication, long blockSize,
         Progressable progress) throws IOException {
       return new FaultyOutputStream(super.create(f, permission,
-          overwrite, bufferSize, replication, blockSize, progress), faultPos);
+          overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
     }
 
     public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
@@ -716,11 +797,13 @@ public class TestStore {
 
   static class FaultyOutputStream extends FSDataOutputStream {
     volatile long faultPos = Long.MAX_VALUE;
+    private final AtomicBoolean fault;
 
-    public FaultyOutputStream(FSDataOutputStream out,
-        long faultPos) throws IOException {
+    public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
+      throws IOException {
       super(out, null);
       this.faultPos = faultPos;
+      this.fault = fault;
     }
 
     @Override
@@ -731,14 +814,12 @@ public class TestStore {
     }
 
     private void injectFault() throws IOException {
-      if (getPos() >= faultPos) {
+      if (this.fault.get() && getPos() >= faultPos) {
         throw new IOException("Fault injected");
       }
     }
   }
-
-
   private static void flushStore(HStore store, long id) throws IOException {
     StoreFlushContext storeFlushCtx = store.createFlushContext(id);
     storeFlushCtx.prepare();
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
   }
-
-
   /**
    * Generate a list of KeyValues for testing based on given parameters
    * @param timestamps
@@ -772,12 +851,13 @@ public class TestStore {
    * Test to ensure correctness when using Stores with multiple timestamps
    * @throws IOException
    */
+  @Test
   public void testMultipleTimestamps() throws IOException {
     int numRows = 1;
     long[] timestamps1 = new long[] {1,5,10,20};
     long[] timestamps2 = new long[] {30,80};
 
-    init(this.getName());
+    init(this.name.getMethodName());
 
     List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
     for (Cell kv : kvList1) {
@@ -798,27 +878,27 @@ public class TestStore {
 
     get.setTimeRange(0,15);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(40,90);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(10,45);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(80,145);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(1,2);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()>0);
+    Assert.assertTrue(result.size()>0);
 
     get.setTimeRange(90,200);
     result = HBaseTestingUtility.getFromStoreFile(store, get);
-    assertTrue(result.size()==0);
+    Assert.assertTrue(result.size()==0);
   }
 
   /**
@@ -826,14 +906,16 @@ public class TestStore {
    *
    * @throws IOException When the IO operations fail.
    */
+  @Test
   public void testSplitWithEmptyColFam() throws IOException {
-    init(this.getName());
-    assertNull(store.getSplitPoint());
+    init(this.name.getMethodName());
+    Assert.assertNull(store.getSplitPoint());
     store.getHRegion().forceSplit(null);
-    assertNull(store.getSplitPoint());
+    Assert.assertNull(store.getSplitPoint());
     store.getHRegion().clearSplit_TESTS_ONLY();
   }
 
+  @Test
   public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
     final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
     long anyValue = 10;
@@ -843,25 +925,25 @@ public class TestStore {
     // a number we pass in is higher than some config value, inside compactionPolicy.
     Configuration conf = HBaseConfiguration.create();
     conf.setLong(CONFIG_KEY, anyValue);
-    init(getName() + "-xml", conf);
-    assertTrue(store.throttleCompaction(anyValue + 1));
-    assertFalse(store.throttleCompaction(anyValue));
+    init(name.getMethodName() + "-xml", conf);
+    Assert.assertTrue(store.throttleCompaction(anyValue + 1));
+    Assert.assertFalse(store.throttleCompaction(anyValue));
 
     // HTD overrides XML.
     --anyValue;
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
-    init(getName() + "-htd", conf, htd, hcd);
-    assertTrue(store.throttleCompaction(anyValue + 1));
-    assertFalse(store.throttleCompaction(anyValue));
+    init(name.getMethodName() + "-htd", conf, htd, hcd);
+    Assert.assertTrue(store.throttleCompaction(anyValue + 1));
+    Assert.assertFalse(store.throttleCompaction(anyValue));
 
     // HCD overrides them both.
     --anyValue;
     hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
-    init(getName() + "-hcd", conf, htd, hcd);
-    assertTrue(store.throttleCompaction(anyValue + 1));
-    assertFalse(store.throttleCompaction(anyValue));
+    init(name.getMethodName() + "-hcd", conf, htd, hcd);
+    Assert.assertTrue(store.throttleCompaction(anyValue + 1));
+    Assert.assertFalse(store.throttleCompaction(anyValue));
   }
 
   public static class DummyStoreEngine extends DefaultStoreEngine {
@@ -874,11 +956,12 @@ public class TestStore {
     }
   }
 
+  @Test
   public void testStoreUsesSearchEngineOverride() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
-    init(this.getName(), conf);
-    assertEquals(DummyStoreEngine.lastCreatedCompactor, this.store.storeEngine.getCompactor());
+    init(this.name.getMethodName(), conf);
+    Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
+      this.store.storeEngine.getCompactor());
   }
-}
-
+}
\ No newline at end of file