diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/LongTermArchivingHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/LongTermArchivingHFileCleaner.java
index e6ef052..d8a2d7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/LongTermArchivingHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/LongTermArchivingHFileCleaner.java
@@ -44,7 +44,7 @@ public class LongTermArchivingHFileCleaner extends BaseHFileCleanerDelegate {
 
   private static final Log LOG = LogFactory.getLog(LongTermArchivingHFileCleaner.class);
 
-  private TableHFileArchiveTracker archiveTracker;
+  TableHFileArchiveTracker archiveTracker;
   private FileSystem fs;
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 35974b7..6a12f1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -43,7 +43,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
   private final FileSystem fs;
   private final Path oldFileDir;
   private final Configuration conf;
-  private List<T> cleanersChain;
+  protected List<T> cleanersChain;
 
   /**
    * @param name name of the chore being run
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 06a612d..ca9c7fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,4 +50,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
   protected boolean validate(Path file) {
     return StoreFile.validateStoreFileName(file.getName());
   }
+
+  /**
+   * Exposed for TESTING!
+   */
+  public List<BaseHFileCleanerDelegate> getDelegates() {
+    return this.cleanersChain;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 671362c..106cff1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -58,7 +58,7 @@ public class CompactSplitThread implements CompactionRequestor {
   private int regionSplitLimit;
 
   /** @param server */
-  CompactSplitThread(HRegionServer server) {
+  public CompactSplitThread(HRegionServer server) {
     super();
     this.server = server;
     this.conf = server.getConfiguration();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 31c68ef..8cbed93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -4122,4 +4122,11 @@ public class HRegionServer implements ClientProtocol,
   private String getMyEphemeralNodePath() {
     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
   }
+
+  /**
+   * Exposed for testing!
+   */
+  public CompactSplitThread getCompactSplitThread() {
+    return this.compactSplitThread;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 3a6251b..9b45702 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -57,7 +57,7 @@ import com.google.common.base.Preconditions;
  * @see FlushRequester
  */
 @InterfaceAudience.Private
-class MemStoreFlusher extends HasThread implements FlushRequester {
+public class MemStoreFlusher extends HasThread implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
@@ -142,12 +142,13 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
   }
 
   /**
-   * The memstore across all regions has exceeded the low water mark. Pick
-   * one region to flush and flush it synchronously (this is called from the
-   * flush thread)
+   * The memstore across all regions has exceeded the low water mark. Pick one region to flush and
+   * flush it synchronously (this is called from the flush thread)
+   * <p>
+   * Exposed for testing!
    * @return true if successful
    */
-  private boolean flushOneForGlobalPressure() {
+  public boolean flushOneForGlobalPressure() {
     SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
@@ -356,9 +357,9 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
       // Note: We don't impose blockingStoreFiles constraint on meta regions
       LOG.warn("Region " + region.getRegionNameAsString() + " has too many "
          + "store files; delaying flush up to " + this.blockingWaitTime + "ms");
-      if (!this.server.compactSplitThread.requestSplit(region)) {
+      if (!this.server.getCompactSplitThread().requestSplit(region)) {
         try {
-          this.server.compactSplitThread.requestCompaction(region, getName());
+          this.server.getCompactSplitThread().requestCompaction(region, getName());
         } catch (IOException e) {
           LOG.error("Cache flush failed" +
             (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
@@ -404,9 +405,9 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
     // We just want to check the size
     boolean shouldSplit = region.checkSplit() != null;
     if (shouldSplit) {
-      this.server.compactSplitThread.requestSplit(region);
+      this.server.getCompactSplitThread().requestSplit(region);
     } else if (shouldCompact) {
-      server.compactSplitThread.requestCompaction(region, getName());
+      server.getCompactSplitThread().requestCompaction(region, getName());
     }
 
     server.getMetrics().addFlush(region.getRecentFlushInfo());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
index bee9668..ccc528d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
@@ -256,11 +256,11 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
           server.getMetrics().addCompaction(now - start, this.totalSize);
           // degenerate case: blocked regions require recursive enqueues
           if (s.getCompactPriority() <= 0) {
-            server.compactSplitThread
+            server.getCompactSplitThread()
                 .requestCompaction(r, s, "Recursive enqueue");
           } else {
             // see if the compaction has caused us to exceed max region size
-            server.compactSplitThread.requestSplit(r);
+            server.getCompactSplitThread().requestSplit(r);
           }
         }
       } catch (IOException ex) {
@@ -272,7 +272,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
         server.checkFileSystem();
       } finally {
         s.finishRequest(this);
-        LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
+        LOG.debug("CompactSplitThread status: " + server.getCompactSplitThread());
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
index 9f05d1f..234e520 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
  * Helper class for all utilities related to archival/retrieval of HFiles
  */
 public class HFileArchiveUtil {
-  static final String DEFAULT_HFILE_ARCHIVE_DIRECTORY = ".archive";
+  public static final String DEFAULT_HFILE_ARCHIVE_DIRECTORY = ".archive";
 
   private HFileArchiveUtil() {
     // non-external instantiation - util class
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 8c40666..cf6a3ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -2293,4 +2293,17 @@ public class HBaseTestingUtility {
   public void setFileSystemURI(String fsURI) {
     FS_URI = fsURI;
   }
+
+  /**
+   * Get a {@link MockSupportedHRegionBuilder} to create an HRegion with fully mocked internals
+   * @param tableName name of the table for which to create the region
+   * @param hcd column descriptor
+   * @return a {@link MockSupportedHRegionBuilder} to build a mocked-out HRegion
+   * @throws IOException if the test filesystem can't be reached.
+   */
+  public MockSupportedHRegionBuilder fullyMockHRegion(String tableName, HColumnDescriptor hcd)
+      throws IOException {
+    return new MockSupportedHRegionBuilder(this.getTestFileSystem(), this.getConfiguration(), tableName, hcd,
+      this.getDataTestDir());
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockSupportedHRegionBuilder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockSupportedHRegionBuilder.java
new file mode 100644
index 0000000..9ad2802
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockSupportedHRegionBuilder.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.MockRegionServer;
+import org.apache.hadoop.hbase.regionserver.CompactSplitThread;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MemStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Helps build an HRegion with mocked internals
+ */
+public class MockSupportedHRegionBuilder {
+  private final Configuration conf;
+  private ServerName name;
+  private HColumnDescriptor hcd;
+  private String table;
+  private HLog log;
+  private FileSystem fs;
+  private Path tableDir;
+  private RegionServerMetrics metrics;
+  private RegionServerServices rss;
+  private boolean setRss = false;
+  private CompactSplitThread cst;
+  private CompactSplitThread cstSpy;
+
+  public MockSupportedHRegionBuilder(FileSystem fs, Configuration conf, String tableName, HColumnDescriptor hcd,
+      Path rootDir) {
+    this.conf = new Configuration(conf);
+    this.table = tableName;
+    this.hcd = hcd;
+    this.fs = fs;
+    this.tableDir = new Path(rootDir, tableName);
+  }
+
+  public void setServerName(ServerName name) {
+    this.name = name;
+  }
+
+  /**
+   * Provide custom regionserver services. By default, a mock is created using the
+   * {@link ServerName} specified via {@link #setServerName(ServerName)} or a generic server name.
+   * If this method is called, the builder does not wire up its own mock {@link FlushRequester}.
+   * <p>
+   * This method is not advised unless custom behavior is desired. To augment mock behavior, get the
+   * mock services from {@link #getMockRegionServerServices()}; the mock flusher can then be
+   * obtained via {@link RegionServerServices#getFlushRequester()}.
+   */
+  public void setRegionServerServices(RegionServerServices rss) {
+    this.rss = rss;
+    this.setRss = true;
+  }
+
+  /**
+   * @return the {@link RegionServerServices} for the region; the mock is set on calls to
+   *         {@link #build(boolean)}.
+   */
+  public RegionServerServices getMockRegionServerServices() {
+    return this.rss;
+  }
+
+  public void setRegionServerMetrics(RegionServerMetrics metrics) {
+    this.metrics = metrics;
+  }
+
+  /**
+   * Set a custom {@link HLog}. If this method isn't called, a mock {@link HLog} is used and can be
+   * obtained via {@link #getMockHLog()}; otherwise {@link #getMockHLog()} returns the hlog
+   * specified here.
+   * @param log log to use for the region
+   */
+  public void setHLog(HLog log) {
+    this.log = log;
+  }
+
+  /**
+   * Set on calls to {@link #build(boolean)}.
+   * @return the current hlog, a mock object by default
+   */
+  public HLog getMockHLog() {
+    return this.log;
+  }
+
+  /**
+   * Set the parent regionserver's {@link CompactSplitThread}.
+   * <p>
+   * Unless you need custom compact/split logic, this method is not recommended. By default, a
+   * mock CompactSplitThread is created that never splits the region. To add custom behavior to the
+   * mocked CompactSplitThread, see {@link #getSpiedCompactSplitThread()}.
+   * @param cst CompactSplitThread to use
+   */
+  public void setCompactSplitThread(CompactSplitThread cst) {
+    this.cst = cst;
+  }
+
+  /**
+   * Get the {@link Mockito#spy(Object)} on the real {@link CompactSplitThread} used if
+   * {@link #setCompactSplitThread(CompactSplitThread)} was not called.
+   * @return mockable version of the regionserver's compaction/split mechanism. Not null
+   *         only after calling {@link #build(boolean)}.
+   */
+  public CompactSplitThread getSpiedCompactSplitThread() {
+    return this.cstSpy;
+  }
+
+  /**
+   * Build the region using the set objects. Mock functionality can still be set after creating the
+   * region.
+   * @param initialize if {@link HRegion#initialize()} should be called on the region after all
+   *          mocks are established
+   * @return the built {@link HRegion} with the specified mocking to support it.
+   * @throws ZooKeeperConnectionException on object creation failure
+   * @throws IOException on object creation failure
+   */
+  public HRegion build(boolean initialize) throws ZooKeeperConnectionException, IOException {
+    // make sure we have a directory for this
+    fs.mkdirs(tableDir);
+    // setup the info
+    HRegionInfo info = new HRegionInfo(Bytes.toBytes(table));
+
+    // create the rss for the region
+    if (rss == null) {
+      MockRegionServer services = new MockRegionServer(conf, name == null ? new ServerName(
+          "192.168.1.199,58102,1319771740322") : name);
+      rss = Mockito.spy(services);
+    }
+
+    // set the log
+    log = log == null ? Mockito.mock(HLog.class) : log;
+
+    // set the table descriptor
+    HTableDescriptor desc = new HTableDescriptor(table);
+    desc.addFamily(hcd);
+
+    // build the region
+    HRegion region = new HRegion(tableDir, log, fs, conf, info, desc, rss);
+
+    // setup mocks/spies for all the services we need to deal with
+    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
+    Mockito.when(mockServer.getOnlineRegion(info.getRegionName())).thenReturn(region);
+    Mockito.when(mockServer.getOnlineRegions(Bytes.toBytes(table))).thenReturn(
+      Lists.newArrayList(region));
+    Mockito.when(mockServer.getConfiguration()).thenReturn(conf);
+
+    // setup the metrics
+    metrics = metrics == null ? Mockito.mock(RegionServerMetrics.class) : metrics;
+    Mockito.when(mockServer.getMetrics()).thenReturn(metrics);
+
+    // mock out accounting to just reference the single region
+    RegionServerAccounting mockAccounting = Mockito.mock(RegionServerAccounting.class);
+    Mockito.when(mockServer.getRegionServerAccounting()).thenReturn(mockAccounting);
+    Mockito.when(mockAccounting.getGlobalMemstoreSize()).thenReturn(region.getMemstoreSize().get());
+
+    // mock out the compact/split thread
+    if (cst == null) {
+      cst = new CompactSplitThread(mockServer);
+      cstSpy = Mockito.spy(cst);
+      // never splits, needs doAnswer to avoid calling real method
+      Mockito.doAnswer(new Answer<Boolean>() {
+        @Override
+        public Boolean answer(InvocationOnMock invocation) throws Throwable {
+          return false;
+        }
+      }).when(cstSpy).requestSplit(region);
+    }
+    // return the spy (when one was created) so the never-split stubbing takes effect
+    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(cstSpy != null ? cstSpy : cst);
+
+    // set up the memstore flusher, only if we have mock region server services
+    if (!setRss) {
+      MemStoreFlusher flusher = new MemStoreFlusher(conf, mockServer);
+      MemStoreFlusher flusherSpy = Mockito.spy(flusher);
+      // needs doAnswer to avoid calling real method
+      Mockito.doAnswer(new Answer<Boolean>() {
+        @Override
+        public Boolean answer(InvocationOnMock invocation) throws Throwable {
+          return false;
+        }
+      }).when(flusherSpy).flushOneForGlobalPressure();
+      // set the flush requester to be returned
+      Mockito.when(rss.getFlushRequester()).thenReturn(flusherSpy);
+      // start running the flusher
+      Threads.setDaemonThreadRunning(flusherSpy.getThread());
+    }
+
+    // initialize the region if requested
+    if (initialize) {
+      region.initialize();
+    }
+    return region;
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index db11fd0..eceff05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -19,39 +19,45 @@ package org.apache.hadoop.hbase.backup.example;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.MockSupportedHRegionBuilder;
 import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
-import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
-import org.apache.hadoop.hbase.regionserver.CheckedArchivingHFileCleaner;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * Spin up a small cluster and check that the hfiles of region are properly long-term archived as
@@ -60,6 +66,24 @@ import org.junit.experimental.categories.Category;
 @Category(LargeTests.class)
 public class TestZooKeeperTableArchiveClient {
 
+  /**
+   * Simple Stoppable implementation that just keeps track of a boolean
+   */
+  private class Stop implements Stoppable {
+
+    private volatile boolean stopped = false;
+
+    @Override
+    public void stop(String why) {
+      this.stopped = true;
+    }
+
+    @Override
+    public boolean isStopped() {
+      return this.stopped;
+    }
+  }
+
   private static final Log LOG = LogFactory.getLog(TestZooKeeperTableArchiveClient.class);
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
   private static final String STRING_TABLE_NAME = "test";
@@ -76,9 +100,13 @@ public class TestZooKeeperTableArchiveClient {
   @BeforeClass
   public static void setupCluster() throws Exception {
     setupConf(UTIL.getConfiguration());
-    UTIL.startMiniCluster(numRS);
+    UTIL.startMiniZKCluster();
     archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), UTIL.getHBaseAdmin()
         .getConnection());
+    // make hfile archiving node so we can archive files
+    ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
+    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
+    ZKUtil.createWithParents(watcher, archivingZNode);
   }
 
   private static void setupConf(Configuration conf) {
@@ -94,21 +122,16 @@ public class TestZooKeeperTableArchiveClient {
     conf.setInt("hbase.hstore.blockingStoreFiles", 12);
     // drop the number of attempts for the hbase admin
     conf.setInt("hbase.client.retries.number", 1);
-    // set the ttl on the hfiles
-    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
-    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
-      CheckedArchivingHFileCleaner.class.getCanonicalName(),
-      LongTermArchivingHFileCleaner.class.getCanonicalName());
   }
 
   @Before
   public void setup() throws Exception {
-    UTIL.createTable(TABLE_NAME, TEST_FAM);
+    // UTIL.createTable(TABLE_NAME, TEST_FAM);
   }
 
   @After
   public void tearDown() throws Exception {
-    UTIL.deleteTable(TABLE_NAME);
+    // UTIL.deleteTable(TABLE_NAME);
     // and cleanup the archive directory
     try {
       UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
@@ -122,7 +145,7 @@ public class TestZooKeeperTableArchiveClient {
   @AfterClass
   public static void cleanupTest() throws Exception {
     try {
-      UTIL.shutdownMiniCluster();
+      UTIL.shutdownMiniZKCluster();
     } catch (Exception e) {
       LOG.warn("problem shutting down cluster", e);
     }
@@ -156,158 +179,228 @@ public class TestZooKeeperTableArchiveClient {
   @Test
   public void testArchivingOnSingleTable() throws Exception {
-    // turn on hfile retention
-    LOG.debug("----Starting archiving");
-    archivingClient.enableHFileBackupAsync(TABLE_NAME);
-    assertTrue("Archving didn't get turned on", archivingClient
-        .getArchivingEnabled(TABLE_NAME));
+    createArchiveDirectory();
+    FileSystem fs = UTIL.getTestFileSystem();
+    Path archiveDir = getArchiveDir();
+    Path tableDir = getTableDir();
 
-    // get the RS and region serving our table
-    List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
-    // make sure we only have 1 region serving this table
-    assertEquals(1, servingRegions.size());
-    HRegion region = servingRegions.get(0);
+    Configuration conf = UTIL.getConfiguration();
+    // setup the long term archiver
+    Stoppable stop = new Stop();
+    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
+    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
+    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
+
+    // create the region
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
+    MockSupportedHRegionBuilder builder = UTIL.fullyMockHRegion(STRING_TABLE_NAME, hcd);
+    HRegion region = builder.build(true);
+
+    loadRegionAndClose(region);
+
+    // get the current hfiles in the archive directory
+    List<Path> files = getAllFiles(fs, archiveDir);
+    if (files == null) {
+      FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
+      throw new RuntimeException("Didn't archive any files!");
+    }
+    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
 
-    // get the parent RS and monitor
-    HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
-    FileSystem fs = hrs.getFileSystem();
+    // run the cleaner
+    cleaner.start();
+    // wait for the cleaner to check all the files
+    finished.await();
+    // stop the cleaner
+    stop.stop("");
+
+    // now that the cleaner has run, check all the files again to make sure they are still there
+    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
+    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);
+
+    // but we still have the archive directory
+    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
+
+    // remove the table and archive directories
+    FSUtils.delete(fs, tableDir, true);
+    FSUtils.delete(fs, archiveDir, true);
+  }
+
+  private void createArchiveDirectory() throws IOException {
+    // create the archive and test directory
+    FileSystem fs = UTIL.getTestFileSystem();
+    Path archiveDir = getArchiveDir();
+    fs.mkdirs(archiveDir);
+  }
+
+  private Path getArchiveDir() throws IOException {
+    return new Path(UTIL.getDataTestDir(), HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY);
+  }
+
+  private Path getTableDir() throws IOException {
+    Path testDataDir = UTIL.getDataTestDir();
+    FSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
+    return new Path(testDataDir, STRING_TABLE_NAME);
+  }
+
+  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
+      Stoppable stop) {
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
+      LongTermArchivingHFileCleaner.class.getCanonicalName());
+    return new HFileCleaner(1000, stop, conf, fs, archiveDir);
+  }
+
+  /**
+   * Start archiving the given table and wait for the change to propagate to the given cleaner
+   * @param tableName table to archive
+   * @param cleaner cleaner to check to make sure change propagated
+   * @return the cleaner's delegates, including the {@link LongTermArchivingHFileCleaner} that is
+   *         managing archiving
+   * @throws IOException on failure
+   * @throws KeeperException on failure
+   */
+  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
+      throws IOException, KeeperException {
+    // turn on hfile retention
+    LOG.debug("----Starting archiving for table:" + tableName);
+    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
+    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));
+
+    // wait for the archiver to get the notification
+    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegates();
+    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
+    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
+      // spin until propagation - should be fast
+    }
+    return cleaners;
+  }
+
+  private void loadRegionAndClose(HRegion region) throws Exception {
     // put some data on the region
     LOG.debug("-------Loading table");
     UTIL.loadRegion(region, TEST_FAM);
     loadAndCompact(region);
 
-    // check that we actually have some store files that were archived
-    Store store = region.getStore(TEST_FAM);
-    Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
-      region, store);
+    // wait for any compactions to finish
+    region.waitForFlushesAndCompactions();
 
-    // check to make sure we archived some files
-    assertTrue("Didn't create a store archive directory", fs.exists(storeArchiveDir));
-    assertTrue("No files in the store archive",
-      FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
+    // close the region to prevent any more flushes or compactions
+    HRegion.closeHRegion(region);
 
-    // and then put some non-tables files in the archive
-    Configuration conf = UTIL.getConfiguration();
-    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
-    // write a tmp file to the archive dir
-    Path tmpFile = new Path(archiveDir, "toDelete");
-    FSDataOutputStream out = fs.create(tmpFile);
-    out.write(1);
-    out.close();
-
-    assertTrue(fs.exists(tmpFile));
-    // make sure we wait long enough for the files to expire
-    Thread.sleep(ttl);
-
-    // print currrent state for comparison
-    FSUtils.logFileSystemState(fs, archiveDir, LOG);
-
-    // ensure there are no archived files after waiting for a timeout
-    ensureHFileCleanersRun();
-
-    // check to make sure the right things get deleted
-    assertTrue("Store archive got deleted", fs.exists(storeArchiveDir));
-    assertTrue("Archived HFiles got deleted",
-      FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
+    // wait for any extra flushes and compactions
+    region.waitForFlushesAndCompactions();
+  }
 
-    assertFalse(
-      "Tmp file (non-table archive file) didn't " + "get deleted, archive dir: "
-          + fs.listStatus(archiveDir), fs.exists(tmpFile));
-    LOG.debug("Turning off hfile backup.");
-    // stop archiving the table
-    archivingClient.disableHFileBackup();
-    LOG.debug("Deleting table from archive.");
-    // now remove the archived table
-    Path primaryTable = new Path(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()),
-      STRING_TABLE_NAME);
-    fs.delete(primaryTable, true);
-    LOG.debug("Deleted primary table, waiting for file cleaners to run");
-    // and make sure the archive directory is retained after a cleanup
-    // have to do this manually since delegates aren't run if there isn't any files in the archive
-    // dir to cleanup
-    Thread.sleep(ttl);
-    UTIL.getHBaseCluster().getMaster().getHFileCleaner().triggerNow();
-    Thread.sleep(ttl);
-    LOG.debug("File cleaners done, checking results.");
-    // but we still have the archive directory
-    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
+  /**
+   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
+   * seen all the files
+   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
+   *         least the expected number of times.
+   */
+  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
+      List<BaseHFileCleanerDelegate> cleaners, final int expected) {
+    // replace the cleaner with one that we can check
+    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
+    final int[] counter = new int[] { 0 };
+    final CountDownLatch finished = new CountDownLatch(1);
+    Mockito.doAnswer(new Answer<Boolean>() {
+
+      @Override
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        counter[0]++;
+        LOG.debug(counter[0] + "/ " + expected + ") Mocking call to isFileDeletable");
+        if (counter[0] > expected) finished.countDown();
+        return (Boolean) invocation.callRealMethod();
+      }
+    }).when(delegateSpy).isFileDeletable(Mockito.any(Path.class));
+    cleaners.set(0, delegateSpy);
+
+    return finished;
   }
 
   /**
-   * Make sure all the {@link HFileCleaner} run.
-   * <p>
-   * Blocking operation up to 3x ttl
-   * @throws InterruptedException
+   * Get all the files (non-directory entries) in the file system under the passed directory
+   * @param dir directory to investigate
+   * @return all files under the directory
    */
-  private void ensureHFileCleanersRun() throws InterruptedException {
-    LOG.debug("Waiting on archive cleaners to run...");
-    CheckedArchivingHFileCleaner.resetCheck();
-    do {
-      UTIL.getHBaseCluster().getMaster().getHFileCleaner().triggerNow();
-      LOG.debug("Triggered, sleeping an amount until we can pass the check.");
-      Thread.sleep(ttl);
-    } while (!CheckedArchivingHFileCleaner.getChecked());
+  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
+    FileStatus[] files = FSUtils.listStatus(fs, dir, null);
+    if (files == null) return null;
+
+    List<Path> allFiles = new ArrayList<Path>();
+    for (FileStatus file : files) {
+      if (file.isDir()) {
+        List<Path> subFiles = getAllFiles(fs, file.getPath());
+        if (subFiles != null) allFiles.addAll(subFiles);
+        continue;
+      }
+      allFiles.add(file.getPath());
+    }
+    return allFiles;
   }
 
   /**
    * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
-   * @throws Exception
+   * @throws Exception on failure
    */
   @Test
   public void testMultipleTables() throws Exception {
-    archivingClient.enableHFileBackupAsync(TABLE_NAME);
-    assertTrue("Archving didn't get turned on", archivingClient
-        .getArchivingEnabled(TABLE_NAME));
+    createArchiveDirectory();
+    FileSystem fs = UTIL.getTestFileSystem();
+    Path archiveDir = getArchiveDir();
+    Path tableDir = getTableDir();
+
+    Configuration conf = UTIL.getConfiguration();
+    // setup the long term archiver
+    Stoppable stop = new Stop();
+    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
+    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
+    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
+
+    // create the region
+    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
+    MockSupportedHRegionBuilder builder = UTIL.fullyMockHRegion(STRING_TABLE_NAME, hcd);
+    HRegion region = builder.build(true);
+    loadRegionAndClose(region);
 
     // create the another table that we don't archive
     String otherTable = "otherTable";
-    UTIL.createTable(Bytes.toBytes(otherTable), TEST_FAM);
+    hcd = new HColumnDescriptor(TEST_FAM);
+    builder = UTIL.fullyMockHRegion(otherTable, hcd);
+    HRegion otherRegion = builder.build(true);
+    loadRegionAndClose(otherRegion);
+
+    // get the current hfiles in the archive directory
+    List<Path> files = getAllFiles(fs, archiveDir);
+    if (files == null) {
+      FSUtils.logFileSystemState(fs, archiveDir, LOG);
+      throw new RuntimeException("Didn't archive any files!");
+    }
+
+    // watch the cleaners so we know when they have seen all the files
+    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
+    // run the cleaner
+    cleaner.start();
+    // wait for the cleaner to check all the files
+    finished.await();
+    // stop the cleaner
+    stop.stop("");
+
+    // now that the cleaner has run, check all the files again to make sure they are still there
+    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
+    for (Path file : archivedFiles) {
+      // ensure we don't have files from the non-archived table
+      assertFalse("Have a file from the non-archived table: " + file,
+        otherTable.equals(file.getParent().getParent().getParent().getName()));
+    }
 
-    // get the parent RS and monitor
-    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
-
-    // put data in the filesystem of the first table
-    LOG.debug("Loading data into:" + STRING_TABLE_NAME);
-    loadAndCompact(STRING_TABLE_NAME);
-
-    // and some data in the other table
-    LOG.debug("Loading data into:" + otherTable);
-    loadAndCompact(otherTable);
-
-    // make sure we wait long enough for the other table's files to expire
-    ensureHFileCleanersRun();
-
-    // check to make sure the right things get deleted
-    Path primaryStoreArchive = HFileArchiveTestingUtil.getStoreArchivePath(UTIL, STRING_TABLE_NAME,
-      TEST_FAM);
-    Path otherStoreArchive = HFileArchiveTestingUtil
-        .getStoreArchivePath(UTIL, otherTable, TEST_FAM);
-    // make sure the primary store doesn't have any files
-    assertTrue("Store archive got deleted", fs.exists(primaryStoreArchive));
-    assertTrue("Archived HFiles got deleted",
-      FSUtils.listStatus(fs, primaryStoreArchive, null).length > 0);
-    FileStatus[] otherArchiveFiles = FSUtils.listStatus(fs, otherStoreArchive, null);
-    assertNull("Archived HFiles (" + otherStoreArchive
-        + ") should have gotten deleted, but didn't, remaining files:"
-        + getPaths(otherArchiveFiles), otherArchiveFiles);
-    // sleep again to make sure we the other table gets cleaned up
-    ensureHFileCleanersRun();
-    // first pass removes the store archive
-    assertFalse(fs.exists(otherStoreArchive));
-    // second pass removes the region
-    ensureHFileCleanersRun();
-    Path parent = otherStoreArchive.getParent();
-    assertFalse(fs.exists(parent));
-    // third pass remove the table
-    ensureHFileCleanersRun();
-    parent = otherStoreArchive.getParent();
-    assertFalse(fs.exists(parent));
     // but we still have the archive directory
     assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
-    FSUtils.logFileSystemState(fs, HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()), LOG);
-    UTIL.deleteTable(Bytes.toBytes(otherTable));
+    // remove the table and archive directories
+    FSUtils.delete(fs, tableDir, true);
+    FSUtils.delete(fs, new Path(tableDir.getParent(), otherTable), true);
+    FSUtils.delete(fs, archiveDir, true);
   }
 
   private List<Path> getPaths(FileStatus[] files) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 7def888..7a65b22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -100,7 +100,7 @@ import com.google.protobuf.ServiceException;
  * {@link #setGetResult(byte[], byte[], Result)} for how to fill the backing data
  * store that the get pulls from.
 */
-class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerServices {
+public class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerServices {
   private final ServerName sn;
   private final ZooKeeperWatcher zkw;
   private final Configuration conf;
@@ -154,7 +154,7 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
    * @throws IOException
    * @throws ZooKeeperConnectionException
    */
-  MockRegionServer(final Configuration conf, final ServerName sn)
+  public MockRegionServer(final Configuration conf, final ServerName sn)
       throws ZooKeeperConnectionException, IOException {
     this.sn = sn;
     this.conf = conf;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CheckedArchivingHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CheckedArchivingHFileCleaner.java
deleted file mode 100644
index efc9cb6..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CheckedArchivingHFileCleaner.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
-
-/**
- * HFile archive cleaner that just tells you if it has been run already or not (and allows resets) -
- * always attempts to delete the passed file.
- * <p>
- * Just a helper class for testing to make sure the cleaner has been run.
- */
-public class CheckedArchivingHFileCleaner extends BaseHFileCleanerDelegate {
-
-  private static boolean checked;
-
-  @Override
-  public boolean isFileDeletable(Path file) {
-    checked = true;
-    return true;
-  }
-
-  public static boolean getChecked() {
-    return checked;
-  }
-
-  public static void resetCheck() {
-    checked = false;
-  }
-}
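
Usage note (not part of the patch): the sketch below shows how a test might combine the hooks introduced above — HBaseTestingUtility.fullyMockHRegion(), MockSupportedHRegionBuilder.getSpiedCompactSplitThread(), and HFileCleaner.getDelegates(). The wrapper class, the "fam"/"sketchTable" names, and the stop/archiveDir parameters are illustrative assumptions, not code from this change.

// Hypothetical usage sketch of the test hooks added in this patch.
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.MockSupportedHRegionBuilder;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactSplitThread;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.mockito.Mockito;

public class MockedRegionUsageSketch {
  // "fam", "sketchTable", and the stop/archiveDir parameters are assumptions.
  public void example(HBaseTestingUtility util, Stoppable stop, Path archiveDir)
      throws Exception {
    // Build an HRegion whose server-side collaborators are mocks/spies.
    HColumnDescriptor hcd = new HColumnDescriptor("fam");
    MockSupportedHRegionBuilder builder = util.fullyMockHRegion("sketchTable", hcd);
    HRegion region = builder.build(true); // wire up mocks, then initialize

    // After build(), the spied CompactSplitThread can be verified or re-stubbed;
    // by default it is stubbed to never split the region.
    CompactSplitThread cst = builder.getSpiedCompactSplitThread();
    Mockito.verify(cst, Mockito.never()).requestSplit(region);

    // getDelegates() exposes the cleaner chain so a test can swap in a spy,
    // mirroring what setupCleanerWatching() does in TestZooKeeperTableArchiveClient.
    HFileCleaner cleaner =
        new HFileCleaner(1000, stop, util.getConfiguration(), util.getTestFileSystem(), archiveDir);
    List<BaseHFileCleanerDelegate> delegates = cleaner.getDelegates();
    delegates.set(0, Mockito.spy(delegates.get(0)));
  }
}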