diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 5f14680..c047f25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -662,7 +662,7 @@ public final class HConstants {
public static final String ENABLE_WAL_COMPRESSION =
"hbase.regionserver.wal.enablecompression";
-/** Region in Transition metrics threshold time */
+ /** Region in Transition metrics threshold time */
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold";
public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop";
@@ -673,6 +673,10 @@ public final class HConstants {
*/
public static final byte [] NO_NEXT_INDEXED_KEY = Bytes.toBytes("NO_NEXT_INDEXED_KEY");
+ /** Configuration key for the directory to backup HFiles for a table */
+ public static final String HFILE_ARCHIVE_DIRECTORY = "hbase.table.archive.directory";
+ public static final String HFILE_ARCHIVE_ZNODE_PARENT = "hfilearchive";
+
private HConstants() {
// Can't be instantiated with this ctor.
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveCleanup.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveCleanup.java
new file mode 100644
index 0000000..4a88e2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveCleanup.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.Stack;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+
+import com.google.common.base.Function;
+
+/**
+ * Utility to help clean up extra archive files. This is particularly useful if
+ * an archive was only partially completed because one of the regionservers bailed
+ * before it could finish. Directories are ignored, since ongoing archiving might
+ * still need them.
+ *
+ * Run with -h to see help.
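+ *
+ * For illustration, a minimal invocation is sketched below (the table name and
+ * times are hypothetical; only the -s/-e/-t/-h options defined by this class
+ * are used):
+ * <pre>
+ * # remove archived files for 'mytable' with timestamps in [1340000000, 1350000000)
+ * java org.apache.hadoop.hbase.backup.HFileArchiveCleanup -t mytable -s 1340000000 -e 1350000000
+ * </pre>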
+ */
+public class HFileArchiveCleanup {
+
+ private static final Log LOG = LogFactory.getLog(HFileArchiveCleanup.class);
+
+ private static Configuration conf;
+
+ // configuration
+ private String table = null;
+ private static Function<Long, Boolean> shouldDelete;
+
+ public static void main(String[] args) throws Exception {
+ // setup
+ HFileArchiveCleanup cleaner = new HFileArchiveCleanup();
+ try {
+ if (!cleaner.parse(args)) {
+ System.exit(-1);
+ }
+ } catch (ParseException e) {
+ LOG.error("Parse Exception: " + e.getMessage());
+ cleaner.printHelp();
+ System.exit(-1);
+ }
+
+ // run the cleanup
+ Configuration conf = getConf();
+ FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+
+ Path archiveDir;
+ // if not cleaning a specific table, then just cleanup the archive directory
+ if (cleaner.table == null) {
+ archiveDir = new Path(FSUtils.getRootDir(conf),
+ HFileArchiveUtil.getConfiguredArchiveDir(conf));
+ } else {
+ Path tableDir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
+ Bytes.toBytes(cleaner.table));
+ archiveDir = HFileArchiveUtil.getTableArchivePath(conf, tableDir);
+ }
+
+ LOG.debug("Cleaning up: " + archiveDir);
+ // iterate through the archive directory and delete everything that falls
+ // outside the range
+ Stack<Path> directories = new Stack<Path>();
+ directories.add(archiveDir);
+ // while there are more directories to look at
+ while (!directories.isEmpty()) {
+ Path parent = directories.pop();
+ // if the parent exists
+ if (fs.exists(parent)) {
+ // get all the children
+ FileStatus[] children = fs.listStatus(parent);
+ for (FileStatus child : children) {
+ // push directory to be cleaned
+ if (child.isDir()) {
+ directories.push(child.getPath());
+ continue;
+ }
+ // otherwise it's a file and we can consider it for deletion
+ // we have to check mod times against what the command line says
+ long mod = child.getModificationTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Child file: " + child.getPath() + ", has mod time:" + mod);
+ }
+ // check to see if it meets the specified timestamps
+ if (shouldDelete.apply(mod)) {
+ try {
+ if (!fs.delete(child.getPath(), false)) {
+ LOG.warn("Could not delete:" + child);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to delete child", e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Set the configuration to use.
+ *
+ * Exposed for TESTING!
+ * @param conf
+ */
+ static void setConfiguration(Configuration conf) {
+ HFileArchiveCleanup.conf = conf;
+ }
+
+ private synchronized static Configuration getConf() {
+ if (HFileArchiveCleanup.conf == null) {
+ setConfiguration(HBaseConfiguration.create());
+ }
+ return HFileArchiveCleanup.conf;
+ }
+
+ // ---------------------------
+ // Setup and options parsing
+ // ---------------------------
+
+ private final Options opts;
+ private final Parser parser;
+
+ @SuppressWarnings("static-access")
+ public HFileArchiveCleanup() {
+ // create all the necessary options
+ opts = new Options();
+ // create the time option
+ OptionBuilder
+ .hasArg()
+ .withArgName("start time")
+ .withDescription(
+ "Start time from which to start removing"
+ + " backup files in seconds from the epoch (inclusive). "
+ + "If not set, all archive files are removed from the "
+ + "specified table up to the end time. Not setting start "
+ + "OR end time deletes all archived files.");
+ opts.addOption(OptionBuilder.create('s'));
+
+ // end time option
+ OptionBuilder
+ .hasArg()
+ .withArgName("end time")
+ .withDescription(
+ "End time from which to start removing backup files in seconds "
+ + "from the epoch (exclusive). If not set, all archive files forward "
+ + "from the start time are removed. Not setting start OR end time deletes "
+ + "all archived files.");
+ opts.addOption(OptionBuilder.create('e'));
+
+ // create the tablename option
+ OptionBuilder
+ .hasArg()
+ .withArgName("table")
+ .withDescription(
+ "Name of the table for which to remove backups. If not set, all archived "
+ + "files will be cleaned up.")
+ .withType(String.class);
+ opts.addOption(OptionBuilder.create('t'));
+
+ // add the help
+ opts.addOption(OptionBuilder.hasArg(false).withDescription("Show this help").create('h'));
+
+ // create the parser
+ parser = new GnuParser();
+ }
+
+ public boolean parse(String[] args) throws ParseException {
+ CommandLine cmd;
+ cmd = parser.parse(opts, args);
+ // hack around final and anonymous inner classes
+ final long[] startend = new long[] { -1, Long.MAX_VALUE };
+ if (cmd.hasOption('h')) {
+ printHelp();
+ return false;
+ }
+ // get start/end times
+ if (cmd.hasOption('s')) {
+ startend[0] = Long.parseLong(cmd.getOptionValue('s'));
+ LOG.debug("Setting start time to:" + startend[0]);
+ }
+ if (cmd.hasOption('e')) {
+ startend[1] = Long.parseLong(cmd.getOptionValue('e'));
+ LOG.debug("Setting end time to:" + startend[1]);
+ }
+
+ if (startend[0] > startend[1]) {
+ throw new ParseException("Start time cannot exceed end time.");
+ }
+
+ if (startend[0] == startend[1]) {
+ //set the same start and end time
+ shouldDelete = new Function<Long, Boolean>() {
+ @Override
+ public Boolean apply(Long input) {
+ return input.longValue() == startend[0];
+ }
+ };
+ } else {
+ //set a range to delete
+ shouldDelete = new Function<Long, Boolean>() {
+ @Override
+ public Boolean apply(Long input) {
+ return startend[0] <= input && input < startend[1];
+ }
+ };
+ }
+
+ // get the table to cleanup
+ if (cmd.hasOption('t')) {
+ this.table = cmd.getOptionValue('t');
+ }
+ return true;
+ }
+
+ public void printHelp() {
+ HelpFormatter help = new HelpFormatter();
+ help.printHelp("java HFileArchiveCleanup", opts);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveMonitor.java
new file mode 100644
index 0000000..3d77c5d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveMonitor.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+
+/**
+ * Monitor for which tables the HFiles should be archived.
+ */
+public interface HFileArchiveMonitor {
+
+ /**
+ * Determine whether the given table's hfiles should be retained (archived)
+ * rather than deleted.
+ *
+ * @param tableName name of the table to check
+ * @return true if its store files should be retained, false
+ * otherwise
+ */
+ public boolean keepHFiles(String tableName);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveTableMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveTableMonitor.java
new file mode 100644
index 0000000..51ae8ea
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveTableMonitor.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Monitor the actual tables for which HFiles are archived.
+ *
+ * It is internally synchronized to ensure consistent view of the table state.
+ *
+ * By default, updates zookeeper that this server has received the update to
+ * archive the hfiles for the given table.
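+ *
+ * A minimal sketch of how the monitor is driven (the server/zkw variables and
+ * the table name are illustrative; in practice the monitor is created through
+ * {@link TableHFileArchiveTracker#create(ZooKeeperWatcher, Server)}):
+ * <pre>
+ * HFileArchiveTableMonitor monitor = new HFileArchiveTableMonitor(server, zkw);
+ * monitor.addTable("mytable");                      // registers this server in ZK
+ * boolean archive = monitor.keepHFiles("mytable");  // true - hfiles should be kept
+ * monitor.removeTable("mytable");                   // stop archiving the table
+ * </pre>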
+ */
+public class HFileArchiveTableMonitor extends Configured implements HFileArchiveMonitor {
+ private static final Log LOG = LogFactory.getLog(HFileArchiveTableMonitor.class);
+ private final Set<String> archivedTables = new TreeSet<String>();
+
+ protected final ZooKeeperWatcher zkw;
+ protected final Server parent;
+
+ /**
+ * Exposed for testing only. Generally, the
+ * {@link TableHFileArchiveTracker#create(ZooKeeperWatcher, Server)} should be used
+ * when working with table archive monitors.
+ * @param parent server for which we should monitor archiving
+ * @param zkw watcher for the zookeeper cluster for archiving hfiles
+ */
+ protected HFileArchiveTableMonitor(Server parent, ZooKeeperWatcher zkw) {
+ this.parent = parent;
+ this.zkw = zkw;
+ }
+
+ /**
+ * Set the tables to be archived. Internally adds each table and attempts to
+ * register it.
+ *
+ * Note: All previous tables will be removed in favor of these tables.
+ * @param tables add each of the tables to be archived.
+ */
+ public synchronized void setArchiveTables(List<String> tables) {
+ archivedTables.clear();
+ archivedTables.addAll(tables);
+ for (String table : tables) {
+ registerTable(table);
+ }
+ }
+
+ /**
+ * Add the named table to those being archived. Attempts to register the
+ * table.
+ * @param table name of the table to be registered
+ */
+ public synchronized void addTable(String table) {
+ if (this.keepHFiles(table)) {
+ LOG.debug("Already archiving table: " + table + ", ignoring it");
+ return;
+ }
+ archivedTables.add(table);
+ registerTable(table);
+ }
+
+ /**
+ * Subclass hook for registering a table with external sources.
+ *
+ * Currently updates ZK with the fact that the current server has started
+ * archiving hfiles for the specified table.
+ * @param table name of the table that is being archived
+ */
+ protected void registerTable(String table) {
+ LOG.debug("Registering table '" + table + "' as being archived.");
+
+ // notify that we are archiving the table for this server
+ try {
+ String tablenode = HFileArchiveUtil.getTableNode(zkw, table);
+ String serverNode = ZKUtil.joinZNode(tablenode, parent.getServerName().toString());
+ ZKUtil.createEphemeralNodeAndWatch(zkw, serverNode, new byte[0]);
+ } catch (KeeperException e) {
+ LOG.error("Failing to update that this server(" + parent.getServerName()
+ + " is joining archive of table:" + table, e);
+ }
+ }
+
+ public synchronized void removeTable(String table) {
+ archivedTables.remove(table);
+ }
+
+ public synchronized void clearArchive() {
+ archivedTables.clear();
+ }
+
+ @Override
+ public synchronized boolean keepHFiles(String tableName) {
+ return archivedTables.contains(tableName);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileDisposer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileDisposer.java
new file mode 100644
index 0000000..4f56231
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileDisposer.java
@@ -0,0 +1,685 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * Utility class to handle the removal of HFiles (or the respective
+ * {@link StoreFile StoreFiles}) for an HRegion from the {@link FileSystem}. The
+ * hfiles will be archived or deleted, depending on the state of the system.
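+ *
+ * A minimal sketch of the expected call when removing a region (the fs, monitor
+ * and info variables are assumed to be supplied by the caller; exception
+ * handling is omitted):
+ * <pre>
+ * // archives the region's files if the monitor says so, otherwise deletes them
+ * HFileDisposer.disposeRegion(fs, monitor, info);
+ * </pre>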
+ */
+public class HFileDisposer {
+ private static final Log LOG = LogFactory.getLog(HFileDisposer.class);
+ private static final String SEPARATOR = ".";
+
+ private HFileDisposer() {
+ // hidden ctor since this is just a util
+ }
+
+ /**
+ * Cleans up all the files for a HRegion, either via archiving (if the
+ * {@link HFileArchiveMonitor} indicates it should be) or by just removing all
+ * the files.
+ * @param fs the file system object
+ * @param monitor manager for if the region should be archived or deleted
+ * @param info HRegionInfo for region to be deleted
+ * @throws IOException
+ */
+ public static void disposeRegion(FileSystem fs, HFileArchiveMonitor monitor, HRegionInfo info)
+ throws IOException {
+ Path rootDir = FSUtils.getRootDir(fs.getConf());
+ disposeRegion(fs, monitor, rootDir, HTableDescriptor.getTableDir(rootDir, info.getTableName()),
+ HRegion.getRegionDir(rootDir, info));
+ }
+
+ /**
+ * Cleans up all the files for a HRegion, either via archiving (if the
+ * {@link HFileArchiveMonitor}, from the {@link RegionServerServices},
+ * indicates it should be) or by just removing all the files.
+ * @param fs FileSystem where the region files reside
+ * @param rss services to obtain the {@link HFileArchiveMonitor} via
+ * {@link RegionServerServices#getHFileArchiveMonitor()}. Testing
+ * notes: if the returned monitor is null, the region files are
+ * deleted
+ * @param rootdir root directory of the hbase installation on the
+ * {@link FileSystem}
+ * @param tabledir {@link Path} to where the table is being stored (for
+ * building the archive path)
+ * @param regiondir {@link Path} to where a region is being stored (for
+ * building the archive path)
+ * @throws IOException
+ */
+ public static void disposeRegion(FileSystem fs, RegionServerServices rss, Path rootdir,
+ Path tabledir, Path regiondir) throws IOException {
+ HFileArchiveMonitor manager = rss == null ? null : rss.getHFileArchiveMonitor();
+ disposeRegion(fs, manager, rootdir, tabledir, regiondir);
+ }
+
+ /**
+ * Remove an entire region from the table directory.
+ *
+ * Either archives the region or outright deletes it, depending on if
+ * archiving is enabled.
+ * @param fs {@link FileSystem} from which to remove the region
+ * @param monitor Monitor for which tables should be archived or deleted
+ * @param rootdir {@link Path} to the root directory where hbase files are
+ * stored (for building the archive path)
+ * @param tabledir {@link Path} to where the table is being stored (for
+ * building the archive path)
+ * @param regionDir {@link Path} to where a region is being stored (for
+ * building the archive path)
+ * @return true if the region was successfully deleted, false
+ * if the filesystem operations could not complete.
+ * @throws IOException if the request cannot be completed
+ */
+ public static boolean disposeRegion(FileSystem fs, HFileArchiveMonitor monitor, Path rootdir,
+ Path tabledir, Path regionDir) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("REMOVING region " + regionDir.toString());
+ }
+ // check to make sure we don't keep files, in which case just delete them
+ // NOTE: assumption here that tabledir = tablename
+ if (shouldDeleteFiles(monitor, tabledir)) {
+ LOG.debug("Deleting hregion directory (" + regionDir + ") - no backup");
+ return deleteRegionWithoutArchiving(fs, regionDir);
+ }
+
+ // otherwise, we archive the files
+ // make sure the regiondir lives under the tabledir
+ Preconditions.checkArgument(regionDir.toString().startsWith(tabledir.toString()));
+
+ // get the directory to archive region files
+ Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(fs.getConf(), tabledir, regionDir);
+ if (regionArchiveDir == null) {
+ LOG.warn("No archive directory could be found for the region:" + regionDir
+ + ", deleting instead");
+ deleteRegionWithoutArchiving(fs, regionDir);
+ // we should have archived, but failed to. Doesn't matter if we deleted
+ // the archived files correctly or not.
+ return false;
+ }
+
+ FileStatusConverter getAsFile = new FileStatusConverter(fs);
+ // otherwise, we attempt to archive the store files
+
+ // build collection of just the store directories to archive
+ Collection<File> toArchive = Collections2.transform(
+ Collections2.filter(Arrays.asList(fs.listStatus(regionDir)), new Predicate<FileStatus>() {
+ @Override
+ public boolean apply(FileStatus input) {
+ // filter out paths that are not store directories
+ if (!input.isDir() || input.getPath().getName().toString().startsWith(".")) return false;
+ return true;
+ }
+ }), getAsFile);
+
+ boolean success = false;
+ try {
+ success = resolveAndArchive(fs, regionArchiveDir, toArchive);
+ } catch (IOException e) {
+ success = false;
+ }
+
+ // if that was successful, then we delete the region
+ if (success) {
+ return deleteRegionWithoutArchiving(fs, regionDir);
+ }
+
+ throw new IOException("Received error when attempting to archive files (" + toArchive
+ + "), cannot delete region directory.");
+ }
+
+ /**
+ * Without regard for backup, delete a region. Should be used with caution.
+ * @param regionDir {@link Path} to the region to be deleted.
+ * @throws IOException on filesystem operation failure
+ */
+ private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir)
+ throws IOException {
+ if (fs.delete(regionDir, true)) {
+ LOG.debug("Deleted all region files in: " + regionDir);
+ return true;
+ }
+ LOG.debug("Failed to delete region directory:" + regionDir);
+ return false;
+ }
+
+ /**
+ * Remove the store files, either by archiving them or outright deletion
+ * @param rss services to obtain the {@link HFileArchiveMonitor} via
+ * {@link RegionServerServices#getHFileArchiveMonitor()}. Testing
+ * notes: if the services or the returned monitor is null, the region
+ * files are deleted
+ * @param parent Parent region hosting the store files
+ * @param conf {@link Configuration} to examine to determine the archive
+ * directory
+ * @param family the family hosting the store files
+ * @param compactedFiles files to be disposed of. No further reading of these
+ * files should be attempted; otherwise likely to cause an
+ * {@link IOException}
+ * @throws IOException if the files could not be correctly disposed.
+ */
+ public static void disposeStoreFiles(RegionServerServices rss, HRegion parent,
+ Configuration conf, byte[] family, Collection<StoreFile> compactedFiles) throws IOException {
+
+ // short circuit if we don't have any files to delete
+ if (compactedFiles.size() == 0) {
+ LOG.debug("No store files to dispose, done!");
+ return;
+ }
+
+ // if the monitor isn't available or indicates we shouldn't archive, then we
+ // delete the store files
+ if (shouldDeleteFiles(rss, parent.getTableDir())) {
+ deleteStoreFilesWithoutArchiving(compactedFiles);
+ return;
+ }
+
+ // build the archive path
+ FileSystem fs = rss.getFileSystem();
+ Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family);
+
+ // make sure we don't archive if we can't and that the archive dir exists
+ if (storeArchiveDir == null || !fs.mkdirs(storeArchiveDir)) {
+ LOG.warn("Could make archive directory (" + storeArchiveDir + ") for store:"
+ + Bytes.toString(family) + ", deleting compacted files instead.");
+ deleteStoreFilesWithoutArchiving(compactedFiles);
+ return;
+ }
+
+ // otherwise we attempt to archive the store files
+ LOG.debug("Archiving compacted store files.");
+
+ // wrap the storefile into a File
+ StoreToFile getStorePath = new StoreToFile(fs);
+ Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);
+
+ // do the actual archive
+ if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
+ throw new IOException("Failed to archive/delete all the files."
+ + " Something is probably arwy on the filesystem.");
+ }
+ }
+
+ /**
+ * Check to see if we should archive the given files, based on the state of
+ * the regionserver
+ * @param rss services to provide the {@link HFileArchiveMonitor}, can be null
+ * (in which case returns true).
+ * @param tabledir table directory where the files are being archived. NOTE:
+ * implicit assumption here that the tabledir = name of the table
+ * @return true if the files should be deleted, false if they
+ * should be archived
+ */
+ private static boolean shouldDeleteFiles(RegionServerServices rss, Path tabledir) {
+ if (rss == null) return true;
+ return shouldDeleteFiles(rss.getHFileArchiveMonitor(), tabledir);
+ }
+
+ /**
+ * Check to see if we should archive the given files, based on the state of
+ * the regionserver
+ * @param monitor services to indicate if the files should be archived and can
+ * be null (in which case returns true).
+ * @param tabledir table directory where the files are being archived. NOTE:
+ * implicit assumption here that the tabledir = name of the table
+ * @return true if the files should be deleted (or the monitor is
+ * null), false if they should be archived
+ */
+ private static boolean shouldDeleteFiles(HFileArchiveMonitor monitor, Path tabledir) {
+ if (monitor == null) return true;
+ return !monitor.keepHFiles(tabledir.getName());
+ }
+
+ /**
+ * Just do a simple delete of the given store files
+ *
+ * A best effort is made to delete each of the files, rather than bailing on
+ * the first failure.
+ *
+ * This method is preferable to
+ * {@link #deleteFilesWithoutArchiving(Collection)} since it consumes fewer
+ * resources, but is more limited in terms of usefulness.
+ * @param compactedFiles store files to delete from the file system.
+ * @throws IOException if a file cannot be deleted. Deletion is attempted for
+ * all files before the exception is thrown, rather than failing at the
+ * first file.
+ */
+ private static void deleteStoreFilesWithoutArchiving(Collection<StoreFile> compactedFiles)
+ throws IOException {
+ LOG.debug("Deleting store files without archiving.");
+ boolean failure = false;
+ for (StoreFile hsf : compactedFiles) {
+ try {
+ hsf.deleteReader();
+ } catch (IOException e) {
+ LOG.error("Failed to delete store file:" + hsf.getPath());
+ failure = true;
+ }
+ }
+ if (failure) {
+ throw new IOException("Failed to delete all store files. See log for failures.");
+ }
+ }
+
+ /**
+ * Archive the given files and resolve any conflicts with existing files via
+ * appending the time archiving started (so all conflicts in the same group
+ * have the same timestamp appended).
+ *
+ * Recursively copies over files to archive if any of the files to archive are
+ * directories. Archive directory structure for children is the base archive
+ * directory name + the parent directory.
+ * @param fs {@link FileSystem} on which to archive the files
+ * @param baseArchiveDir base archive directory to archive the given files
+ * @param toArchive files to be archived
+ * @return true on success, false otherwise
+ * @throws IOException on unexpected failure
+ */
+ private static boolean resolveAndArchive(FileSystem fs, Path baseArchiveDir,
+ Collection<File> toArchive) throws IOException {
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ List<File> failures = resolveAndArchive(fs, baseArchiveDir, toArchive, start);
+
+ // clean out the failures by just deleting them
+ if (failures.size() > 0) {
+ try {
+ LOG.error("Failed to complete archive, deleting extra store files.");
+ deleteFilesWithoutArchiving(failures);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete store file(s) when archiving failed", e);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Resolve any conflict with an existing archive file via timestamp-append
+ * renaming of the existing file and then archive the passed in files.
+ * @param fs {@link FileSystem} on which to archive the files
+ * @param baseArchiveDir base archive directory to store the files. If any of
+ * the files to archive are directories, will append the name of the
+ * directory to the base archive directory name, creating a parallel
+ * structure.
+ * @param toArchive files/directories that need to be archived
+ * @param start time the archiving started - used for resolving archive
+ * conflicts.
+ * @return the list of failed to archive files.
+ * @throws IOException if an unexpected file operation exception occurred
+ */
+ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
+ Collection<File> toArchive, long start) throws IOException {
+ // short circuit if no files to move
+ if (toArchive.size() == 0) return Collections.emptyList();
+
+ LOG.debug("moving files to the archive directory: " + baseArchiveDir);
+
+ // make sure the archive directory exists
+ if (!fs.exists(baseArchiveDir)) {
+ if (!fs.mkdirs(baseArchiveDir)) {
+ throw new IOException("Failed to create the archive directory:" + baseArchiveDir
+ + ", quitting archive attempt.");
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Created archive directory:" + baseArchiveDir);
+ }
+ }
+
+ List<File> failures = new ArrayList<File>();
+ String startTime = Long.toString(start);
+ for (File file : toArchive) {
+ // if it's a file, archive it
+ try {
+ LOG.debug("Archiving:" + file);
+ if (file.isFile()) {
+ // attempt to archive the file
+ if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
+ LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
+ failures.add(file);
+ }
+ } else {
+ // otherwise it's a directory and we need to archive all its files
+ LOG.debug(file + " is a directory, archiving children files");
+ // so we add the directory name to the one base archive
+ Path directoryArchiveDir = new Path(baseArchiveDir, file.getName());
+ // and then get all the files from that directory and attempt to
+ // archive those too
+ Collection<File> children = file.getChildren();
+ failures.addAll(resolveAndArchive(fs, directoryArchiveDir, children, start));
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to archive file: " + file, e);
+ failures.add(file);
+ }
+ }
+ return failures;
+ }
+
+ /**
+ * Attempt to archive the passed in file to the store archive directory.
+ *
+ * If the same file already exists in the archive, it is moved to a
+ * timestamped directory under the archive directory and the new file is put
+ * in its place.
+ * @param storeArchiveDirectory {@link Path} to the directory that stores the
+ * archives of the hfiles
+ * @param currentFile {@link File} wrapping the original HFile that will be archived
+ * @param archiveStartTime time the archiving started, used to resolve naming
+ * conflicts with files already in the archive
+ * @return true if the file is successfully archived. false
+ * if there was a problem, but the operation still completed.
+ * @throws IOException on failure to complete {@link FileSystem} operations.
+ */
+ private static boolean resolveAndArchiveFile(Path storeArchiveDirectory, File currentFile,
+ String archiveStartTime) throws IOException {
+ // build path as it should be in the archive
+ String filename = currentFile.getName();
+ Path archiveFile = new Path(storeArchiveDirectory, filename);
+ FileSystem fs = currentFile.getFileSystem();
+ // if the file already exists in the archive, move that one to a
+ // timestamped backup
+ if (fs.exists(archiveFile)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File:" + archiveFile
+ + " already exists in archive, moving to timestamped backup and overwriting current.");
+ }
+
+ Path backedupArchiveFile = new Path(storeArchiveDirectory, filename + SEPARATOR
+ + archiveStartTime);
+
+ // move the archive file to the stamped backup
+ if (!fs.rename(archiveFile, backedupArchiveFile)) {
+ LOG.warn("Could not rename archive file to backup: " + backedupArchiveFile
+ + ", deleting existing file in favor of newer.");
+ fs.delete(archiveFile, false);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Backed up archive file from: " + archiveFile);
+ }
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("No existing file in archive for:" + archiveFile
+ + ", free to archive original file.");
+ }
+
+ // at this point, we should have a free spot for the archive file
+ if (!currentFile.moveAndClose(archiveFile)) {
+ LOG.error("Failed to archive file:" + currentFile);
+ return false;
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Finished archiving file from: " + currentFile + ", to: " + archiveFile);
+ }
+ return true;
+ }
+
+ /**
+ * Simple delete of regular files from the {@link FileSystem}.
+ *
+ * This method is a more generic implementation than the other deleteXXX
+ * methods in this class, allowing more code reuse at the cost of a few more
+ * short-lived objects (which should have minimal impact on the JVM).
+ * @param files {@link Collection} of files to be deleted
+ * @throws IOException if a file cannot be deleted. Deletion is attempted for
+ * all files before the exception is thrown, rather than failing at the
+ * first file.
+ */
+ private static void deleteFilesWithoutArchiving(Collection<File> files) throws IOException {
+ boolean failure = false;
+ for (File file : files) {
+ try {
+ LOG.debug("Deleting region file:" + file);
+ file.delete();
+ } catch (IOException e) {
+ LOG.error("Failed to delete file:" + file);
+ failure = true;
+ }
+ }
+ if (failure) {
+ throw new IOException("Failed to delete all store files. See log for failures.");
+ }
+ }
+
+ /**
+ * Adapt a type to match the {@link File} interface, which is used internally
+ * for handling archival/removal of files
+ * @param type to adapt to the {@link File} interface
+ */
+ private static abstract class FileConverter<T> implements Function<T, File> {
+ protected final FileSystem fs;
+
+ public FileConverter(FileSystem fs) {
+ this.fs = fs;
+ }
+ }
+
+ /**
+ * Convert a FileStatus to something we can manage in the archiving
+ */
+ private static class FileStatusConverter extends FileConverter<FileStatus> {
+ public FileStatusConverter(FileSystem fs) {
+ super(fs);
+ }
+
+ @Override
+ public File apply(FileStatus input) {
+ return new FileablePath(fs, input.getPath());
+ }
+ }
+
+ /**
+ * Convert the {@link StoreFile} into something we can manage in the archive
+ * methods
+ */
+ private static class StoreToFile extends FileConverter<StoreFile> {
+ public StoreToFile(FileSystem fs) {
+ super(fs);
+ }
+
+ @Override
+ public File apply(StoreFile input) {
+ return new FileableStoreFile(fs, input);
+ }
+ }
+
+ /**
+ * Wrapper to handle file operations uniformly
+ */
+ private static abstract class File {
+ protected final FileSystem fs;
+
+ public File(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ /**
+ * Delete the file
+ * @throws IOException on failure
+ */
+ abstract void delete() throws IOException;
+
+ /**
+ * Check to see if this is a file or a directory
+ * @return true if it is a file, false otherwise
+ * @throws IOException on {@link FileSystem} connection error
+ */
+ abstract boolean isFile() throws IOException;
+
+ /**
+ * @return if this is a directory, returns all the children in the
+ * directory, otherwise returns an empty list
+ * @throws IOException
+ */
+ abstract Collection<File> getChildren() throws IOException;
+
+ /**
+ * close any outside readers of the file
+ * @throws IOException
+ */
+ abstract void close() throws IOException;
+
+ /**
+ * @return the name of the file (not the full fs path, just the individual
+ * file name)
+ */
+ abstract String getName();
+
+ /**
+ * @return the path to this file
+ */
+ abstract Path getPath();
+
+ /**
+ * Move the file to the given destination
+ * @param dest
+ * @return true on success
+ * @throws IOException
+ */
+ public boolean moveAndClose(Path dest) throws IOException {
+ this.close();
+ Path p = this.getPath();
+ // rename returns true on success, matching the javadoc above
+ return fs.rename(p, dest);
+ }
+
+ /**
+ * @return the {@link FileSystem} on which this file resides
+ */
+ public FileSystem getFileSystem() {
+ return this.fs;
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass() + ", file:" + getPath().toString();
+ }
+ }
+
+ /**
+ * A {@link File} that wraps a simple {@link Path} on a {@link FileSystem}.
+ */
+ private static class FileablePath extends File {
+ private final Path file;
+ private final FileStatusConverter getAsFile;
+
+ public FileablePath(FileSystem fs, Path file) {
+ super(fs);
+ this.file = file;
+ this.getAsFile = new FileStatusConverter(fs);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ fs.delete(file, true);
+ }
+
+ @Override
+ public String getName() {
+ return file.getName();
+ }
+
+ @Override
+ public Collection<File> getChildren() throws IOException {
+ if (fs.isFile(file)) return Collections.emptyList();
+ return Collections2.transform(Arrays.asList(fs.listStatus(file)), getAsFile);
+ }
+
+ @Override
+ public boolean isFile() throws IOException {
+ return fs.isFile(file);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // NOOP - files are implicitly closed on removal
+ }
+
+ @Override
+ Path getPath() {
+ return file;
+ }
+ }
+
+ /**
+ * {@link File} adapter for a {@link StoreFile} living on a {@link FileSystem}.
+ */
+ private static class FileableStoreFile extends File {
+ StoreFile file;
+
+ public FileableStoreFile(FileSystem fs, StoreFile store) {
+ super(fs);
+ this.file = store;
+ }
+
+ @Override
+ public void delete() throws IOException {
+ file.deleteReader();
+ }
+
+ @Override
+ public String getName() {
+ return file.getPath().getName();
+ }
+
+ @Override
+ public boolean isFile() {
+ return true;
+ }
+
+ @Override
+ public Collection<File> getChildren() throws IOException {
+ // storefiles don't have children
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void close() throws IOException {
+ file.closeReader(true);
+ }
+
+ @Override
+ Path getPath() {
+ return file.getPath();
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/TableHFileArchiveTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/TableHFileArchiveTracker.java
new file mode 100644
index 0000000..52a2d89
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/TableHFileArchiveTracker.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Track HFile archiving state changes in ZooKeeper. Keeps track of the tables
+ * whose HFiles should be archived.
+ *
+ * {@link TableHFileArchiveTracker#start()} needs to be called to start monitoring
+ * for tables to archive.
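+ *
+ * A minimal usage sketch (the zkw and server variables are assumed to come from
+ * the hosting master or region server; exception handling is omitted):
+ * <pre>
+ * TableHFileArchiveTracker tracker = TableHFileArchiveTracker.create(zkw, server);
+ * tracker.start();                                  // begin watching the archive znode
+ * boolean archive = tracker.keepHFiles("mytable");  // should hfiles be archived?
+ * </pre>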
+ */
+public class TableHFileArchiveTracker extends ZooKeeperListener implements HFileArchiveMonitor {
+ private static final Log LOG = LogFactory.getLog(TableHFileArchiveTracker.class);
+ private HFileArchiveTableMonitor monitor;
+
+ private TableHFileArchiveTracker(ZooKeeperWatcher watcher, HFileArchiveTableMonitor monitor) {
+ super(watcher);
+ watcher.registerListener(this);
+ this.monitor = monitor;
+ }
+
+ /**
+ * Start monitoring for archive updates
+ * @throws KeeperException on failure to find/create nodes
+ */
+ public void start() throws KeeperException {
+ // if archiving is enabled, then read in the list of tables to archive
+ LOG.debug("Starting hfile archive tracker...");
+ this.checkEnabledAndUpdate();
+ LOG.debug("Finished starting hfile archive tracker!");
+ }
+
+ @Override
+ public void nodeCreated(String path) {
+ // if it is the archive path
+ if (!path.startsWith(watcher.archiveHFileZNode)) return;
+
+ LOG.debug("Archive node: " + path + " created");
+ // since we are already enabled, just update a single table
+ String table = path.substring(watcher.archiveHFileZNode.length());
+
+ // if a new table has been added
+ if (table.length() != 0) {
+ try {
+ addAndReWatchTable(path);
+ } catch (KeeperException e) {
+ LOG.warn("Couldn't read zookeeper data for table, not archiving", e);
+ }
+ }
+ // the top level node has come up, so read in all the tables
+ else {
+ checkEnabledAndUpdate();
+ }
+ }
+
+ @Override
+ public void nodeChildrenChanged(String path) {
+ if (!path.startsWith(watcher.archiveHFileZNode)) return;
+
+ LOG.debug("Archive node: " + path + " children changed.");
+ // a table was added to the archive
+ try {
+ updateWatchedTables();
+ } catch (KeeperException e) {
+ LOG.error("Failed to update tables to archive", e);
+ }
+ }
+
+ /**
+ * Add this table to the tracker and then set a watch on that node.
+ *
+ * Handles the situation where the table is deleted in the time between the
+ * update and resetting the watch by removing the table via
+ * {@link #safeStopTrackingTable(String)}.
+ * @param tableZnode full zookeeper path to the table to be added
+ * @throws KeeperException if an unexpected zk exception occurs
+ */
+ private void addAndReWatchTable(String tableZnode) throws KeeperException {
+ getMonitor().addTable(ZKUtil.getNodeName(tableZnode));
+ // re-add a watch to the table created
+ // and check to make sure it wasn't deleted
+ if (!ZKUtil.watchAndCheckExists(watcher, tableZnode)) {
+ safeStopTrackingTable(tableZnode);
+ }
+ }
+
+ /**
+ * Stop tracking a table. Ensures that the table doesn't exist, but if it does,
+ * attempts to add the table back via {@link #addAndReWatchTable(String)} (it's
+ * a 'safe' removal).
+ * @param tableZnode full zookeeper path to the table to be added
+ * @throws KeeperException if an unexpected zk exception occurs
+ */
+ private void safeStopTrackingTable(String tableZnode) throws KeeperException {
+ getMonitor().removeTable(ZKUtil.getNodeName(tableZnode));
+ // if the table exists, then add and rewatch it
+ if (ZKUtil.checkExists(watcher, tableZnode) >= 0) {
+ addAndReWatchTable(tableZnode);
+ }
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ if (!path.startsWith(watcher.archiveHFileZNode)) return;
+
+ LOG.debug("Archive node: " + path + " deleted");
+ String table = path.substring(watcher.archiveHFileZNode.length());
+ // if we stop archiving all tables
+ if (table.length() == 0) {
+ // make sure we have the tracker before deleting the archive
+ // but if we don't, we don't care about delete
+ clearTables();
+ // watches are one-time events, so we need to renew our subscription to
+ // the archive node and might as well check to make sure archiving
+ // didn't come back on at the same time
+ checkEnabledAndUpdate();
+ return;
+ }
+ // just stop archiving one table
+ // note that we don't attempt to add another watch for that table into zk.
+ // We have no assurances that the table will be archived again (or even
+ // exists for that matter), so its better not to add unnecessary load to
+ // zk for watches. If the table is created again, then we will get the
+ // notification in nodeChildrenChanged.
+ getMonitor().removeTable(ZKUtil.getNodeName(path));
+ }
+
+ /**
+ * Sets the watch on the top-level archive znode, and then updates the monitor
+ * with the current tables that should be archived (and ensures that those
+ * nodes are watched as well).
+ */
+ private void checkEnabledAndUpdate() {
+ try {
+ if (ZKUtil.watchAndCheckExists(watcher, watcher.archiveHFileZNode)) {
+ LOG.debug(watcher.archiveHFileZNode + " znode does exist, checking for tables to archive");
+
+ // update the tables we should backup, to get the most recent state.
+ // This is safer than also watching for children and then hoping we get
+ // all the updates as it makes sure we get and watch all the children
+ updateWatchedTables();
+ } else {
+ LOG.debug("Archiving not currently enabling, waiting");
+ }
+ } catch (KeeperException e) {
+ LOG.warn("Failed to watch for archiving znode", e);
+ }
+ }
+
+ /**
+ * Read the list of children under the archive znode as table names and then
+ * sets those tables to the list of tables that we should archive
+ * @throws KeeperException if there is an unexpected zk exception
+ */
+ private void updateWatchedTables() throws KeeperException {
+ // get the children and watch for new children
+ LOG.debug("Updating watches on tables to archive.");
+ // get the children and add watches for each of the children
+ List<String> tables = ZKUtil.listChildrenAndWatchThem(watcher, watcher.archiveHFileZNode);
+ LOG.debug("Starting archive for tables:" + tables);
+ // if archiving is still enabled
+ if (tables != null && tables.size() > 0) {
+ getMonitor().setArchiveTables(tables);
+ } else {
+ LOG.debug("No tables to archive.");
+ // only if we currently have a tracker, then clear the archive
+ clearTables();
+ }
+ }
+
+ /**
+ * Remove the currently archived tables.
+ *
+ * Does some intelligent checking to make sure we don't prematurely create an
+ * archive tracker.
+ */
+ private void clearTables() {
+ getMonitor().clearArchive();
+ }
+
+ @Override
+ public boolean keepHFiles(String tableName) {
+ return getMonitor().keepHFiles(tableName);
+ }
+
+ /**
+ * @return the monitor tracking which tables should be archived.
+ */
+ public final HFileArchiveTableMonitor getMonitor() {
+ return this.monitor;
+ }
+
+ /**
+ * Set the table tracker for the overall archive tracker.
+ *
+ * Exposed for TESTING!
+ * @param tracker tracker for which tables should be archived.
+ */
+ public void setTableMonitor(HFileArchiveTableMonitor tracker) {
+ this.monitor = tracker;
+ }
+
+ /**
+ * Create an archive tracker for the passed in server
+ * @param zkw Parent's zookeeper to monitor/update
+ * @param parent the server for which the tracker is created.
+ * @return ZooKeeper tracker to monitor for this server if this server should
+ * archive hfiles for a given table
+ */
+ public static TableHFileArchiveTracker create(ZooKeeperWatcher zkw, Server parent) {
+ return create(zkw, new HFileArchiveTableMonitor(parent, zkw));
+ }
+
+ /**
+ * Create an archive tracker with the special passed in table monitor. Should
+ * only be used in special cases (eg. testing)
+ * @param zkw Watcher for the ZooKeeper cluster that we should track
+ * @param monitor Monitor for which tables need hfile archiving
+ * @return ZooKeeper tracker to monitor for this server if this server should
+ * archive hfiles for a given table
+ */
+ public static TableHFileArchiveTracker create(ZooKeeperWatcher zkw, HFileArchiveTableMonitor monitor) {
+ return new TableHFileArchiveTracker(zkw, monitor);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 9bf2396..5d10d46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -24,8 +24,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -1916,6 +1919,239 @@ public class HBaseAdmin implements Abortable, Closeable {
}
/**
+ * @return A new {@link HFileArchiveManager} to manage which tables' hfiles
+ * should be archived rather than deleted.
+ * @throws ZooKeeperConnectionException
+ * @throws IOException
+ */
+ private synchronized HFileArchiveManager createHFileArchiveManager()
+ throws ZooKeeperConnectionException, IOException {
+ return new HFileArchiveManager(this.getConnection(), this.conf);
+ }
+
+ /**
+ * Turn on backups for all HFiles for the given table.
+ *
+ * All deleted hfiles are moved to the
+ * archive directory (see {@link HConstants#HFILE_ARCHIVE_DIRECTORY}) under the
+ * table directory, rather than being deleted.
+ *
+ * If backups are already enabled for this table, does nothing.
+ *
+ * Synchronous operation.
+ *
+ * Assumes that offline/dead regionservers will get the update on start.
+ *
+ * WARNING: No guarantees are made if multiple clients simultaneously
+ * attempt to disable/enable hfile backup on the same table.
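+ *
+ * For example (the table name is illustrative and error handling is omitted):
+ * <pre>
+ * HBaseAdmin admin = new HBaseAdmin(conf);
+ * admin.enableHFileBackup("mytable");          // blocks until all hosting servers join
+ * boolean enabled = admin.getArchivingEnabled("mytable");
+ * admin.disableHFileBackup("mytable");
+ * </pre>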
+ * @param table name of the table to start backing up
+ * @throws IOException
+ */
+ public void enableHFileBackup(String table) throws IOException {
+ enableHFileBackup(Bytes.toBytes(table));
+ }
+
+
+ /**
+ * Enable HFile backups synchronously. Ensures that all regionservers hosting
+ * the table have received the notification to start archiving hfiles.
+ *
+ * Synchronous operation.
+ *
+ * Assumes that offline/dead regionservers will get the update on start.
+ *
+ * WARNING: No guarantees are made if multiple clients simultaneously
+ * attempt to disable/enable hfile backup on the same table.
+ * @param tableName name of the table on which to start backing up hfiles
+ * @throws IOException if the backup cannot be enabled
+ */
+ public void enableHFileBackup(final byte[] tableName) throws IOException {
+ // this is a little inefficient since we do a read of meta to see if it
+ // exists, but is a lot cleaner than catching a NPE below when looking for
+ // online regions and the table doesn't exist.
+ if (!this.tableExists(tableName)) {
+ throw new IOException("Table: " + Bytes.toString(tableName)
+ + " does not exist, cannot create a backup for a non-existant table.");
+ }
+ // do the asynchronous update
+ enableHFileBackupAsync(tableName);
+
+ // and then wait for it to propagate
+ // while all the regions have yet to receive zk update and we are not done
+ // retrying and backing off
+ CatalogTracker ct = getCatalogTracker();
+ HFileArchiveManager manager = createHFileArchiveManager();
+ int tries = 0;
+ try {
+ // just doing the normal amount of retries, as opposed to the multiplier
+ // since we are creating a new connection every time, rather than worrying
+ // about connection timeouts, unavailability. If ZK does go down, then we
+ // are pretty hosed in terms of backup and want to bail quickly.
+ while (!allServersArchivingTable(manager, ct, tableName) && tries < this.numRetries) {
+ // Sleep while we wait for the RS to get updated
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("try:" + tries + "/" + this.numRetries
+ + ", Not all regionservers for table '" + Bytes.toString(tableName)
+ + "' have joined the backup. Waiting...");
+ }
+ Thread.sleep(getPauseTime(tries++));
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted when backing up table "
+ + Bytes.toString(tableName));
+ }
+ }
+
+ LOG.debug("Done waiting for table: " + Bytes.toString(tableName) + " to join archive.");
+ // if we couldn't get all regions we expect, bail out
+ if (tries >= this.numRetries) {
+ // disable backups
+ manager.disableHFileBackup(tableName);
+ throw new IOException("Failed to get all regions to join backup in " + tries
+ + " tries, for table:" + Bytes.toString(tableName));
+ }
+ } finally {
+ cleanupCatalogTracker(ct);
+ manager.stop();
+ }
+ }
+
+ /**
+ * Turn on backups for all HFiles for the given table.
+ *
+ * All deleted hfiles are moved to the archive directory under the table
+ * directory, rather than being deleted.
+ *
+ * If backups are already enabled for this table, does nothing.
+ * @param table name of the table to start backing up
+ * @throws IOException if an unexpected exception occurs
+ */
+ public void enableHFileBackupAsync(final byte[] table)
+ throws IOException {
+ createHFileArchiveManager().enableHFileBackup(table).stop();
+ }
+
+ /**
+ * Disable hfile backups for the given table.
+ *
+ * Previously backed up files are still retained (if present).
+ *
+ * Asynchronous operation - some extra HFiles may be retained in the archive
+ * directory after disable is called, depending on the zookeeper latency to
+ * the servers.
+ * @param table name of the table to stop backing up
+ * @throws IOException if an unexpected exception occurs
+ */
+ public void disableHFileBackup(String table) throws IOException {
+ disableHFileBackup(Bytes.toBytes(table));
+ }
+
+ /**
+ * Disable hfile backups for the given table.
+ *
+ * Previously backed up files are still retained (if present).
+ *
+ * Asynchronous operation - some extra HFiles may be retained in the archive
+ * directory after disable is called, depending on the zookeeper latency to
+ * the servers.
+ * @param table name of the table to stop backing up
+ * @throws IOException if an unexpected exception occurs
+ */
+ public void disableHFileBackup(final byte[] table) throws IOException {
+ createHFileArchiveManager().disableHFileBackup(table).stop();
+ }
+
+ /**
+ * Disable hfile backups for all tables.
+ *
+ * Previously backed up files are still retained (if present).
+ *
+ * Asynchronous operation - some extra HFiles may be retained in the archive
+ * directory after disable is called, depending on the zookeeper latency to
+ * the servers.
+ * @throws IOException if an unexpected exception occurs
+ */
+ public void disableHFileBackup() throws IOException {
+ createHFileArchiveManager().disableHFileBackup().stop();
+ }
+
+ /**
+ * Determine if archiving is enabled (but not necessarily fully propagated)
+ * for a table
+ * @param table name of the table to check
+ * @return true if it is, false otherwise
+ * @throws IOException if a connection to ZooKeeper cannot be established
+ */
+ public boolean getArchivingEnabled(byte[] table) throws IOException {
+ HFileArchiveManager manager = createHFileArchiveManager();
+ try {
+ return manager.isArchivingEnabled(table);
+ } catch (IOException e) {
+ return false;
+ } finally {
+ manager.stop();
+ }
+ }
+
+ /**
+ * Determine if archiving is enabled (but not necessarily fully propagated)
+ * for a table
+ * @param table name of the table to check
+ * @return true if it is, false otherwise
+ * @throws IOException if a connection to ZooKeeper cannot be established
+ */
+ public boolean getArchivingEnabled(String table) throws IOException {
+ return getArchivingEnabled(Bytes.toBytes(table));
+ }
+
+ private boolean allServersArchivingTable(HFileArchiveManager manager, CatalogTracker ct,
+ byte[] tableName) throws IOException {
+
+ // get the list of RS that have confirmed they are archiving the table
+ List<String> serverNames = manager.serversArchiving(tableName);
+ // (the master is added to the expected set of servers below)
+
+ Collections.sort(serverNames);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Servers currently archiving the table: " + serverNames);
+ }
+
+ // get the regions and their servers associated with the table
+ List<Pair<HRegionInfo, ServerName>> regionAndLocations;
+ try {
+ regionAndLocations = MetaReader.getTableRegionsAndLocations(ct, Bytes.toString(tableName));
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(
+ "Interrupted when getting expected regions and servers to backup table: "
+ + Bytes.toString(tableName));
+ }
+ // now get the RS that should be archiving the table
+ Set<String> expected = new TreeSet<String>();
+ for (Pair<HRegionInfo, ServerName> rl : regionAndLocations) {
+ // if the region is assigned
+ if (rl.getSecond() != null) {
+ expected.add(rl.getSecond().toString());
+ }
+ }
+ // and add the master server as an expected server too, since that has the
+ // catalog janitor
+ expected.add(this.getClusterStatus().getMaster().toString());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expecting archiving confirmation from at least these servers: " + expected);
+ }
+
+ // now compare the list of expected servers vs. those currently checked in
+ for (String expectedServer : expected) {
+ // if the expected RS is not in the list, then they haven't all joined
+ if (Collections.binarySearch(serverNames, expectedServer) < 0) return false;
+ }
+ return true;
+ }
+
+ /**
* @see {@link #execute}
*/
private abstract static class MasterCallable<V> implements Callable<V> {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HFileArchiveManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HFileArchiveManager.java
new file mode 100644
index 0000000..6966221
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HFileArchiveManager.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.HFileArchiveUtil.getTableNode;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Client-side manager for which table(s)' HFiles to archive
+ */
+class HFileArchiveManager {
+
+ private static final Log LOG = LogFactory.getLog(HFileArchiveManager.class);
+ private final ZooKeeperWatcher zooKeeper;
+ private volatile boolean stopped = false;
+
+ public HFileArchiveManager(HConnection connection, Configuration conf)
+ throws ZooKeeperConnectionException, IOException {
+ this(new ZooKeeperWatcher(conf, "hfileArchiveManager-on-" + connection.toString(), connection));
+ }
+
+ public HFileArchiveManager(ZooKeeperWatcher watcher) {
+ this.zooKeeper = watcher;
+ }
+
+ /**
+ * Turn on auto-backups of HFiles on the specified table.
+ *
+ * When HFiles would be deleted, they are instead moved to the configured
+ * archive directory (by default {@value HFileArchiveUtil#DEFAULT_HFILE_ARCHIVE_DIRECTORY}).
+ * @param table name of the table to enable backups on
+ * @return this for chaining.
+ * @throws IOException
+ */
+ public HFileArchiveManager enableHFileBackup(byte[] table)
+ throws IOException {
+ try {
+ enable(this.zooKeeper, table);
+ return this;
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Disable backups of HFiles for the given table
+ * @param table name of the table to disable backups on
+ * @return this for chaining.
+ * @throws IOException
+ */
+ public HFileArchiveManager disableHFileBackup(byte[] table) throws IOException {
+ try {
+ disable(this.zooKeeper, table);
+ return this;
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Disable backups on all tables in the cluster
+ * @return this for chaining.
+ * @throws IOException if an unexpected ZooKeeper error occurs
+ */
+ public HFileArchiveManager disableHFileBackup() throws IOException {
+ LOG.debug("Disabling backups on all tables.");
+ try {
+ ZKUtil.deleteNodeRecursively(this.zooKeeper, this.zooKeeper.archiveHFileZNode);
+ return this;
+ } catch (KeeperException e) {
+ throw new IOException("Unexpected ZK exception!", e);
+ }
+ }
+
+ /**
+ * Get the list of regionservers that are currently archiving the given table
+ * @param table name of the table to check
+ * @return names of the regionservers currently archiving the table
+ * @throws IOException if an unexpected connection issue occurs
+ */
+ @SuppressWarnings("unchecked")
+ public List<String> serversArchiving(byte[] table) throws IOException {
+ try {
+ // build the table znode
+ String tableNode = getTableNode(zooKeeper, table);
+ List<String> regions = ZKUtil.listChildrenNoWatch(zooKeeper, tableNode);
+ return (List<String>) (regions == null ? Collections.emptyList() : regions);
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Best-effort enable of table backups. If a regionserver hosting the table
+ * is offline, it will pick up the archiving setting when it starts up.
+ *
+ * No attempt is made to make sure that backups are successfully created - it
+ * is inherently an asynchronous operation.
+ * @param zooKeeper watcher connection to zk cluster
+ * @param table table name on which to enable archiving
+ * @throws KeeperException
+ */
+ private void enable(ZooKeeperWatcher zooKeeper, byte[] table)
+ throws KeeperException {
+ LOG.debug("Ensuring archiving znode exists");
+ ZKUtil.createAndFailSilent(zooKeeper, zooKeeper.archiveHFileZNode);
+
+ // then add the table to the list of znodes to archive
+ String tableNode = getTableNode(zooKeeper, table);
+ LOG.debug("Creating: " + tableNode + ", data: []");
+ ZKUtil.createSetData(zooKeeper, tableNode, new byte[0]);
+ }
+
+ /**
+ * Disable all archiving of files for a given table
+ *
+ * Note: Asynchronous
+ * @param zooKeeper watcher for the ZK cluster
+ * @param table name of the table to disable
+ * @throws KeeperException if an unexpected ZK connection issue occurs
+ */
+ private void disable(ZooKeeperWatcher zooKeeper, byte[] table) throws KeeperException {
+ // ensure the latest state of the archive node is found
+ zooKeeper.sync(zooKeeper.archiveHFileZNode);
+
+ // if the top-level archive node is gone, then we are done
+ if (ZKUtil.checkExists(zooKeeper, zooKeeper.archiveHFileZNode) < 0) {
+ return;
+ }
+ // delete the table node from the archive - the removal will be noticed
+ // by the regionservers
+ String tableNode = getTableNode(zooKeeper, table);
+ // sync so the delete operates on the latest version of the table node
+ zooKeeper.sync(tableNode);
+
+ LOG.debug("Attempting to delete table node:" + tableNode);
+ ZKUtil.deleteNodeRecursively(zooKeeper, tableNode);
+ }
+
+ public void stop() {
+ if (!this.stopped) {
+ this.stopped = true;
+ LOG.debug("Stopping HFileArchiveManager...");
+ this.zooKeeper.close();
+ }
+ }
+
+ /**
+ * Check to see if the table is currently marked for archiving
+ * @param table name of the table to check
+ * @return true if the archive znode for that table exists,
+ * false if not
+ * @throws IOException if an unexpected zookeeper error occurs
+ */
+ public boolean isArchivingEnabled(byte[] table) throws IOException {
+ String tableNode = HFileArchiveUtil.getTableNode(zooKeeper, table);
+ try {
+ return ZKUtil.checkExists(zooKeeper, tableNode) >= 0;
+ } catch (KeeperException e) {
+ throw new IOException("Failed to get the table znode:" + Bytes.toString(table), e);
+ }
+ }
+}
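For context, a minimal client-side sketch of driving this feature through the HBaseAdmin methods added by this patch. It is not part of the patch; the table name, class name, and error handling are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class HFileArchiveExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    byte[] table = Bytes.toBytes("mytable"); // illustrative table name
    try {
      // request archiving; the async variant only creates the znode and
      // returns without waiting for regionservers to check in
      admin.enableHFileBackupAsync(table);
      if (admin.getArchivingEnabled(table)) {
        // HFiles removed by compactions or region cleanup for "mytable"
        // are now moved under the configured archive directory
      }
    } finally {
      // turn archiving back off; also asynchronous
      admin.disableHFileBackup(table);
      admin.close();
    }
  }
}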
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java
index 5804a84..e80b579 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java
@@ -65,7 +65,7 @@ public class Reference implements Writable {
* For split HStoreFiles, it specifies if the file covers the lower half or
* the upper half of the key range
*/
- static enum Range {
+ public static enum Range {
/** HStoreFile contains upper half of key range */
top,
/** HStoreFile contains lower half of key range */
@@ -93,7 +93,7 @@ public class Reference implements Writable {
* @param splitRow This is row we are splitting around.
* @param fr
*/
- Reference(final byte [] splitRow, final Range fr) {
+ public Reference(final byte[] splitRow, final Range fr) {
this.splitkey = splitRow == null? null: KeyValue.createFirstOnRow(splitRow).getKey();
this.region = fr;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index e751f65..85d7975 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
+import org.apache.hadoop.hbase.backup.HFileDisposer;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
-
/**
* A janitor for the catalog tables. Scans the .META. catalog
* table on a period looking for unused regions to garbage collect.
@@ -61,14 +60,16 @@ class CatalogJanitor extends Chore {
private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName());
private final Server server;
private final MasterServices services;
+ private final HFileArchiveMonitor hfileArchiveMonitor;
private boolean enabled = true;
- CatalogJanitor(final Server server, final MasterServices services) {
+ CatalogJanitor(final Server server, final MasterServices services, HFileArchiveMonitor monitor) {
super(server.getServerName() + "-CatalogJanitor",
server.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000),
server);
this.server = server;
this.services = services;
+ this.hfileArchiveMonitor = monitor;
}
@Override
@@ -214,7 +215,7 @@ class CatalogJanitor extends Chore {
if (hasNoReferences(a) && hasNoReferences(b)) {
LOG.debug("Deleting region " + parent.getRegionNameAsString() +
" because daughter splits no longer hold references");
- // wipe out daughter references from parent region
+ // wipe out daughter references from parent region in meta
removeDaughtersFromParent(parent);
// This latter regionOffline should not be necessary but is done for now
@@ -225,8 +226,7 @@ class CatalogJanitor extends Chore {
this.services.getAssignmentManager().regionOffline(parent);
}
FileSystem fs = this.services.getMasterFileSystem().getFileSystem();
- Path rootdir = this.services.getMasterFileSystem().getRootDir();
- HRegion.deleteRegion(fs, rootdir, parent);
+ HFileDisposer.disposeRegion(fs, hfileArchiveMonitor, parent);
MetaEditor.deleteRegion(this.server.getCatalogTracker(), parent);
result = true;
}
@@ -325,4 +325,12 @@ class CatalogJanitor extends Chore {
throws FileNotFoundException, IOException {
return this.services.getTableDescriptors().get(Bytes.toString(tableName));
}
+
+ /**
+ * Exposed for TESTING
+ * @return the current monitor for the files being managed
+ */
+ HFileArchiveMonitor getHfileArchiveMonitor() {
+ return hfileArchiveMonitor;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a3af820..4504d2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.backup.TableHFileArchiveTracker;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -258,6 +259,8 @@ Server {
*/
private ObjectName mxBean = null;
+ private TableHFileArchiveTracker tableHfileArchiveTracker;
+
/**
* Initializes the HMaster. The steps are as follows:
*
@@ -473,6 +476,9 @@ Server {
boolean wasUp = this.clusterStatusTracker.isClusterUp();
if (!wasUp) this.clusterStatusTracker.setClusterUp();
+ this.tableHfileArchiveTracker = TableHFileArchiveTracker.create(zooKeeper, this);
+ this.tableHfileArchiveTracker.start();
+
LOG.info("Server active/primary master; " + this.serverName +
", sessionid=0x" +
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
@@ -628,7 +634,7 @@ Server {
// been assigned.
status.setStatus("Starting balancer and catalog janitor");
this.balancerChore = getAndStartBalancerChore(this);
- this.catalogJanitorChore = new CatalogJanitor(this, this);
+ this.catalogJanitorChore = new CatalogJanitor(this, this, this.tableHfileArchiveTracker);
startCatalogJanitorChore();
registerMBean();
@@ -1965,6 +1971,22 @@ Server {
}
/**
+ * Exposed for TESTING only!
+ * @return the internal tracker for archiving hfiles
+ */
+ TableHFileArchiveTracker getHFileArchiveMonitor() {
+ return this.tableHfileArchiveTracker;
+ }
+
+ /**
+ * Exposed for TESTING!
+ * @return the current catalog janitor
+ */
+ CatalogJanitor getCatalogJanitor() {
+ return this.catalogJanitorChore;
+ }
+
+ /**
* Special method, only used by hbck.
*/
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 86f8e77..713f950 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.backup.HFileDisposer;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -948,16 +949,7 @@ public class HRegion implements HeapSize { // , Writable{
writestate.writesEnabled = false;
wasFlushing = writestate.flushing;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
- while (writestate.compacting > 0 || writestate.flushing) {
- LOG.debug("waiting for " + writestate.compacting + " compactions" +
- (writestate.flushing ? " & cache flush" : "") +
- " to complete for region " + this);
- try {
- writestate.wait();
- } catch (InterruptedException iex) {
- // continue
- }
- }
+ waitForFlushesAndCompactions();
}
// If we were not just flushing, is it worth doing a preflush...one
// that will clear out of the bulk of the memstore before we put up
@@ -1032,6 +1024,25 @@ public class HRegion implements HeapSize { // , Writable{
}
}
+ /**
+ * Wait for all current flushes and compactions of the region to complete.
+ *
+ * VISIBLE FOR TESTING
+ */
+ void waitForFlushesAndCompactions() {
+ synchronized (writestate) {
+ while (writestate.compacting > 0 || writestate.flushing) {
+ LOG.debug("waiting for " + writestate.compacting + " compactions"
+ + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
+ try {
+ writestate.wait();
+ } catch (InterruptedException iex) {
+ // continue
+ }
+ }
+ }
+ }
+
protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
final String threadNamePrefix) {
int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
@@ -4112,8 +4123,15 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Files for new region");
listPaths(fs, dstRegion.getRegionDir());
}
- deleteRegion(fs, a.getRegionDir());
- deleteRegion(fs, b.getRegionDir());
+
+ // delete out the 'A' region
+ HFileDisposer.disposeRegion(fs, a.getRegionServerServices(),
+ FSUtils.getRootDir(a.getBaseConf()),
+ a.getTableDir(), a.getRegionDir());
+ // delete out the 'B' region
+ HFileDisposer.disposeRegion(fs, b.getRegionServerServices(),
+ FSUtils.getRootDir(b.getBaseConf()),
+ b.getTableDir(), b.getRegionDir());
LOG.info("merge completed. New region is " + dstRegion);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 89e3d25..e2e19cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
+import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
+import org.apache.hadoop.hbase.backup.TableHFileArchiveTracker;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
@@ -422,6 +424,8 @@ public class HRegionServer implements ClientProtocol,
*/
private MovedRegionsCleaner movedRegionsCleaner;
+ /** Store file archiving management */
+ TableHFileArchiveTracker hfileArchiveTracker;
/**
* Starts a HRegionServer at the default location
@@ -643,8 +647,9 @@ public class HRegionServer implements ClientProtocol,
* Finally put up a catalog tracker.
* @throws IOException
* @throws InterruptedException
+ * @throws KeeperException
*/
- private void initializeZooKeeper() throws IOException, InterruptedException {
+ private void initializeZooKeeper() throws IOException, InterruptedException, KeeperException {
// Open connection to zookeeper and set primary watcher
this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
this.isa.getPort(), this);
@@ -666,6 +671,9 @@ public class HRegionServer implements ClientProtocol,
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
catalogTracker.start();
+
+ this.hfileArchiveTracker = TableHFileArchiveTracker.create(zooKeeper, this);
+ this.hfileArchiveTracker.start();
}
/**
@@ -3788,6 +3796,11 @@ public class HRegionServer implements ClientProtocol,
}
+ @Override
+ public HFileArchiveMonitor getHFileArchiveMonitor() {
+ return this.hfileArchiveTracker;
+ }
+
// This map will containsall the regions that we closed for a move.
// We add the time it was moved as we don't want to keep too old information
protected Map> movedRegions =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 6884d53..03df8bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -86,4 +87,10 @@ public interface RegionServerServices extends OnlineRegions {
* @return Return the FileSystem object used by the regionserver
*/
public FileSystem getFileSystem();
+
+ /**
+ * @return the manager that keeps track of which tables should be archived and
+ * where they should be archived
+ */
+ public HFileArchiveMonitor getHFileArchiveMonitor();
}
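A rough sketch of how a deletion path can consult the monitor exposed by this interface before removing a store file. The helper name, its signature, and the archive-path argument are assumptions for illustration; in the patch itself this logic is routed through HFileDisposer.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

public class ArchiveAwareDeleteSketch {
  // Hypothetical helper: archive instead of delete when the table is marked
  public static void removeStoreFile(RegionServerServices services, String tableName,
      FileSystem fs, Path storeFile, Path storeArchiveDir) throws IOException {
    HFileArchiveMonitor monitor = services.getHFileArchiveMonitor();
    if (monitor != null && monitor.keepHFiles(tableName)) {
      // the table is marked for archiving - move the file aside
      fs.mkdirs(storeArchiveDir);
      fs.rename(storeFile, new Path(storeArchiveDir, storeFile.getName()));
    } else {
      // archiving not enabled - delete as before
      fs.delete(storeFile, false);
    }
  }
}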
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 7157c04..011f762 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.backup.HFileDisposer;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -130,7 +131,7 @@ public class Store extends SchemaConfigured implements HeapSize {
private final int blockingStoreFileCount;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
- private final Object flushLock = new Object();
+ final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final boolean verifyBulkLoads;
@@ -1021,6 +1022,7 @@ public class Store extends SchemaConfigured implements HeapSize {
this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
// Move the compaction into place.
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
+ LOG.debug("Completing compaction by moving files into place.");
sf = completeCompaction(filesToCompact, writer);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf);
@@ -1610,10 +1612,13 @@ public class Store extends SchemaConfigured implements HeapSize {
// Tell observers that list of StoreFiles has changed.
notifyChangedReadersObservers();
- // Finally, delete old store files.
- for (StoreFile hsf: compactedFiles) {
- hsf.deleteReader();
- }
+
+ // let the archive util decide if we should archive or delete the files
+ LOG.debug("Removing store files after compaction...");
+ HFileDisposer.disposeStoreFiles(this.region.getRegionServerServices(), this.region,
+ this.conf, this.family.getName(), compactedFiles);
+
+
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files in " + this +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
new file mode 100644
index 0000000..c9c4ebe
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Helper class for all utilities related to archival/retrieval of HFiles
+ */
+public class HFileArchiveUtil {
+ public static final String DEFAULT_HFILE_ARCHIVE_DIRECTORY = ".archive";
+
+ private HFileArchiveUtil() {
+ // non-external instantiation - util class
+ }
+
+ /**
+ * Get the directory to archive a store directory
+ * @param conf {@link Configuration} to read for the archive directory name
+ * @param region parent region information under which the store currently
+ * lives
+ * @param family name of the family in the store
+ * @return {@link Path} to the directory to archive the given store or
+ * null if it should not be archived
+ */
+ public static Path getStoreArchivePath(Configuration conf, HRegion region, byte [] family){
+ return getStoreArchivePath(conf, region.getRegionInfo(), region.getTableDir(), family);
+ }
+
+ /**
+ * Get the directory to archive a store directory
+ * @param conf {@link Configuration} to read for the archive directory name
+ * @param region parent region information under which the store currently
+ * lives
+ * @param tabledir directory for the table under which the store currently
+ * lives
+ * @param family name of the family in the store
+ * @return {@link Path} to the directory to archive the given store or
+ * null if it should not be archived
+ */
+ public static Path getStoreArchivePath(Configuration conf, HRegionInfo region, Path tabledir,
+ byte[] family) {
+ Path tableArchiveDir = getTableArchivePath(conf, tabledir);
+ return Store.getStoreHomedir(tableArchiveDir,
+ HRegionInfo.encodeRegionName(region.getRegionName()), family);
+ }
+
+ /**
+ * Get the archive directory for a given region under the specified table
+ * @param conf {@link Configuration} to read the archive directory from
+ * @param tabledir the original table directory
+ * @param regiondir the path to the region directory
+ * @return {@link Path} to the directory to archive the given region, or
+ * null if it should not be archived
+ */
+ public static Path getRegionArchiveDir(Configuration conf, Path tabledir, Path regiondir) {
+ // get the archive directory for a table
+ Path archiveDir = getTableArchivePath(conf, tabledir);
+ // if the monitor isn't archiving that table, then we don't specify an
+ // archive directory
+ if (archiveDir == null) return null;
+
+ // then add on the region path under the archive
+ String encodedRegionName = regiondir.getName();
+ return HRegion.getRegionDir(archiveDir, encodedRegionName);
+ }
+
+ /**
+ * Get the path to the table archive directory based on the configured archive
+ * directory.
+ *
+ * Assumed that the table should already be archived.
+ * @param conf {@link Configuration} to read the archive property from
+ * @param tabledir directory of the table to be archived.
+ * @return {@link Path} to the archive directory for the table
+ */
+ public static Path getTableArchivePath(Configuration conf, Path tabledir) {
+ return getTableArchivePath(getConfiguredArchiveDir(conf), tabledir);
+ }
+
+ private static Path getTableArchivePath(String archivedir, Path tabledir) {
+ Path root = tabledir.getParent();
+ // now build the archive directory path
+ // first the top-level archive directory
+ // generally "/hbase/.archive/[table]"
+ return new Path(new Path(root, archivedir), tabledir.getName());
+ }
+
+ /**
+ * Get the archive directory as per the configuration
+ * @param conf {@link Configuration} to read the archive directory from (can
+ * be null, in which case you get the default value).
+ * @return the configured archived directory or the default specified by
+ * {@value HFileArchiveUtil#DEFAULT_HFILE_ARCHIVE_DIRECTORY}
+ */
+ public static String getConfiguredArchiveDir(Configuration conf) {
+ return conf == null ? HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY : conf.get(
+ HConstants.HFILE_ARCHIVE_DIRECTORY, HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY);
+ }
+
+ /**
+ * Get the zookeeper node associated with archiving the given table
+ * @param zkw watcher for the zk cluster
+ * @param table name of the table to check
+ * @return znode for the table's archive status
+ */
+ public static String getTableNode(ZooKeeperWatcher zkw, byte[] table) {
+ return ZKUtil.joinZNode(zkw.archiveHFileZNode, Bytes.toString(table));
+ }
+
+ /**
+ * Get the zookeeper node associated with archiving the given table
+ * @param zkw watcher for the zk cluster
+ * @param table name of the table to check
+ * @return znode for the table's archive status
+ */
+ public static String getTableNode(ZooKeeperWatcher zkw, String table) {
+ return ZKUtil.joinZNode(zkw.archiveHFileZNode, table);
+ }
+}
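A small sketch of how the path helpers above compose under the default configuration; the table and region names are placeholders, not values from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public class ArchivePathSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    Path tabledir = new Path("/hbase", "mytable");     // placeholder table dir
    Path regiondir = new Path(tabledir, "1028785192"); // placeholder encoded region name

    // "/hbase/.archive/mytable" with the default archive directory name
    Path tableArchive = HFileArchiveUtil.getTableArchivePath(conf, tabledir);
    // region archive mirrors the live layout, one level under the archive dir
    Path regionArchive = HFileArchiveUtil.getRegionArchiveDir(conf, tabledir, regiondir);
    System.out.println(tableArchive + " / " + regionArchive);
  }
}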
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 33bc1d0..17383c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -102,6 +102,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public String clusterIdZNode;
// znode used for log splitting work assignment
public String splitLogZNode;
+ // znode to track archiving hfiles
+ public String archiveHFileZNode;
// Certain ZooKeeper nodes need to be world-readable
public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
@@ -212,6 +214,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
conf.get("zookeeper.znode.clusterId", "hbaseid"));
splitLogZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
+ archiveHFileZNode = ZKUtil
+ .joinZNode(baseZNode, conf.get("zookeeper.znode.hfile.archive",
+ HConstants.HFILE_ARCHIVE_ZNODE_PARENT));
}
/**
diff --git a/hbase-server/src/main/resources/hbase-default.xml b/hbase-server/src/main/resources/hbase-default.xml
index 370b06f..325d21b 100644
--- a/hbase-server/src/main/resources/hbase-default.xml
+++ b/hbase-server/src/main/resources/hbase-default.xml
@@ -854,7 +854,6 @@
    files when hbase.data.umask.enable is true
    </description>
  </property>
-
  <property>
    <name>hbase.metrics.showTableName</name>
    <value>true</value>
@@ -864,7 +863,6 @@
    In both cases, the aggregated metric M across tables and cfs will be reported.
    </description>
  </property>
-
  <property>
    <name>hbase.metrics.exposeOperationTimes</name>
    <value>true</value>
@@ -873,5 +871,12 @@
    have their times exposed through Hadoop metrics per CF and per region.
    </description>
  </property>
-
+  <property>
+    <name>hbase.table.archive.directory</name>
+    <value>.archive</value>
+    <description>Per-table directory name under which to backup files for a
+    table. Files are moved to the same directories as they would be under the
+    table directory, but instead are just one level lower (under
+    table/.archive/... rather than table/...). Currently only applies to HFiles.</description>
+  </property>
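The same override can be set programmatically; a sketch assuming only the constants and helpers introduced by this patch, with an illustrative directory name.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public class ArchiveDirConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // use a non-default archive directory name (illustrative value)
    conf.set(HConstants.HFILE_ARCHIVE_DIRECTORY, ".archive-backup");
    // HFileArchiveUtil reads this key when building archive paths
    System.out.println(HFileArchiveUtil.getConfiguredArchiveDir(conf)); // .archive-backup
  }
}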
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index be932d7..24ff7e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -950,7 +950,10 @@ public class HBaseTestingUtility {
* @param tableName existing table
*/
public void deleteTable(byte[] tableName) throws IOException {
- getHBaseAdmin().disableTable(tableName);
+ try {
+ getHBaseAdmin().disableTable(tableName);
+ } catch (TableNotEnabledException e) {
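+ // ignore - the table may already be disabled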
+ }
getHBaseAdmin().deleteTable(tableName);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/SimpleHFileArchiveTableMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/SimpleHFileArchiveTableMonitor.java
new file mode 100644
index 0000000..7ff2652
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/SimpleHFileArchiveTableMonitor.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.backup.HFileArchiveTableMonitor;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Simple monitor that does not do anything on calls to
+ * {@link #registerTable(String)}.
+ *
+ * Should be used for testing only
+ */
+public class SimpleHFileArchiveTableMonitor extends HFileArchiveTableMonitor {
+
+ public SimpleHFileArchiveTableMonitor(Server parent, ZooKeeperWatcher zkw) {
+ super(parent, zkw);
+ }
+
+ @Override
+ public void registerTable(String table) {
+ // NOOP - for testing
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchivingCleanup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchivingCleanup.java
new file mode 100644
index 0000000..5f96313
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchivingCleanup.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.backup.HFileArchiveCleanup;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestHFileArchivingCleanup {
+
+ private static final Log LOG = LogFactory.getLog(TestHFileArchivingCleanup.class);
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final String STRING_TABLE_NAME = "test";
+ private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
+ private static final byte[] TEST_FAM = Bytes.toBytes("fam");
+
+ /**
+ * Setup the config for the cluster
+ */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster();
+ }
+
+ private static void setupConf(Configuration conf) {
+ // disable the ui
+ conf.setInt("hbase.regionserver.info.port", -1);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ UTIL.createTable(TABLE_NAME, TEST_FAM);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ UTIL.deleteTable(TABLE_NAME);
+ // and cleanup the archive directory
+ try {
+ UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
+ } catch (IOException e) {
+ LOG.warn("Failure to delete archive directory", e);
+ }
+ // make sure that backups are off for all tables
+ UTIL.getHBaseAdmin().disableHFileBackup();
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ // NOOP;
+ }
+ }
+
+ /**
+ * Test that we cleanly remove archives using the utility
+ * @throws Exception
+ */
+ @Test
+ public void testMasterDeletesFailedArchive() throws Exception {
+ final HBaseAdmin admin = UTIL.getHBaseAdmin();
+
+ // get the current store files for the region
+ List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
+ // make sure we only have 1 region serving this table
+ assertEquals(1, servingRegions.size());
+ final HRegion region = servingRegions.get(0);
+
+ // turn on archiving
+ admin.enableHFileBackup(TABLE_NAME);
+ LOG.debug("----Starting test of cleanup");
+ // so lets put some files in the archive that are newer than the start
+ FileSystem fs = UTIL.getTestFileSystem();
+ Path archiveDir = HFileArchiveTestingUtil.getTableArchivePath(UTIL.getConfiguration(), region);
+ // write a tmp file to the archive dir
+ Path tmpFile = new Path(archiveDir, "toDelete");
+ FSDataOutputStream out = fs.create(tmpFile);
+ out.write(1);
+ out.close();
+ LOG.debug("Created toDelete");
+
+ // now run the cleanup util
+ HFileArchiveCleanup.setConfiguration(UTIL.getConfiguration());
+ HFileArchiveCleanup.main(new String[0]);
+ // make sure the fake archived file has been cleaned up
+ assertFalse(fs.exists(tmpFile));
+
+ // now do the same create again, but make sure it still exists since we have
+ // an earlier end time
+
+ // write a tmp file to the archive dir
+ out = fs.create(tmpFile);
+ out.write(1);
+ out.close();
+ long end = fs.getFileStatus(tmpFile).getModificationTime() - 1;
+ LOG.debug("re-created toDelete");
+ HFileArchiveCleanup.main(new String[] { "-e", Long.toString(end) });
+ assertTrue(fs.exists(tmpFile));
+
+ LOG.debug("Still not deleting the file");
+ // now bump the start time to match the end time - still should be there
+ HFileArchiveCleanup.main(new String[] { "-s", Long.toString(end), "-e", Long.toString(end) });
+ assertTrue(fs.exists(tmpFile));
+
+ //now check that we can delete with start == end, but only deleting the right file
+ // first create another file after the tmp file
+ Path otherTmp = new Path(tmpFile.getParent(), tmpFile.getName() + "-1");
+ out = fs.create(otherTmp);
+ out.write(1);
+ out.close();
+
+ FileStatus status = fs.getFileStatus(tmpFile);
+ long time = status.getModificationTime();
+ HFileArchiveCleanup.main(new String[] { "-e", Long.toString(time), "-s", Long.toString(time) });
+ assertFalse(fs.exists(tmpFile));
+ assertTrue(fs.exists(otherTmp));
+
+ // then repoint tmpFile at the other file so we don't need to recreate it
+ tmpFile = otherTmp;
+
+ // now move the start up to include the file, which should delete it
+ LOG.debug("Now should delete the file");
+ HFileArchiveCleanup.main(new String[] { "-s",
+ Long.toString(fs.getFileStatus(tmpFile).getModificationTime()) });
+ assertFalse(fs.exists(tmpFile));
+
+ // now create the files in multiple table directories, and check that we
+ // only delete the one in the specified directory
+ out = fs.create(tmpFile);
+ out.write(1);
+ out.close();
+
+ // create the table archive and put a file in there
+ Path tableArchive = new Path(archiveDir, STRING_TABLE_NAME);
+ fs.mkdirs(tableArchive);
+ Path tableFile = new Path(tableArchive, "table");
+ out = fs.create(tableFile);
+ out.write(1);
+ out.close();
+
+ // now delete just that table
+ LOG.debug("Just cleaning up table: table, files:" + tableFile);
+ HFileArchiveCleanup
+ .main(new String[] { "-s",
+ Long.toString(fs.getFileStatus(tableFile).getModificationTime()), "-t",
+ STRING_TABLE_NAME });
+ assertTrue(fs.exists(tmpFile));
+ assertFalse(fs.exists(tableFile));
+ }
+}
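As exercised by the test above, the cleanup utility can also be driven directly. A sketch using only the -s/-e/-t options the test relies on; the table name and timestamps are illustrative.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.HFileArchiveCleanup;

public class ArchiveCleanupSketch {
  public static void main(String[] args) throws Exception {
    HFileArchiveCleanup.setConfiguration(HBaseConfiguration.create());
    // remove archived files for table "mytable" created up to "now"
    HFileArchiveCleanup.main(new String[] { "-t", "mytable",
        "-e", Long.toString(System.currentTimeMillis()) });
  }
}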
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRegionDisposer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRegionDisposer.java
new file mode 100644
index 0000000..a44c5fe
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRegionDisposer.java
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Test that the {@link HFileDisposer} correctly removes all parts of a
+ * region when cleaning it up
+ */
+@Category(MediumTests.class)
+public class TestRegionDisposer {
+
+ private static final String STRING_TABLE_NAME = "test_table";
+
+ private static final Log LOG = LogFactory.getLog(TestRegionDisposer.class);
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
+ private static final byte[] TEST_FAM = Bytes.toBytes("fam");
+
+ /**
+ * Setup the config for the cluster
+ */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ }
+
+ private static void setupConf(Configuration conf) {
+ // disable the ui
+ conf.setInt("hbase.regionserver.info.port", -1);
+ // drop the memstore size so we get flushes
+ conf.setInt("hbase.hregion.memstore.flush.size", 25000);
+ }
+
+ @Before
+ public void startMinicluster() throws Exception {
+ UTIL.startMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // cleanup the cluster if its up still
+ if (UTIL.getHBaseAdmin().tableExists(STRING_TABLE_NAME)) {
+
+ UTIL.deleteTable(TABLE_NAME);
+ }
+ // and cleanup the archive directory
+ try {
+ UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
+ } catch (IOException e) {
+ LOG.warn("Failure to delete archive directory", e);
+ }
+ // make sure that backups are off for all tables
+ UTIL.getHBaseAdmin().disableHFileBackup();
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ // NOOP;
+ }
+ }
+
+ @Test
+ public void testRemovesRegionDirOnArchive() throws Exception {
+ UTIL.createTable(TABLE_NAME, TEST_FAM);
+ final HBaseAdmin admin = UTIL.getHBaseAdmin();
+
+ // get the current store files for the region
+ List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
+ // make sure we only have 1 region serving this table
+ assertEquals(1, servingRegions.size());
+ HRegion region = servingRegions.get(0);
+
+ // turn on archiving
+ admin.enableHFileBackup(TABLE_NAME);
+
+ // and load the table
+ UTIL.loadRegion(region, TEST_FAM);
+
+ // shutdown the table so we can manipulate the files
+ admin.disableTable(STRING_TABLE_NAME);
+
+ FileSystem fs = UTIL.getTestFileSystem();
+
+ // now attempt to dispose of the region
+ Path regionDir = HRegion.getRegionDir(region.getTableDir().getParent(), region.getRegionInfo());
+
+ HFileArchiveMonitor monitor = Mockito.mock(HFileArchiveMonitor.class);
+ Mockito.when(monitor.keepHFiles(STRING_TABLE_NAME)).thenReturn(true);
+ HFileDisposer.disposeRegion(fs, monitor, region.getRegionInfo());
+
+ // check for the existence of the archive directory and some files in it
+ Path archiveDir = HFileArchiveTestingUtil.getRegionArchiveDir(UTIL.getConfiguration(), region);
+ assertTrue(fs.exists(archiveDir));
+
+ // check to make sure the store directory was copied
+ FileStatus[] stores = fs.listStatus(archiveDir);
+ assertTrue(stores.length == 1);
+
+ // make sure we archived the store files
+ FileStatus[] storeFiles = fs.listStatus(stores[0].getPath());
+ assertTrue(storeFiles.length > 0);
+
+ // then ensure the region's directory isn't present
+ assertFalse(fs.exists(regionDir));
+
+ //recreate the table
+ admin.deleteTable(STRING_TABLE_NAME);
+ UTIL.createTable(TABLE_NAME, TEST_FAM);
+
+ // now copy back in the region
+ // fs.copyFromLocalFile(archive, regionDir);
+
+ // and dispose of the region without archiving
+ Mockito.when(monitor.keepHFiles(STRING_TABLE_NAME)).thenReturn(false);
+ HFileDisposer.disposeRegion(fs, monitor, region.getRegionInfo());
+
+ assertFalse(fs.exists(regionDir));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 04ed1a3..15b6a39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
@@ -508,4 +509,10 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public HFileArchiveMonitor getHFileArchiveMonitor() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 5303608..285cdda 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -19,10 +19,13 @@
*/
package org.apache.hadoop.hbase.master;
+import static org.apache.hadoop.hbase.util.HFileArchiveTestingUtil.assertArchiveEqualToOriginal;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -33,6 +36,8 @@ import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -41,14 +46,20 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
+import org.apache.hadoop.hbase.backup.HFileArchiveTableMonitor;
+import org.apache.hadoop.hbase.backup.TableHFileArchiveTracker;
+import org.apache.hadoop.hbase.backup.SimpleHFileArchiveTableMonitor;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
@@ -60,10 +71,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
import org.mockito.Mockito;
import com.google.protobuf.RpcController;
@@ -81,15 +94,15 @@ public class TestCatalogJanitor {
private final CatalogTracker ct;
MockServer(final HBaseTestingUtility htu)
- throws NotAllMetaRegionsOnlineException, IOException, InterruptedException {
+ throws NotAllMetaRegionsOnlineException, IOException, InterruptedException {
this.c = htu.getConfiguration();
ClientProtocol ri = Mockito.mock(ClientProtocol.class);
MutateResponse.Builder builder = MutateResponse.newBuilder();
builder.setProcessed(true);
try {
Mockito.when(ri.mutate(
- (RpcController)Mockito.any(), (MutateRequest)Mockito.any())).
- thenReturn(builder.build());
+ (RpcController)Matchers.any(), (MutateRequest)Matchers.any())).
+ thenReturn(builder.build());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -135,7 +148,7 @@ public class TestCatalogJanitor {
public void abort(String why, Throwable e) {
//no-op
}
-
+
@Override
public boolean isAborted() {
return false;
@@ -166,6 +179,10 @@ public class TestCatalogJanitor {
MockMasterServices(final Server server) throws IOException {
this.mfs = new MasterFileSystem(server, this, null);
+ // this is funky, but ensures that the filesystem gets the right root
+ // directory when directly accessed
+ this.mfs.getFileSystem().getConf()
+ .set(HConstants.HBASE_DIR, mfs.getRootDir().toString());
this.asm = Mockito.mock(AssignmentManager.class);
}
@@ -224,7 +241,7 @@ public class TestCatalogJanitor {
public void abort(String why, Throwable e) {
//no-op
}
-
+
@Override
public boolean isAborted() {
return false;
@@ -250,29 +267,29 @@ public class TestCatalogJanitor {
// TODO Auto-generated method stub
return null;
}
-
+
@Override
public Map<String, HTableDescriptor> getAll() throws IOException {
// TODO Auto-generated method stub
return null;
}
-
+
@Override
public HTableDescriptor get(byte[] tablename)
- throws FileNotFoundException, IOException {
+ throws FileNotFoundException, IOException {
return get(Bytes.toString(tablename));
}
-
+
@Override
public HTableDescriptor get(String tablename)
- throws FileNotFoundException, IOException {
+ throws FileNotFoundException, IOException {
return createHTableDescriptor();
}
-
+
@Override
public void add(HTableDescriptor htd) throws IOException {
// TODO Auto-generated method stub
-
+
}
};
}
@@ -317,34 +334,37 @@ public class TestCatalogJanitor {
Server server = new MockServer(htu);
try {
MasterServices services = new MockMasterServices(server);
- CatalogJanitor janitor = new CatalogJanitor(server, services);
+ ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class);
+ HFileArchiveMonitor monitor = TableHFileArchiveTracker.create(watcher,
+ new SimpleHFileArchiveTableMonitor(services, watcher));
+ CatalogJanitor janitor = new CatalogJanitor(server, services, monitor);
// Create regions.
HTableDescriptor htd = new HTableDescriptor("table");
htd.addFamily(new HColumnDescriptor("f"));
HRegionInfo parent =
- new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"),
+ new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"),
Bytes.toBytes("eee"));
HRegionInfo splita =
- new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"),
+ new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"),
Bytes.toBytes("ccc"));
HRegionInfo splitb =
- new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"),
+ new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"),
Bytes.toBytes("eee"));
// Test that when both daughter regions are in place, that we do not
// remove the parent.
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
- HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita)));
+ HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita)));
kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
- HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb)));
+ HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb)));
Result r = new Result(kvs);
// Add a reference under splitA directory so we don't clear out the parent.
Path rootdir = services.getMasterFileSystem().getRootDir();
Path tabledir =
- HTableDescriptor.getTableDir(rootdir, htd.getName());
+ HTableDescriptor.getTableDir(rootdir, htd.getName());
Path storedir = Store.getStoreHomedir(tabledir, splita.getEncodedName(),
- htd.getColumnFamilies()[0].getName());
- Reference ref = Reference.createTopReference(Bytes.toBytes("ccc"));
+ htd.getColumnFamilies()[0].getName());
+ Reference ref = new Reference(Bytes.toBytes("ccc"), Reference.Range.top);
long now = System.currentTimeMillis();
// Reference name has this format: StoreFile#REF_NAME_PARSER
Path p = new Path(storedir, Long.toString(now) + "." + parent.getEncodedName());
@@ -367,7 +387,7 @@ public class TestCatalogJanitor {
*/
@Test
public void testParentCleanedEvenIfDaughterGoneFirst()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
parentWithSpecifiedEndKeyCleanedEvenIfDaughterGoneFirst(
"testParentCleanedEvenIfDaughterGoneFirst", Bytes.toBytes("eee"));
}
@@ -379,11 +399,215 @@ public class TestCatalogJanitor {
*/
@Test
public void testLastParentCleanedEvenIfDaughterGoneFirst()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
parentWithSpecifiedEndKeyCleanedEvenIfDaughterGoneFirst(
"testLastParentCleanedEvenIfDaughterGoneFirst", new byte[0]);
}
+ @Test
+ public void testArchiveOldRegion() throws Exception {
+ String table = "table";
+ HBaseTestingUtility htu = new HBaseTestingUtility();
+ setRootDirAndCleanIt(htu, "testCleanParent");
+ Server server = new MockServer(htu);
+ try {
+ MasterServices services = new MockMasterServices(server);
+ // add the test table as the one to be archived
+ ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class);
+ SimpleHFileArchiveTableMonitor monitor = new SimpleHFileArchiveTableMonitor(services, watcher);
+ TableHFileArchiveTracker tracker = TableHFileArchiveTracker.create(watcher, monitor);
+
+ // create the janitor
+ CatalogJanitor janitor = new CatalogJanitor(server, services, tracker);
+
+ // Create regions.
+ HTableDescriptor htd = new HTableDescriptor(table);
+ htd.addFamily(new HColumnDescriptor("f"));
+ HRegionInfo parent = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"),
+ Bytes.toBytes("eee"));
+ HRegionInfo splita = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"),
+ Bytes.toBytes("ccc"));
+ HRegionInfo splitb = new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"),
+ Bytes.toBytes("eee"));
+ // Set up the daughter entries for the parent in META; with no references
+ // left in the daughters, the parent is cleanable.
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
+ HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita)));
+ kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
+ HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb)));
+ Result r = new Result(kvs);
+
+ Path rootdir = services.getMasterFileSystem().getRootDir();
+ Path tabledir = HTableDescriptor.getTableDir(rootdir, htd.getName());
+ Path storedir = Store.getStoreHomedir(tabledir, parent.getEncodedName(),
+ htd.getColumnFamilies()[0].getName());
+
+ // first do a delete without archiving
+ addMockStoreFiles(2, services, storedir);
+ assertTrue(janitor.cleanParent(parent, r));
+
+ // and make sure that no files are archived
+ FileSystem fs = services.getMasterFileSystem().getFileSystem();
+ Path storeArchive = HFileArchiveUtil.getStoreArchivePath(services.getConfiguration(), parent,
+ tabledir, htd.getColumnFamilies()[0].getName());
+ assertEquals(0, fs.listStatus(storeArchive).length);
+
+ // enable archiving, make sure that files get archived
+ monitor.addTable(table);
+ addMockStoreFiles(2, services, storedir);
+ // get the current store files for comparison
+ FileStatus[] storeFiles = fs.listStatus(storedir);
+
+ // do the cleaning of the parent
+ assertTrue(janitor.cleanParent(parent, r));
+
+ // and now check to make sure that the files have actually been archived
+ FileStatus[] archivedStoreFiles = fs.listStatus(storeArchive);
+ assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs);
+ } finally {
+ server.stop("shutdown");
+ }
+ }
+
+ /**
+ * Test that archiving a store file with the same name as one already in the
+ * archive causes the previously archived file to be moved to a timestamped
+ * backup copy.
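+ * For example (hypothetical names): if {@code 12345} is already archived and
+ * another {@code 12345} is archived later, the older copy is expected to end
+ * up with a '.'-separated suffix (e.g. {@code 12345.<timestamp>}); the check
+ * in HFileArchiveTestingUtil only relies on that suffix convention.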
+ */
+ @Test
+ public void testDuplicateHFileResolution() throws Exception {
+ String table = "table";
+ HBaseTestingUtility htu = new HBaseTestingUtility();
+ setRootDirAndCleanIt(htu, "testCleanParent");
+ Server server = new MockServer(htu);
+ try {
+ MasterServices services = new MockMasterServices(server);
+ // add the test table as the one to be archived
+ ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class);
+ TableHFileArchiveTracker manager = TableHFileArchiveTracker.create(watcher,
+ new SimpleHFileArchiveTableMonitor(services, watcher));
+ HFileArchiveTableMonitor tracker = manager.getMonitor();
+ tracker.addTable(table);
+
+ // create the janitor
+ CatalogJanitor janitor = new CatalogJanitor(server, services, manager);
+
+ // Create regions.
+ HTableDescriptor htd = new HTableDescriptor(table);
+ htd.addFamily(new HColumnDescriptor("f"));
+ HRegionInfo parent = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"),
+ Bytes.toBytes("eee"));
+ HRegionInfo splita = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"),
+ Bytes.toBytes("ccc"));
+ HRegionInfo splitb = new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"),
+ Bytes.toBytes("eee"));
+ // Set up the daughter entries for the parent in META; with no references
+ // left in the daughters, the parent is cleanable.
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
+ HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita)));
+ kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
+ HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb)));
+ Result r = new Result(kvs);
+
+ Path rootdir = services.getMasterFileSystem().getRootDir();
+ Path tabledir = HTableDescriptor.getTableDir(rootdir, htd.getName());
+ Path storedir = Store.getStoreHomedir(tabledir, parent.getEncodedName(),
+ htd.getColumnFamilies()[0].getName());
+
+ FileSystem fs = services.getMasterFileSystem().getFileSystem();
+ Path storeArchive = HFileArchiveUtil.getStoreArchivePath(services.getConfiguration(), parent,
+ tabledir, htd.getColumnFamilies()[0].getName());
+
+ // enable archiving, make sure that files get archived
+ tracker.addTable(table);
+ addMockStoreFiles(2, services, storedir);
+ // get the current store files for comparison
+ FileStatus[] storeFiles = fs.listStatus(storedir);
+
+ // do the cleaning of the parent
+ assertTrue(janitor.cleanParent(parent, r));
+
+ // and now check to make sure that the files have actually been archived
+ FileStatus[] archivedStoreFiles = fs.listStatus(storeArchive);
+ assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs);
+
+ // now add store files with the same names as before to check backup
+ // enable archiving, make sure that files get archived
+ tracker.addTable(table);
+ addMockStoreFiles(2, services, storedir);
+
+ // do the cleaning of the parent
+ assertTrue(janitor.cleanParent(parent, r));
+
+ // and now check to make sure that the files have actually been archived
+ archivedStoreFiles = fs.listStatus(storeArchive);
+ assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs, true);
+ } finally {
+ server.stop("shutdown");
+ }
+ }
+
+ @Category(MediumTests.class)
+ @Test
+ public void testCatalogJanitorMonitoredInArchive() throws Exception {
+ HBaseTestingUtility util = new HBaseTestingUtility();
+ try {
+ // setup the minicluster
+ util.startMiniCluster();
+ byte[] TABLE_NAME = Bytes.toBytes("TABLE");
+ HBaseAdmin admin = util.getHBaseAdmin();
+ util.createTable(TABLE_NAME, Bytes.toBytes("cf"));
+ assertFalse(admin.getArchivingEnabled(TABLE_NAME));
+
+ // make sure a simple enable works
+ admin.enableHFileBackup(TABLE_NAME);
+ assertTrue(admin.getArchivingEnabled(TABLE_NAME));
+ admin.disableHFileBackup();
+ assertFalse(admin.getArchivingEnabled(TABLE_NAME));
+
+ // set the table tracker to fail on archiving
+ CatalogJanitor catalog = util.getMiniHBaseCluster().getMaster().getCatalogJanitor();
+ TableHFileArchiveTracker tracker = (TableHFileArchiveTracker) catalog.getHfileArchiveMonitor();
+
+ // keep the original around
+ HFileArchiveTableMonitor orig = tracker.getMonitor();
+
+ // set a new monitor that we know will not register archiving
+ tracker.setTableMonitor(new SimpleHFileArchiveTableMonitor(util.getMiniHBaseCluster().getMaster(),
+ util.getZooKeeperWatcher()));
+
+ try {
+ // try turning on archiving, but it should fail
+ admin.enableHFileBackup(TABLE_NAME);
+ fail("Shouldn't have been able to finish archiving with the catalog tracker not joining");
+ } catch (IOException e) {
+ // reset the tracker, if we don't get an exception, then everything
+ // breaks
+ tracker.setTableMonitor(orig);
+ }
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ private void addMockStoreFiles(int count, MasterServices services,
+ Path storedir) throws IOException {
+ // get the existing store files
+ FileSystem fs = services.getMasterFileSystem().getFileSystem();
+ fs.mkdirs(storedir);
+ // create the store files in the parent
+ for (int i = 0; i < count; i++) {
+ Path storeFile = new Path(storedir, "_store" + i);
+ FSDataOutputStream dos = fs.create(storeFile, true);
+ dos.writeBytes("Some data: " + i);
+ dos.close();
+ }
+ // make sure the mock store files are there
+ FileStatus[] storeFiles = fs.listStatus(storedir);
+ assertEquals(count, storeFiles.length);
+ }
+
/**
* Make sure parent with specified end key gets cleaned up even if daughter is cleaned up before it.
*
@@ -393,13 +617,17 @@ public class TestCatalogJanitor {
* @throws InterruptedException
*/
private void parentWithSpecifiedEndKeyCleanedEvenIfDaughterGoneFirst(
- final String rootDir, final byte[] lastEndKey)
- throws IOException, InterruptedException {
+ final String rootDir, final byte[] lastEndKey)
+ throws IOException, InterruptedException {
HBaseTestingUtility htu = new HBaseTestingUtility();
setRootDirAndCleanIt(htu, rootDir);
Server server = new MockServer(htu);
MasterServices services = new MockMasterServices(server);
- CatalogJanitor janitor = new CatalogJanitor(server, services);
+ ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class);
+ TableHFileArchiveTracker monitor = TableHFileArchiveTracker.create(watcher,
+ new SimpleHFileArchiveTableMonitor(
+ services, watcher));
+ CatalogJanitor janitor = new CatalogJanitor(server, services, monitor);
final HTableDescriptor htd = createHTableDescriptor();
// Create regions: aaa->{lastEndKey}, aaa->ccc, aaa->bbb, bbb->ccc, etc.
@@ -429,12 +657,12 @@ public class TestCatalogJanitor {
HRegionInfo splitba = new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"),
Bytes.toBytes("ddd"));
HRegionInfo splitbb = new HRegionInfo(htd.getName(), Bytes.toBytes("ddd"),
- lastEndKey);
+ lastEndKey);
// First test that our Comparator works right up in CatalogJanitor.
// Just for kicks.
SortedMap<HRegionInfo, Result> regions =
- new TreeMap<HRegionInfo, Result>(new CatalogJanitor.SplitParentFirstComparator());
+ new TreeMap<HRegionInfo, Result>(new CatalogJanitor.SplitParentFirstComparator());
// Now make sure that this regions map sorts as we expect it to.
regions.put(parent, createResult(parent, splita, splitb));
regions.put(splitb, createResult(splitb, splitba, splitbb));
@@ -455,7 +683,7 @@ public class TestCatalogJanitor {
// Now play around with the cleanParent function. Create a ref from splita
// up to the parent.
Path splitaRef =
- createReferences(services, htd, parent, splita, Bytes.toBytes("ccc"), false);
+ createReferences(services, htd, parent, splita, Bytes.toBytes("ccc"), false);
// Make sure actual super parent sticks around because splita has a ref.
assertFalse(janitor.cleanParent(parent, regions.get(parent)));
@@ -471,9 +699,9 @@ public class TestCatalogJanitor {
assertTrue(fs.delete(splitaRef, true));
// Create the refs from daughters of splita.
Path splitaaRef =
- createReferences(services, htd, splita, splitaa, Bytes.toBytes("bbb"), false);
+ createReferences(services, htd, splita, splitaa, Bytes.toBytes("bbb"), false);
Path splitabRef =
- createReferences(services, htd, splita, splitab, Bytes.toBytes("bbb"), true);
+ createReferences(services, htd, splita, splitab, Bytes.toBytes("bbb"), true);
// Test splita. It should stick around because references from splitab, etc.
assertFalse(janitor.cleanParent(splita, regions.get(splita)));
@@ -492,10 +720,12 @@ public class TestCatalogJanitor {
private String setRootDirAndCleanIt(final HBaseTestingUtility htu,
final String subdir)
- throws IOException {
+ throws IOException {
Path testdir = htu.getDataTestDir(subdir);
FileSystem fs = FileSystem.get(htu.getConfiguration());
- if (fs.exists(testdir)) assertTrue(fs.delete(testdir, true));
+ if (fs.exists(testdir)) {
+ assertTrue(fs.delete(testdir, true));
+ }
htu.getConfiguration().set(HConstants.HBASE_DIR, testdir.toString());
return htu.getConfiguration().get(HConstants.HBASE_DIR);
}
@@ -513,13 +743,13 @@ public class TestCatalogJanitor {
private Path createReferences(final MasterServices services,
final HTableDescriptor htd, final HRegionInfo parent,
final HRegionInfo daughter, final byte [] midkey, final boolean top)
- throws IOException {
+ throws IOException {
Path rootdir = services.getMasterFileSystem().getRootDir();
Path tabledir = HTableDescriptor.getTableDir(rootdir, parent.getTableName());
Path storedir = Store.getStoreHomedir(tabledir, daughter.getEncodedName(),
htd.getColumnFamilies()[0].getName());
- Reference ref =
- top? Reference.createTopReference(midkey): Reference.createBottomReference(midkey);
+ Reference ref = new Reference(midkey,
+ top? Reference.Range.top: Reference.Range.bottom);
long now = System.currentTimeMillis();
// Reference name has this format: StoreFile#REF_NAME_PARSER
Path p = new Path(storedir, Long.toString(now) + "." + parent.getEncodedName());
@@ -530,7 +760,7 @@ public class TestCatalogJanitor {
private Result createResult(final HRegionInfo parent, final HRegionInfo a,
final HRegionInfo b)
- throws IOException {
+ throws IOException {
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY,
HConstants.SPLITA_QUALIFIER, Writables.getBytes(a)));
@@ -547,6 +777,6 @@ public class TestCatalogJanitor {
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
- new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+ new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionHFileArchiving.java
new file mode 100644
index 0000000..b1bdf02
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionHFileArchiving.java
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.util.HFileArchiveTestingUtil.assertArchiveEqualToOriginal;
+import static org.apache.hadoop.hbase.util.HFileArchiveTestingUtil.compareArchiveToOriginal;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
+import org.apache.hadoop.hbase.backup.HFileArchiveTableMonitor;
+import org.apache.hadoop.hbase.backup.HFileDisposer;
+import org.apache.hadoop.hbase.backup.TableHFileArchiveTracker;
+import org.apache.hadoop.hbase.backup.SimpleHFileArchiveTableMonitor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * Spin up a small cluster and check that a region properly archives its hfiles
+ * when enabled.
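+ * <p>
+ * Rough sketch of the flow these tests drive (API names as introduced by this
+ * patch, not a general usage guide):
+ *
+ * <pre>
+ * HBaseAdmin admin = UTIL.getHBaseAdmin();
+ * admin.enableHFileBackup(TABLE_NAME); // turn on per-table archiving
+ * assert admin.getArchivingEnabled(TABLE_NAME);
+ * // compactions and splits now move removed store files into the .archive dir
+ * admin.disableHFileBackup(); // turn archiving off for all tables
+ * </pre>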
+ */
+@Category(MediumTests.class)
+public class TestRegionHFileArchiving {
+
+ private static final Log LOG = LogFactory.getLog(TestRegionHFileArchiving.class);
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final String STRING_TABLE_NAME = "test";
+ private static final byte[] TEST_FAM = Bytes.toBytes("fam");
+ private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
+ private static final int numRS = 2;
+ private static final int maxTries = 5;
+
+ /**
+ * Setup the config for the cluster
+ */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(numRS);
+ }
+
+ private static void setupConf(Configuration conf) {
+ // disable the ui
+ conf.setInt("hbase.regionsever.info.port", -1);
+ // change the flush size to a small amount, regulating number of store files
+ conf.setInt("hbase.hregion.memstore.flush.size", 25000);
+ // so make sure we get a compaction when doing a load, but keep around some
+ // files in the store
+ conf.setInt("hbase.hstore.compaction.min", 10);
+ conf.setInt("hbase.hstore.compactionThreshold", 10);
+ // block writes if we get to 12 store files
+ conf.setInt("hbase.hstore.blockingStoreFiles", 12);
+ // drop the number of attempts for the hbase admin
+ conf.setInt("hbase.client.retries.number", 1);
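+ // (with a single retry, a failed enableHFileBackup surfaces quickly as an
+ // IOException in the tests below instead of being retried at length)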
+ }
+
+ @Before
+ public void setup() throws Exception {
+ UTIL.createTable(TABLE_NAME, TEST_FAM);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ UTIL.deleteTable(TABLE_NAME);
+ // and cleanup the archive directory
+ try {
+ UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
+ } catch (IOException e) {
+ LOG.warn("Failure to delete archive directory", e);
+ }
+ // make sure that backups are off for all tables
+ UTIL.getHBaseAdmin().disableHFileBackup();
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ // NOOP;
+ }
+ }
+
+ @Test
+ public void testEnableDisableArchiving() throws Exception {
+ // Make sure archiving is not enabled
+ assertFalse(UTIL.getHBaseAdmin().getArchivingEnabled(TABLE_NAME));
+
+ // get the RS and region serving our table
+ HBaseAdmin admin = UTIL.getHBaseAdmin();
+ List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
+ // make sure we only have 1 region serving this table
+ assertEquals(1, servingRegions.size());
+ HRegion region = servingRegions.get(0);
+ HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+ FileSystem fs = hrs.getFileSystem();
+
+ // enable archiving
+ admin.enableHFileBackup(TABLE_NAME);
+ assertTrue(UTIL.getHBaseAdmin().getArchivingEnabled(TABLE_NAME));
+
+ // put some load on the table and make sure that files do get archived
+ loadAndCompact(region);
+
+ // check that we actually have some store files that were archived
+ HFileArchiveMonitor monitor = hrs.getHFileArchiveMonitor();
+ Store store = region.getStore(TEST_FAM);
+ Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
+ region, store);
+
+ // check the archive
+ assertTrue(fs.exists(storeArchiveDir));
+ assertTrue(fs.listStatus(storeArchiveDir).length > 0);
+
+ // now test that we properly stop backing up
+ LOG.debug("Stopping backup and testing that we don't archive");
+ admin.disableHFileBackup(STRING_TABLE_NAME);
+ int counter = 0;
+ while (monitor.keepHFiles(STRING_TABLE_NAME)) {
+ LOG.debug("Waiting for archive change to propaate");
+ Thread.sleep(100);
+ // max tries to propagate - if not, something is probably horribly
+ // wrong with zk/notification
+ assertTrue("Exceeded max tries to propagate hfile backup changes", counter++ < maxTries);
+ }
+
+ // delete the existing archive files
+ fs.delete(storeArchiveDir, true);
+
+ // and then put some more data in the table and ensure it compacts
+ loadAndCompact(region);
+
+ // put data into the table again to make sure that we don't copy new files
+ // over into the archive directory
+
+ // ensure there are no archived files
+ assertFalse(fs.exists(storeArchiveDir));
+
+ // make sure that overall backup also disables per-table
+ admin.enableHFileBackup(TABLE_NAME);
+ admin.disableHFileBackup();
+ assertFalse(UTIL.getHBaseAdmin().getArchivingEnabled(TABLE_NAME));
+ }
+
+ /**
+ * Ensure that turning on archiving cannot leave the tracker unaware of the
+ * table because of missed ZK watches (a real risk since createWithParents is
+ * not transactional).
+ * @throws Exception
+ */
+ @Test
+ public void testFindsTablesAfterArchivingEnabled() throws Exception {
+ // 1. create a tracker to track the nodes
+ ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher();
+ HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+ TableHFileArchiveTracker tracker = hrs.hfileArchiveTracker;
+
+ // 2. create the archiving enabled znode
+ ZKUtil.createAndFailSilent(zkw, zkw.archiveHFileZNode);
+
+ // 3. now turn on archiving for the test table
+ HBaseAdmin admin = UTIL.getHBaseAdmin();
+ admin.enableHFileBackup(TABLE_NAME);
+
+ // 4. make sure that archiving is enabled for that tracker
+ assertTrue(tracker.keepHFiles(STRING_TABLE_NAME));
+ }
+
+ /**
+ * Test that enabling archiving is synchronous: the call does not return until
+ * the regionservers hosting the table have started archiving.
+ */
+ @Test
+ public void testSynchronousArchiving() throws Exception {
+ HBaseAdmin admin = UTIL.getHBaseAdmin();
+
+ // 1. turn on hfile backups
+ LOG.debug("----Starting archiving");
+ admin.enableHFileBackup(TABLE_NAME);
+ assertTrue(UTIL.getHBaseAdmin().getArchivingEnabled(TABLE_NAME));
+
+ // 2. ensure that backups are kept on each RS
+ // get all the monitors
+ for (int i = 0; i < numRS; i++) {
+ HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i);
+ // make sure that at least regions hosting the table have received the
+ // update to start archiving
+ if (hrs.getOnlineRegions(TABLE_NAME).size() > 0) {
+ assertTrue(hrs.getHFileArchiveMonitor().keepHFiles(STRING_TABLE_NAME));
+ }
+ }
+
+ // 3. now attempt to archive some other table that doesn't exist
+ try {
+ admin.enableHFileBackup("other table");
+ fail("Should get an IOException if a table cannot be backed up.");
+ } catch (IOException e) {
+ // this should happen
+ }
+ assertFalse("Table 'other table' should not be archived - it doesn't exist!", UTIL
+ .getHBaseAdmin().getArchivingEnabled(Bytes.toBytes("other table")));
+
+ // 4. now prevent one of the regionservers from archiving, which should
+ // cause archiving to fail
+ // make sure all archiving is off
+ admin.disableHFileBackup();
+ assertFalse(admin.getArchivingEnabled(TABLE_NAME));
+
+ // then hack the RS to not do any registration
+ HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+ HFileArchiveTableMonitor orig = hrs.hfileArchiveTracker.getMonitor();
+ // set the monitor so it doesn't attempt to register the regionserver
+ hrs.hfileArchiveTracker.setTableMonitor(new SimpleHFileArchiveTableMonitor(hrs, UTIL
+ .getZooKeeperWatcher()));
+
+ // try turning on archiving, but it should fail
+ try {
+ admin.enableHFileBackup(TABLE_NAME);
+ fail("Shouldn't have been able to finish archiving");
+ } catch (IOException e) {
+ // reset the tracker, if we don't get an exception, then everything breaks
+ hrs.hfileArchiveTracker.setTableMonitor(orig);
+ }
+ }
+
+ /**
+ * Test the advanced case where we turn on archiving and the region propagates
+ * the change down to the store
+ */
+ @SuppressWarnings("null")
+ @Category(LargeTests.class)
+ @Test(timeout = 200000)
+ // timeout = 200 sec - if it takes longer, something is seriously borked with
+ // the minicluster.
+ public void testCompactAndArchive() throws Exception {
+ HBaseAdmin admin = UTIL.getHBaseAdmin();
+
+ // get the RS and region serving our table
+ List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
+ // make sure we only have 1 region serving this table
+ assertEquals(1, servingRegions.size());
+ HRegion region = servingRegions.get(0);
+
+ // get the parent RS and monitor
+ HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+ FileSystem fs = hrs.getFileSystem();
+ Store store = region.getStores().get(TEST_FAM);
+
+ // 1. put some data on the region
+ LOG.debug("-------Loading table");
+ UTIL.loadRegion(region, TEST_FAM);
+
+ // get the current store files for the region
+ // and that there is only one store in the region
+ assertEquals(1, region.getStores().size());
+
+ int fileCount = store.getStorefiles().size();
+ assertTrue("Need more than 1 store file to compact and test archiving", fileCount > 1);
+ LOG.debug("Currently have: " + fileCount + " store files.");
+ LOG.debug("Has store files:");
+ for (StoreFile sf : store.getStorefiles()) {
+ LOG.debug("\t" + sf.getPath());
+ }
+
+ // 2. make sure that table archiving is enabled
+ // first force a flush to make sure nothing weird happens
+ region.flushcache();
+
+ // turn on hfile backups into .archive
+ LOG.debug("-----Enabling backups");
+ admin.enableHFileBackup(TABLE_NAME);
+
+ Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
+ region, store);
+
+ // make sure we block the store from compacting/flushing files in the middle
+ // of our copy of the store files
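+ // (both store.filesCompacting and store.flushLock are held below so that no
+ // new store files appear between listing the originals and inspecting the
+ // archive; otherwise the comparison can transiently fail)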
+ List<StoreFile> origFiles = null;
+ FileStatus[] originals = null;
+ List<Path> copiedStores = null;
+ FileStatus[] archivedFiles = null;
+ // wait for all the compactions to finish
+ waitOnCompactions(store);
+
+ // lock the store for compactions
+ boolean done = false;
+ int tries = 0;
+ // this is a couple times to ensure that it wasn't just a compaction issue
+ while (!done && tries++ < maxTries) {
+ // wait on memstore flushes to finish
+ region.waitForFlushesAndCompactions();
+ synchronized (store.filesCompacting) {
+ synchronized (store.flushLock) {
+ // if there are files unlock it and try again
+ if (store.filesCompacting.size() > 0) {
+ LOG.debug("Got some more files, waiting on compaction to finish again.");
+ continue;
+ }
+ LOG.debug("Locked the store");
+
+ // make sure we don't have any extra archive files from random
+ // compactions in the middle of the test
+ LOG.debug("-----Initial files in store archive:");
+ if (fs.exists(storeArchiveDir)) {
+ for (FileStatus f : fs.listStatus(storeArchiveDir)) {
+ LOG.debug("Deleting archive file: " + f.getPath()
+ + ", so we have a consistent backup view.");
+ }
+ fs.delete(storeArchiveDir, true);
+ } else LOG.debug("[EMPTY]");
+
+ // make sure that archiving is in a 'clean' state
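+ // (listStatus should return null here since the archive directory was just
+ // deleted, rather than an empty array)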
+ assertNull(fs.listStatus(storeArchiveDir));
+
+ // get the original store files before compaction
+ LOG.debug("------Original store files:");
+ originals = fs.listStatus(store.getHomedir());
+ for (FileStatus f : originals) {
+ LOG.debug("\t" + f.getPath());
+ }
+ // copy the original store files so we can use them for testing
+ // overwriting store files with the same name below
+ origFiles = store.getStorefiles();
+ copiedStores = new ArrayList<Path>(origFiles.size());
+ Path temproot = new Path(hrs.getRootDir(), "store_copy");
+ for (StoreFile f : origFiles) {
+ if (!fs.exists(f.getPath())) continue;
+
+ // do the actually copy of the file
+ Path tmpStore = new Path(temproot, f.getPath().getName());
+ FSDataOutputStream tmpOutput = fs.create(tmpStore);
+ FSDataInputStream storeInput = fs.open(f.getPath());
+ byte[] buffer = new byte[1024];
+ int read;
+ while ((read = storeInput.read(buffer)) > 0) {
+ tmpOutput.write(buffer, 0, read);
+ }
+ tmpOutput.close();
+ storeInput.close();
+ copiedStores.add(tmpStore);
+ }
+ LOG.debug("---------- Triggering compaction");
+ compactRegion(region, TEST_FAM);
+
+ // then get the archived store files
+ LOG.debug("----------Archived store files after compaction (" + storeArchiveDir + "):");
+ archivedFiles = fs.listStatus(storeArchiveDir);
+ for (FileStatus f : archivedFiles) {
+ LOG.debug("\t" + f.getPath());
+ }
+
+ // compare the archive to the original, and then try again if not
+ // equal since a compaction may have been finishing
+ if (!compareArchiveToOriginal(originals, archivedFiles, fs, false)) {
+ LOG.debug("Archive doesn't match, trying again.");
+ done = false;
+ } else done = true;
+ }
+ }
+ }
+ LOG.debug("Unlocked the store.");
+ for (FileStatus f : archivedFiles) {
+ LOG.debug("\t" + f.getPath());
+ }
+ assertArchiveEqualToOriginal(originals, archivedFiles, fs);
+ assertTrue("Tried too many times, something is messed up in the check logic", done);
+ LOG.debug("Archive matches originals.");
+
+ // 3. Now copy back in the store files and trigger another compaction
+
+ // lock again so we don't interfere with a compaction/flush
+ waitOnCompactions(store);
+ done = false;
+ tries = 0;
+ while (!done && tries++ < maxTries) {
+ // wait for flushes
+ region.waitForFlushesAndCompactions();
+ synchronized (store.filesCompacting) {
+ synchronized (store.flushLock) {
+ LOG.debug("Locked the store");
+ if (store.filesCompacting.size() > 0) {
+ LOG.debug("Got some more files, waiting on compaction to finish again.");
+ // hack sleep, but should help if we get a lot of compactions
+ Thread.sleep(100);
+ continue;
+ }
+
+ // delete the store directory (just in case)
+ fs.delete(store.getHomedir(), true);
+
+ // and copy back in the original store files (from before)
+ fs.mkdirs(store.getHomedir());
+ for (int i = 0; i < copiedStores.size(); i++) {
+ fs.rename(copiedStores.get(i), origFiles.get(i).getPath());
+ }
+ // now archive the files again
+ LOG.debug("Removing the store files again.");
+ HFileDisposer.disposeStoreFiles(hrs, store.getHRegion(), store.conf, store.getFamily()
+ .getName(), origFiles);
+
+ // ensure the files match to originals, but with a backup directory
+ LOG.debug("Checking originals vs. backed up (from archived) versions");
+ archivedFiles = fs.listStatus(storeArchiveDir);
+
+ // check equality to make sure a compaction didn't mess us up
+ if (!compareArchiveToOriginal(originals, archivedFiles, fs, true)) {
+ // loop again, to check the new files
+ LOG.debug("Archive doesn't match, trying again.");
+ done = false;
+ } else done = true;
+ }
+ }
+ }
+ assertArchiveEqualToOriginal(originals, archivedFiles, fs, true);
+ assertTrue("Tried too many times, something is messed up in the check logic", done);
+ }
+
+ private void waitOnCompactions(Store store) throws Exception {
+ // busy loop waiting on files to compact to be empty
+ while (store.filesCompacting.size() > 0) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Test
+ public void testRegionSplitAndArchive() throws Exception {
+ // start archiving
+ HBaseAdmin admin = UTIL.getHBaseAdmin();
+ admin.enableHFileBackup(TABLE_NAME);
+
+ // get the current store files for the region
+ final List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
+ // make sure we only have 1 region serving this table
+ assertEquals(1, servingRegions.size());
+ HRegion region = servingRegions.get(0);
+
+ // and that there is only one store in the region
+ assertEquals(1, region.getStores().size());
+ Store store = region.getStores().get(TEST_FAM);
+
+ // prep the store files so we get some files
+ LOG.debug("Loading store files");
+ // prepStoreFiles(admin, store, 3);
+ UTIL.loadRegion(region, TEST_FAM);
+
+ // get the files before compaction
+ FileSystem fs = region.getRegionServerServices().getFileSystem();
+
+ // delete out the current archive files, just for ease of comparison
+ // and synchronize to make sure we don't clobber another compaction
+ Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
+ region, store);
+
+ synchronized (region.writestate) {
+ // wait for all the compactions/flushes to complete on the region
+ LOG.debug("Waiting on region " + region + "to complete compactions & flushes");
+ region.waitForFlushesAndCompactions();
+ LOG.debug("Removing archived files - general cleanup");
+ assertTrue(fs.delete(storeArchiveDir, true));
+ assertTrue(fs.mkdirs(storeArchiveDir));
+ }
+ LOG.debug("Starting split of region");
+ // now split our region
+ admin.split(TABLE_NAME);
+ while (UTIL.getHBaseCluster().getRegions(TABLE_NAME).size() < 2) {
+ LOG.debug("Waiting for regions to split.");
+ Thread.sleep(100);
+ }
+ LOG.debug("Regions finished splitting.");
+ // at this point the region should have split
+ servingRegions.clear();
+ servingRegions.addAll(UTIL.getHBaseCluster().getRegions(TABLE_NAME));
+ // make sure we now have 2 regions serving this table
+ assertEquals(2, servingRegions.size());
+
+ // now check to make sure that those regions will also archive
+ Collection<HRegionServer> regionservers = Collections2.filter(Collections2.transform(UTIL
+ .getMiniHBaseCluster().getRegionServerThreads(),
+ new Function<RegionServerThread, HRegionServer>() {
+
+ @Override
+ public HRegionServer apply(RegionServerThread input) {
+ return input.getRegionServer();
+ }
+ }), new Predicate<HRegionServer>() {
+
+ @Override
+ public boolean apply(HRegionServer input) {
+ // get the names of the regions hosted by the rs
+ Collection<String> regions;
+ regions = Collections2.transform(input.getOnlineRegions(TABLE_NAME),
+ new Function<HRegion, String>() {
+ @Override
+ public String apply(HRegion input) {
+ return input.getRegionInfo().getEncodedName();
+ }
+ });
+
+ // then check to make sure this RS is serving one of the serving regions
+ boolean found = false;
+ for (HRegion region : servingRegions) {
+ if (regions.contains(region.getRegionInfo().getEncodedName())) {
+ found = true;
+ break;
+ }
+ }
+ return found;
+
+ }
+ });
+
+ assertTrue(regionservers.size() > 0);
+
+ // check each of the region servers to make sure it has got the update
+ for (HRegionServer serving : regionservers) {
+ assertTrue("RegionServer:" + serving + " hasn't been included in backup",
+ serving.hfileArchiveTracker.keepHFiles(STRING_TABLE_NAME));
+ }
+ }
+
+ /**
+ * Load the given region and then ensure that it compacts some files
+ */
+ private void loadAndCompact(HRegion region) throws Exception {
+ int tries = 0;
+ Exception last = null;
+ while (tries++ <= maxTries) {
+ try {
+ // load the region with data
+ UTIL.loadRegion(region, TEST_FAM);
+ // and then trigger a compaction to be sure we try to archive
+ compactRegion(region, TEST_FAM);
+ return;
+ } catch (Exception e) {
+ // keep this around for if we fail later
+ last = e;
+ }
+ }
+ throw last;
+ }
+
+ /**
+ * Compact all the store files in a given region.
+ */
+ private void compactRegion(HRegion region, byte[] family) throws IOException {
+ Store store = region.getStores().get(family);
+ store.compactRecentForTesting(store.getStorefiles().size());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileArchiveTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileArchiveTestingUtil.java
new file mode 100644
index 0000000..c542a1b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileArchiveTestingUtil.java
@@ -0,0 +1,220 @@
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+
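+/**
+ * Test helpers for checking that store files were correctly moved into the
+ * HFile archive directory. Comparisons are done on file names; an archived
+ * file that has itself been backed up is expected to carry a '.'-separated
+ * suffix, which is stripped before matching against the original names.
+ */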
+public class HFileArchiveTestingUtil {
+
+ private static final Log LOG = LogFactory.getLog(HFileArchiveTestingUtil.class);
+
+ private HFileArchiveTestingUtil() {
+ // NOOP private ctor since this is just a utility class
+ }
+
+ public static boolean compareArchiveToOriginal(FileStatus[] previous, FileStatus[] archived,
+ FileSystem fs, boolean hasTimedBackup) {
+
+ List<List<String>> lists = getFileLists(previous, archived);
+ List<String> original = lists.get(0);
+ Collections.sort(original);
+
+ List<String> currentFiles = lists.get(1);
+ Collections.sort(currentFiles);
+
+ List<String> backedup = lists.get(2);
+ Collections.sort(backedup);
+
+ // check the backed up files versus the current (should match up, less the
+ // backup time in the name)
+ if (hasTimedBackup != (backedup.size() > 0)) {
+ LOG.debug("Presence of backed-up files doesn't match the expectation.");
+ return false;
+ }
+ String msg = null;
+ if (hasTimedBackup) {
+ msg = assertArchiveEquality(original, backedup);
+ if (msg != null) {
+ LOG.debug(msg);
+ return false;
+ }
+ }
+ msg = assertArchiveEquality(original, currentFiles);
+ if (msg != null) {
+ LOG.debug(msg);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Compare the archived files to the files in the original directory
+ * @param previous original files that should have been archived
+ * @param archived files that were archived
+ * @param fs filesystem on which the archiving took place
+ * @throws IOException
+ */
+ public static void assertArchiveEqualToOriginal(FileStatus[] previous, FileStatus[] archived,
+ FileSystem fs) throws IOException {
+ assertArchiveEqualToOriginal(previous, archived, fs, false);
+ }
+
+ /**
+ * Compare the archived files to the files in the original directory
+ * @param previous original files that should have been archived
+ * @param archived files that were archived
+ * @param fs {@link FileSystem} on which the archiving took place
+ * @param hasTimedBackup true if we expect to find an archive backup
+ * directory with a copy of the files in the archive directory (and
+ * the original files).
+ * @throws IOException
+ */
+ public static void assertArchiveEqualToOriginal(FileStatus[] previous, FileStatus[] archived,
+ FileSystem fs, boolean hasTimedBackup) throws IOException {
+
+ List<List<String>> lists = getFileLists(previous, archived);
+ List<String> original = lists.get(0);
+ Collections.sort(original);
+
+ List<String> currentFiles = lists.get(1);
+ Collections.sort(currentFiles);
+
+ List<String> backedup = lists.get(2);
+ Collections.sort(backedup);
+
+ // check the backed up files versus the current (should match up, less the
+ // backup time in the name)
+ assertEquals("Didn't expect any backup files, but got: " + backedup, hasTimedBackup,
+ backedup.size() > 0);
+ String msg = null;
+ if (hasTimedBackup) {
+ msg = assertArchiveEquality(original, backedup);
+ assertNull(msg, msg);
+ }
+
+ // do the rest of the comparison
+ msg = assertArchiveEquality(original, currentFiles);
+ assertNull(msg, msg);
+ }
+
+ private static String assertArchiveEquality(List<String> expected, List<String> archived) {
+ String compare = compareFileLists(expected, archived);
+ if (expected.size() != archived.size()) return "Not the same number of current files\n"
+ + compare;
+ if (!expected.equals(archived)) return "Different backup files, but same amount\n" + compare;
+ return null;
+ }
+
+ /**
+ * @return file names as [original store files, current files in the archive,
+ * backed-up files]; callers sort each list before comparing
+ */
+ private static List<List<String>> getFileLists(FileStatus[] previous, FileStatus[] archived) {
+ List<List<String>> files = new ArrayList<List<String>>();
+
+ // copy over the original files
+ List<String> originalFileNames = convertToString(previous);
+ files.add(originalFileNames);
+
+ List<String> currentFiles = new ArrayList<String>(previous.length);
+ List<FileStatus> backedupFiles = new ArrayList<FileStatus>(previous.length);
+ for (FileStatus f : archived) {
+ String name = f.getPath().getName();
+ // if the file has been backed up
+ if (name.contains(".")) {
+ Path parent = f.getPath().getParent();
+ String shortName = name.split("[.]")[0];
+ Path modPath = new Path(parent, shortName);
+ FileStatus file = new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
+ f.getBlockSize(), f.getModificationTime(), modPath);
+ backedupFiles.add(file);
+ } else {
+ // otherwise, add it to the list to compare to the original store files
+ currentFiles.add(name);
+ }
+ }
+
+ files.add(currentFiles);
+ files.add(convertToString(backedupFiles));
+ return files;
+ }
+
+ private static List<String> convertToString(FileStatus[] files) {
+ return convertToString(Arrays.asList(files));
+ }
+
+ private static List<String> convertToString(List<FileStatus> files) {
+ List<String> originalFileNames = new ArrayList<String>(files.size());
+ for (FileStatus f : files) {
+ originalFileNames.add(f.getPath().getName());
+ }
+ return originalFileNames;
+ }
+
+ /* Get a pretty representation of the differences */
+ private static String compareFileLists(List<String> expected, List<String> gotten) {
+ StringBuilder sb = new StringBuilder("Expected (" + expected.size() + "): \t\t Gotten ("
+ + gotten.size() + "):\n");
+ List<String> notFound = new ArrayList<String>();
+ for (String s : expected) {
+ if (gotten.contains(s)) sb.append(s + "\t\t" + s + "\n");
+ else notFound.add(s);
+ }
+ sb.append("Not Found:\n");
+ for (String s : notFound) {
+ sb.append(s + "\n");
+ }
+ sb.append("\nExtra:\n");
+ for (String s : gotten) {
+ if (!expected.contains(s)) sb.append(s + "\n");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Helper method to get the archive directory for the specified region
+ * @param conf {@link Configuration} to check for the name of the archive
+ * directory
+ * @param region region that is being archived
+ * @return {@link Path} to the archive directory for the given region
+ */
+ public static Path getRegionArchiveDir(Configuration conf, HRegion region) {
+ return HFileArchiveUtil.getRegionArchiveDir(conf, region.getTableDir(), region.getRegionDir());
+ }
+
+ /**
+ * Helper method to get the table archive directory for the specified region
+ * @param conf {@link Configuration} to check for the name of the archive
+ * directory
+ * @param region region that is being archived
+ * @return {@link Path} to the table archive directory for the given region
+ */
+ public static Path getTableArchivePath(Configuration conf, HRegion region) {
+ return HFileArchiveUtil.getTableArchivePath(conf, region.getTableDir());
+ }
+
+ /**
+ * Helper method to get the store archive directory for the specified region
+ * @param conf {@link Configuration} to check for the name of the archive
+ * directory
+ * @param region region that is being archived
+ * @param store store that is archiving files
+ * @return {@link Path} to the store archive directory for the given region
+ */
+ public static Path getStoreArchivePath(Configuration conf, HRegion region, Store store) {
+ return HFileArchiveUtil.getStoreArchivePath(conf, region, store.getFamily().getName());
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
index 3f61cfb..f3d2bd7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -156,4 +156,10 @@ public class MockRegionServerServices implements RegionServerServices {
public void setFileSystem(FileSystem hfs) {
this.hfs = (HFileSystem)hfs;
}
+
+ @Override
+ public HFileArchiveMonitor getHFileArchiveMonitor() {
+ // TODO implement a real HFileArchiveMonitor; the mock does not track archiving
+ return null;
+ }
}