diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index eb1ac79..b5084a6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -657,7 +657,7 @@ public final class HConstants { public static final String ENABLE_WAL_COMPRESSION = "hbase.regionserver.wal.enablecompression"; -/** Region in Transition metrics threshold time */ + /** Region in Transition metrics threshold time */ public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold"; public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop"; @@ -670,6 +670,10 @@ public final class HConstants { /** delimiter used between portions of a region name */ public static final int DELIMITER = ','; + /** Configuration key for the directory to backup HFiles for a table */ + public static final String HFILE_ARCHIVE_DIRECTORY = "hbase.table.archive.directory"; + public static final String HFILE_ARCHIVE_ZNODE_PARENT = "hfilearchive"; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveCleanup.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveCleanup.java new file mode 100644 index 0000000..4a88e2e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveCleanup.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; +import java.util.Stack; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.Parser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; + +import com.google.common.base.Function; + +/** + * Utility to help cleanup extra archive files. This is particularly useful if + * there is a partially completed archive, but one of the regionservers bailed + * before it could be completed. 
Ignores directories as existing archiving might + * need them. + *
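+ * A hypothetical invocation (the -t/-s/-e options are the ones defined below;
+ * the table name and timestamps are only placeholders) might look like:
+ * <pre>
+ *   java HFileArchiveCleanup -t usertable -s 1330000000 -e 1335000000
+ * </pre>
+ * which removes archived files for 'usertable' whose modification time falls in
+ * the [start, end) window.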

+ * Run with -h to see help. + */ +public class HFileArchiveCleanup { + + private static final Log LOG = LogFactory.getLog(HFileArchiveCleanup.class); + + private static Configuration conf; + + // configuration + private String table = null; + private static Function shouldDelete; + + public static void main(String[] args) throws Exception { + // setup + HFileArchiveCleanup cleaner = new HFileArchiveCleanup(); + try { + if (!cleaner.parse(args)) { + System.exit(-1); + } + } catch (ParseException e) { + LOG.error("Parse Exception: " + e.getMessage()); + cleaner.printHelp(); + System.exit(-1); + } + + // run the cleanup + Configuration conf = getConf(); + FileSystem fs = FSUtils.getCurrentFileSystem(conf); + + Path archiveDir; + // if not cleaning a specific table, then just cleanup the archive directory + if (cleaner.table == null) { + archiveDir = new Path(FSUtils.getRootDir(conf), + HFileArchiveUtil.getConfiguredArchiveDir(conf)); + } else { + Path tableDir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf), + Bytes.toBytes(cleaner.table)); + archiveDir = HFileArchiveUtil.getTableArchivePath(conf, tableDir); + } + + LOG.debug("Cleaning up: " + archiveDir); + // iterate through the archive directory and delete everything that falls + // outside the range + Stack directories = new Stack(); + directories.add(archiveDir); + // while there are more directories to look at + while (!directories.isEmpty()) { + Path parent = directories.pop(); + // if the parent exists + if (fs.exists(parent)) { + // get all the children + FileStatus[] children = fs.listStatus(parent); + for (FileStatus child : children) { + // push directory to be cleaned + if (child.isDir()) { + directories.push(child.getPath()); + continue; + } + // otherwise its a file and we can check deletion + // we have to check mod times against what the command line says + long mod = child.getModificationTime(); + if (LOG.isDebugEnabled()) { + LOG.debug("Child file: " + child.getPath() + ", has mod time:" + mod); + } + // check to see if it meets the specified timestamps + if (shouldDelete.apply(mod)) { + try { + if (!fs.delete(child.getPath(), false)) { + LOG.warn("Could not delete:" + child); + } + } catch (IOException e) { + LOG.warn("Failed to delete child", e); + } + } + } + } + } + } + + /** + * Set the configuration to use. + *

+ * Exposed for TESTING! + * @param conf + */ + static void setConfiguration(Configuration conf) { + HFileArchiveCleanup.conf = conf; + } + + private synchronized static Configuration getConf() { + if (HFileArchiveCleanup.conf == null) { + setConfiguration(HBaseConfiguration.create()); + } + return HFileArchiveCleanup.conf; + } + + // --------------------------- + // Setup and options parsing + // --------------------------- + + private final Options opts; + private final Parser parser; + + @SuppressWarnings("static-access") + public HFileArchiveCleanup() { + // create all the necessary options + opts = new Options(); + // create the time option + OptionBuilder + .hasArg() + .withArgName("start time") + .withDescription( + "Start time from which to start removing" + + " backup files in seconds from the epoch (inclusive). " + + "If not set, all archive files are removed from the " + + "specified table up to the end time. Not setting start " + + "OR end time deletes all archived files."); + opts.addOption(OptionBuilder.create('s')); + + // end time option + OptionBuilder + .hasArg() + .withArgName("end time") + .withDescription( + "End time from which to start removing backup files in seconds " + + "from the epoch (exclusive). If not set, all archive files forward " + + "from the start time are removed. Not setting start OR end time deletes " + + "all archived files."); + opts.addOption(OptionBuilder.create('e')); + + // create the tablename option + OptionBuilder + .hasArg() + .withArgName("table") + .withDescription( + "Name of the table for which to remove backups. If not set, all archived " + + "files will be cleaned up.") + .withType(String.class); + opts.addOption(OptionBuilder.create('t')); + + // add the help + opts.addOption(OptionBuilder.hasArg(false).withDescription("Show this help").create('h')); + + // create the parser + parser = new GnuParser(); + } + + public boolean parse(String[] args) throws ParseException { + CommandLine cmd; + cmd = parser.parse(opts, args); + //hack around final and annonymous innner classes + final long[] startend = new long[] { -1, Long.MAX_VALUE }; + if (cmd.hasOption('h')) { + printHelp(); + return false; + } + // get start/end times + if (cmd.hasOption('s')) { + startend[0]= Long.parseLong(cmd.getOptionValue('s')); + LOG.debug("Setting start time to:" + startend[0]); + } + if (cmd.hasOption('e')) { + startend[1]= Long.parseLong(cmd.getOptionValue('e')); + LOG.debug("Setting end time to:" + startend[1]); + } + + if (startend[0] > startend[1]) { + throw new ParseException("Start time cannot exceed end time."); + } + + if (startend[0] == startend[1]) { + //set the same start and end time + shouldDelete = new Function() { + @Override + public Boolean apply(Long input) { + return input.longValue() ==startend[0]; + } + }; + } else { + //set a range to delete + shouldDelete = new Function() { + @Override + public Boolean apply(Long input) { + return startend[0] <= input && input < startend[1]; + } + }; + } + + // get the table to cleanup + if (cmd.hasOption('t')) { + this.table = cmd.getOptionValue('t'); + } + return true; + } + + public void printHelp() { + HelpFormatter help = new HelpFormatter(); + help.printHelp("java HFileArchiveCleanup", opts); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveMonitor.java new file mode 100644 index 0000000..3d77c5d --- /dev/null +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveMonitor.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + + +/** + * Monitor for which tables the HFiles should be archived. + */ +public interface HFileArchiveMonitor { + + /** + * Determine if the given table should or should not allow its hfiles to be + * deleted + * + * @param tableName name of the table to check + * @return true if its store files should be retained, false + * otherwise + */ + public boolean keepHFiles(String tableName); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveTableMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveTableMonitor.java new file mode 100644 index 0000000..51ae8ea --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiveTableMonitor.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Monitor the actual tables for which HFiles are archived. + *

+ * It is internally synchronized to ensure a consistent view of the table state. + *

+ * By default, updates zookeeper that this server has received the update to + * archive the hfiles for the given table. + */ +public class HFileArchiveTableMonitor extends Configured implements HFileArchiveMonitor { + private static final Log LOG = LogFactory.getLog(HFileArchiveTableMonitor.class); + private final Set archivedTables = new TreeSet(); + + protected final ZooKeeperWatcher zkw; + protected final Server parent; + + /** + * Exposed for testing only. Generally, the + * {@link TableHFileArchiveTracker#create(ZooKeeperWatcher, Server)} should be used + * when working with table archive monitors. + * @param parent server for which we should monitor archiving + * @param zkw watcher for the zookeeper cluster for archiving hfiles + */ + protected HFileArchiveTableMonitor(Server parent, ZooKeeperWatcher zkw) { + this.parent = parent; + this.zkw = zkw; + } + + /** + * Set the tables to be archived. Internally adds each table and attempts to + * register it. + *

+ * Note: All previous tables will be removed in favor of these tables. + * @param tables add each of the tables to be archived. + */ + public synchronized void setArchiveTables(List tables) { + archivedTables.clear(); + archivedTables.addAll(tables); + for (String table : tables) { + registerTable(table); + } + } + + /** + * Add the named table to be those being archived. Attempts to register the + * table + * @param table name of the table to be registered + */ + public synchronized void addTable(String table) { + if (this.keepHFiles(table)) { + LOG.debug("Already archiving table: " + table + ", ignoring it"); + return; + } + archivedTables.add(table); + registerTable(table); + } + + /** + * Subclass hook for registering a table with external sources. + *

+ * Currently updates ZK with the fact that the current server has started + * archiving hfiles for the specified table. + * @param table name of the table that is being archived + */ + protected void registerTable(String table) { + LOG.debug("Registering table '" + table + "' as being archived."); + + // notify that we are archiving the table for this server + try { + String tablenode = HFileArchiveUtil.getTableNode(zkw, table); + String serverNode = ZKUtil.joinZNode(tablenode, parent.getServerName().toString()); + ZKUtil.createEphemeralNodeAndWatch(zkw, serverNode, new byte[0]); + } catch (KeeperException e) { + LOG.error("Failing to update that this server(" + parent.getServerName() + + " is joining archive of table:" + table, e); + } + } + + public synchronized void removeTable(String table) { + archivedTables.remove(table); + } + + public synchronized void clearArchive() { + archivedTables.clear(); + } + + @Override + public synchronized boolean keepHFiles(String tableName) { + return archivedTables.contains(tableName); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileDisposer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileDisposer.java new file mode 100644 index 0000000..4f56231 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileDisposer.java @@ -0,0 +1,685 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; + +/** + * Utility class to handle the removal of HFiles (or the respective + * {@link StoreFile StoreFiles} for a HRegion from the {@link FileSystem}. The + * hfiles will be archived or deleted, depending on the state of the system. 
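+ * A rough usage sketch (assumes the caller already has the region server
+ * services, region, column family and compacted store files in hand):
+ * <pre>
+ *   HFileDisposer.disposeStoreFiles(rss, region, conf, family, compactedFiles);
+ *   HFileDisposer.disposeRegion(fs, rss.getHFileArchiveMonitor(), regionInfo);
+ * </pre>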
+ */ +public class HFileDisposer { + private static final Log LOG = LogFactory.getLog(HFileDisposer.class); + private static final String SEPARATOR = "."; + + private HFileDisposer() { + // hidden ctor since this is just a util + } + + /** + * Cleans up all the files for a HRegion, either via archiving (if the + * {@link HFileArchiveMonitor} indicates it should be) or by just removing all + * the files. + * @param fs the file system object + * @param monitor manager for if the region should be archived or deleted + * @param info HRegionInfo for region to be deleted + * @throws IOException + */ + public static void disposeRegion(FileSystem fs, HFileArchiveMonitor monitor, HRegionInfo info) + throws IOException { + Path rootDir = FSUtils.getRootDir(fs.getConf()); + disposeRegion(fs, monitor, rootDir, HTableDescriptor.getTableDir(rootDir, info.getTableName()), + HRegion.getRegionDir(rootDir, info)); + } + + /** + * Cleans up all the files for a HRegion, either via archiving (if the + * {@link HFileArchiveMonitor}, from the {@link RegionServerServices}, + * indicates it should be) or by just removing all the files. + * @param fs FileSystem where the region files reside + * @param rss services to obtain the {@link HFileArchiveMonitor} via + * {@link RegionServerServices#getHFileArchiveMonitor()}. Testing + * notes: if the returned monitor is null, the region files are + * deleted + * @param rootdir root directory of the hbase installation on the + * {@link FileSystem} + * @param tabledir {@link Path} to where the table is being stored (for + * building the archive path) + * @param regiondir {@link Path} to where a region is being stored (for + * building the archive path) + * @throws IOException + */ + public static void disposeRegion(FileSystem fs, RegionServerServices rss, Path rootdir, + Path tabledir, Path regiondir) throws IOException { + HFileArchiveMonitor manager = rss == null ? null : rss.getHFileArchiveMonitor(); + disposeRegion(fs, manager, rootdir, tabledir, regiondir); + } + + /** + * Remove an entire region from the table directory. + *

+ * Either archives the region or outright deletes it, depending on if + * archiving is enabled. + * @param fs {@link FileSystem} from which to remove the region + * @param monitor Monitor for which tables should be archived or deleted + * @param rootdir {@link Path} to the root directory where hbase files are + * stored (for building the archive path) + * @param tabledir {@link Path} to where the table is being stored (for + * building the archive path) + * @param regionDir {@link Path} to where a region is being stored (for + * building the archive path) + * @return true if the region was sucessfully deleted. false + * if the filesystem operations could not complete. + * @throws IOException if the request cannot be completed + */ + public static boolean disposeRegion(FileSystem fs, HFileArchiveMonitor monitor, Path rootdir, + Path tabledir, Path regionDir) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("REMOVING region " + regionDir.toString()); + } + // check to make sure we don't keep files, in which case just delete them + // NOTE: assumption here that tabledir = tablename + if (shouldDeleteFiles(monitor, tabledir)) { + LOG.debug("Deleting hregion directory (" + regionDir + ") - no backup"); + return deleteRegionWithoutArchiving(fs, regionDir); + } + + // otherwise, we archive the files + // make sure the regiondir lives under the tabledir + Preconditions.checkArgument(regionDir.toString().startsWith(tabledir.toString())); + + // get the directory to archive region files + Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(fs.getConf(), tabledir, regionDir); + if (regionArchiveDir == null) { + LOG.warn("No archive directory could be found for the region:" + regionDir + + ", deleting instead"); + deleteRegionWithoutArchiving(fs, regionDir); + // we should have archived, but failed to. Doesn't matter if we deleted + // the archived files correctly or not. + return false; + } + + FileStatusConverter getAsFile = new FileStatusConverter(fs); + // otherwise, we attempt to archive the store files + + // build collection of just the store directories to archive + Collection toArchive = Collections2.transform( + Collections2.filter(Arrays.asList(fs.listStatus(regionDir)), new Predicate() { + @Override + public boolean apply(FileStatus input) { + // filter out paths that are not store directories + if (!input.isDir() || input.getPath().getName().toString().startsWith(".")) return false; + return true; + } + }), getAsFile); + + boolean success = false; + try { + success = resolveAndArchive(fs, regionArchiveDir, toArchive); + } catch (IOException e) { + success = false; + } + + // if that was successful, then we delete the region + if (success) { + return deleteRegionWithoutArchiving(fs, regionDir); + } + + throw new IOException("Received error when attempting to archive files (" + toArchive + + "), cannot delete region directory."); + } + + /** + * Without regard for backup, delete a region. Should be used with caution. + * @param regionDir {@link Path} to the region to be deleted. 
+ * @throws IOException on filesystem operation failure + */ + private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir) + throws IOException { + if (fs.delete(regionDir, true)) { + LOG.debug("Deleted all region files in: " + regionDir); + return true; + } + LOG.debug("Failed to delete region directory:" + regionDir); + return false; + } + + /** + * Remove the store files, either by archiving them or outright deletion + * @param rss services to obtain the {@link HFileArchiveMonitor} via + * {@link RegionServerServices#getHFileArchiveMonitor()}. Testing + * notes: if the services or the returned monitor is null, the region + * files are deleted + * @param parent Parent region hosting the store files + * @param conf {@link Configuration} to examine to determine the archive + * directory + * @param family the family hosting the store files + * @param compactedFiles files to be disposed of. No further reading of these + * files should be attempted; otherwise likely to cause an + * {@link IOException} + * @throws IOException if the files could not be correctly disposed. + */ + public static void disposeStoreFiles(RegionServerServices rss, HRegion parent, + Configuration conf, byte[] family, Collection compactedFiles) throws IOException { + + // short circuit if we don't have any files to delete + if (compactedFiles.size() == 0) { + LOG.debug("No store files to dispose, done!"); + return; + } + + // if the monitor isn't available or indicates we shouldn't archive, then we + // delete the store files + if (shouldDeleteFiles(rss, parent.getTableDir())) { + deleteStoreFilesWithoutArchiving(compactedFiles); + return; + } + + // build the archive path + FileSystem fs = rss.getFileSystem(); + Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family); + + // make sure we don't archive if we can't and that the archive dir exists + if (storeArchiveDir == null || !fs.mkdirs(storeArchiveDir)) { + LOG.warn("Could make archive directory (" + storeArchiveDir + ") for store:" + + Bytes.toString(family) + ", deleting compacted files instead."); + deleteStoreFilesWithoutArchiving(compactedFiles); + return; + } + + // otherwise we attempt to archive the store files + LOG.debug("Archiving compacted store files."); + + // wrap the storefile into a File + StoreToFile getStorePath = new StoreToFile(fs); + Collection storeFiles = Collections2.transform(compactedFiles, getStorePath); + + // do the actual archive + if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) { + throw new IOException("Failed to archive/delete all the files." + + " Something is probably arwy on the filesystem."); + } + } + + /** + * Check to see if we should archive the given files, based on the state of + * the regionserver + * @param rss services to provide the {@link HFileArchiveMonitor}, can be null + * (in which case returns true). + * @param tabledir table directory where the files are being archived. NOTE: + * implicit assumption here that the tabledir = name of the table + * @return true if the files should be deleted false if they + * should be archived + */ + private static boolean shouldDeleteFiles(RegionServerServices rss, Path tabledir) { + if (rss == null) return true; + return shouldDeleteFiles(rss.getHFileArchiveMonitor(), tabledir); + } + + /** + * Check to see if we should archive the given files, based on the state of + * the regionserver + * @param monitor services to indicate if the files should be archived and can + * be null (in which case returns true). 
+ * @param tabledir table directory where the files are being archived. NOTE: + * implicit assumption here that the tabledir = name of the table + * @return true if the files should be deleted (or the monitor is + * null), false if they should be archived + */ + private static boolean shouldDeleteFiles(HFileArchiveMonitor monitor, Path tabledir) { + if (monitor == null) return true; + return !monitor.keepHFiles(tabledir.getName()); + } + + /** + * Just do a simple delete of the given store files + *

+ * A best effort is made to delete each of the files, rather than bailing on + * the first failure. + *

+ * This method is preferable to + * {@link #deleteFilesWithoutArchiving(Collection)} since it consumes less + * resources, but is limited in terms of usefulness + * @param compactedFiles store files to delete from the file system. + * @throws IOException if a file cannot be deleted. All files will be + * attempted to deleted before throwing the exception, rather than + * failing at the first file. + */ + private static void deleteStoreFilesWithoutArchiving(Collection compactedFiles) + throws IOException { + LOG.debug("Deleting store files without archiving."); + boolean failure = false; + for (StoreFile hsf : compactedFiles) { + try { + hsf.deleteReader(); + } catch (IOException e) { + LOG.error("Failed to delete store file:" + hsf.getPath()); + failure = true; + } + } + if (failure) { + throw new IOException("Failed to delete all store files. See log for failures."); + } + } + + /** + * Archive the given files and resolve any conflicts with existing files via + * appending the time archiving started (so all conflicts in the same group + * have the same timestamp appended). + *

+ * Recursively copies over files to archive if any of the files to archive are + * directories. Archive directory structure for children is the base archive + * directory name + the parent directory. + * @param fs {@link FileSystem} on which to archive the files + * @param baseArchiveDir base archive directory to archive the given files + * @param toArchive files to be archived + * @return true on success, false otherwise + * @throws IOException on unexpected failure + */ + private static boolean resolveAndArchive(FileSystem fs, Path baseArchiveDir, + Collection toArchive) throws IOException { + long start = EnvironmentEdgeManager.currentTimeMillis(); + List failures = resolveAndArchive(fs, baseArchiveDir, toArchive, start); + + // clean out the failures by just deleting them + if (failures.size() > 0) { + try { + LOG.error("Failed to complete archive, deleting extra store files."); + deleteFilesWithoutArchiving(failures); + } catch (IOException e) { + LOG.warn("Failed to delete store file(s) when archiving failed", e); + } + return false; + } + return true; + } + + /** + * Resolve any conflict with an existing archive file via timestamp-append + * renaming of the existing file and then archive the passed in files. + * @param fs {@link FileSystem} on which to archive the files + * @param baseArchiveDir base archive directory to store the files. If any of + * the files to archive are directories, will append the name of the + * directory to the base archive directory name, creating a parallel + * structure. + * @param toArchive files/directories that need to be archvied + * @param start time the archiving started - used for resolving archive + * conflicts. + * @return the list of failed to archive files. + * @throws IOException if an unexpected file operation exception occured + */ + private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, + Collection toArchive, long start) throws IOException { + // short circuit if no files to move + if (toArchive.size() == 0) return Collections.emptyList(); + + LOG.debug("moving files to the archive directory: " + baseArchiveDir); + + // make sure the archive directory exists + if (!fs.exists(baseArchiveDir)) { + if (!fs.mkdirs(baseArchiveDir)) { + throw new IOException("Failed to create the archive directory:" + baseArchiveDir + + ", quitting archive attempt."); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Created archive directory:" + baseArchiveDir); + } + } + + List failures = new ArrayList(); + String startTime = Long.toString(start); + for (File file : toArchive) { + // if its a file archive it + try { + LOG.debug("Archiving:" + file); + if (file.isFile()) { + // attempt to archive the file + if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) { + LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir); + failures.add(file); + } + } else { + // otherwise its a directory and we need to archive all files + LOG.debug(file + " is a directory, archiving children files"); + // so we add the directory name to the one base archive + Path directoryArchiveDir = new Path(baseArchiveDir, file.getName()); + // and then get all the files from that directory and attempt to + // archive those too + Collection children = file.getChildren(); + failures.addAll(resolveAndArchive(fs, directoryArchiveDir, children, start)); + } + } catch (IOException e) { + LOG.warn("Failed to archive file: " + file, e); + failures.add(file); + } + } + return failures; + } + + /** + * Attempt to archive the passed in file to the 
store archive directory. + *

+ * If the same file already exists in the archive, it is moved to a + * timestamped directory under the archive directory and the new file is put + * in its place. + * @param fs FileSystem on which to move files + * @param storeArchiveDirectory {@link Path} to the directory that stores the + * archives of the hfiles + * @param currentFile {@link Path} to the original HFile that will be archived + * @param archiveStartTime + * @return true if the file is successfully archived. false + * if there was a problem, but the operation still completed. + * @throws IOException on failure to complete {@link FileSystem} operations. + */ + private static boolean resolveAndArchiveFile(Path storeArchiveDirectory, File currentFile, + String archiveStartTime) throws IOException { + // build path as it should be in the archive + String filename = currentFile.getName(); + Path archiveFile = new Path(storeArchiveDirectory, filename); + FileSystem fs = currentFile.getFileSystem(); + // if the file already exists in the archive, move that one to a + // timestamped backup + if (fs.exists(archiveFile)) { + if (LOG.isDebugEnabled()) { + LOG.debug("File:" + archiveFile + + " already exists in archive, moving to timestamped backup and overwriting current."); + } + + Path backedupArchiveFile = new Path(storeArchiveDirectory, filename + SEPARATOR + + archiveStartTime); + + // move the archive file to the stamped backup + if (!fs.rename(archiveFile, backedupArchiveFile)) { + LOG.warn("Could not rename archive file to backup: " + backedupArchiveFile + + ", deleting existing file in favor of newer."); + fs.delete(archiveFile, false); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Backed up archive file from: " + archiveFile); + } + } else if (LOG.isDebugEnabled()) { + LOG.debug("No existing file in archive for:" + archiveFile + + ", free to archive original file."); + } + + // at this point, we should have a free spot for the archive file + if (currentFile.moveAndClose(archiveFile)) { + LOG.error("Failed to archive file:" + currentFile); + return false; + } else if (LOG.isDebugEnabled()) { + LOG.debug("Finished archiving file from: " + currentFile + ", to: " + archiveFile); + } + return true; + } + + /** + * Simple delete of regular files from the {@link FileSystem}. + *

+ * This method is a more generic implementation that the other deleteXXX + * methods in this class, allowing more code reuse at the cost of a couple + * more, short-lived objects (which should have minimum impact on the jvm). + * @param fs {@link FileSystem} where the files live + * @param files {@link Collection} of files to be deleted + * @throws IOException if a file cannot be deleted. All files will be + * attempted to deleted before throwing the exception, rather than + * failing at the first file. + */ + private static void deleteFilesWithoutArchiving(Collection files) throws IOException { + boolean failure = false; + for (File file : files) { + try { + LOG.debug("Deleting region file:" + file); + file.delete(); + } catch (IOException e) { + LOG.error("Failed to delete file:" + file); + failure = true; + } + } + if (failure) { + throw new IOException("Failed to delete all store files. See log for failures."); + } + } + + /** + * Adapt a type to match the {@link File} interface, which is used internally + * for handling archival/removal of files + * @param type to adapt to the {@link File} interface + */ + private static abstract class FileConverter implements Function { + protected final FileSystem fs; + + public FileConverter(FileSystem fs) { + this.fs = fs; + } + } + + /** + * Convert a FileStatus to something we can manage in the archiving + */ + private static class FileStatusConverter extends FileConverter { + public FileStatusConverter(FileSystem fs) { + super(fs); + } + + @Override + public File apply(FileStatus input) { + return new FileablePath(fs, input.getPath()); + } + } + + /** + * Convert the {@link StoreFile} into something we can manage in the archive + * methods + */ + private static class StoreToFile extends FileConverter { + public StoreToFile(FileSystem fs) { + super(fs); + } + + @Override + public File apply(StoreFile input) { + return new FileableStoreFile(fs, input); + } + } + + /** + * Wrapper to handle file operations uniformly + */ + private static abstract class File { + protected final FileSystem fs; + + public File(FileSystem fs) { + this.fs = fs; + } + + /** + * Delete the file + * @throws IOException on failure + */ + abstract void delete() throws IOException; + + /** + * Check to see if this is a file or a directory + * @return true if it is a file, false otherwise + * @throws IOException on {@link FileSystem} connection error + */ + abstract boolean isFile() throws IOException; + + /** + * @return if this is a directory, returns all the children in the + * directory, otherwise returns an empty list + * @throws IOException + */ + abstract Collection getChildren() throws IOException; + + /** + * close any outside readers of the file + * @throws IOException + */ + abstract void close() throws IOException; + + /** + * @return the name of the file (not the full fs path, just the individual + * file name) + */ + abstract String getName(); + + /** + * @return the path to this file + */ + abstract Path getPath(); + + /** + * Move the file to the given destination + * @param dest + * @return true on success + * @throws IOException + */ + public boolean moveAndClose(Path dest) throws IOException { + this.close(); + Path p = this.getPath(); + return !fs.rename(p, dest); + } + + /** + * @return the {@link FileSystem} on which this file resides + */ + public FileSystem getFileSystem() { + return this.fs; + } + + @Override + public String toString() { + return this.getClass() + ", file:" + getPath().toString(); + } + } + + /** + * A {@link File} that wraps a 
simple {@link Path} on a {@link FileSystem}. + */ + private static class FileablePath extends File { + private final Path file; + private final FileStatusConverter getAsFile; + + public FileablePath(FileSystem fs, Path file) { + super(fs); + this.file = file; + this.getAsFile = new FileStatusConverter(fs); + } + + @Override + public void delete() throws IOException { + fs.delete(file, true); + } + + @Override + public String getName() { + return file.getName(); + } + + @Override + public Collection getChildren() throws IOException { + if (fs.isFile(file)) return Collections.emptyList(); + return Collections2.transform(Arrays.asList(fs.listStatus(file)), getAsFile); + } + + @Override + public boolean isFile() throws IOException { + return fs.isFile(file); + } + + @Override + public void close() throws IOException { + // NOOP - files are implicitly closed on removal + } + + @Override + Path getPath() { + return file; + } + } + + /** + * {@link File} adapter for a {@link StoreFile} living on a {@link FileSystem} + * . + */ + private static class FileableStoreFile extends File { + StoreFile file; + + public FileableStoreFile(FileSystem fs, StoreFile store) { + super(fs); + this.file = store; + } + + @Override + public void delete() throws IOException { + file.deleteReader(); + } + + @Override + public String getName() { + return file.getPath().getName(); + } + + @Override + public boolean isFile() { + return true; + } + + @Override + public Collection getChildren() throws IOException { + // storefiles don't have children + return Collections.emptyList(); + } + + @Override + public void close() throws IOException { + file.closeReader(true); + } + + @Override + Path getPath() { + return file.getPath(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/TableHFileArchiveTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/TableHFileArchiveTracker.java new file mode 100644 index 0000000..52a2d89 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/TableHFileArchiveTracker.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Track HFile archiving state changes in ZooKeeper. Keeps track of the tables + * whose HFiles should be archived. + *
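+ * A rough wiring sketch (assumes a {@link ZooKeeperWatcher} and {@link Server}
+ * are already available to the caller):
+ * <pre>
+ *   TableHFileArchiveTracker tracker = TableHFileArchiveTracker.create(zkw, server);
+ *   tracker.start();
+ *   boolean keep = tracker.keepHFiles("usertable");
+ * </pre>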

+ * {@link TableHFileArchiveTracker#start()} needs to be called to start monitoring + * for tables to archive. + */ +public class TableHFileArchiveTracker extends ZooKeeperListener implements HFileArchiveMonitor { + private static final Log LOG = LogFactory.getLog(TableHFileArchiveTracker.class); + private HFileArchiveTableMonitor monitor; + + private TableHFileArchiveTracker(ZooKeeperWatcher watcher, HFileArchiveTableMonitor monitor) { + super(watcher); + watcher.registerListener(this); + this.monitor = monitor; + } + + /** + * Start monitoring for archive updates + * @throws KeeperException on failure to find/create nodes + */ + public void start() throws KeeperException { + // if archiving is enabled, then read in the list of tables to archive + LOG.debug("Starting hfile archive tracker..."); + this.checkEnabledAndUpdate(); + LOG.debug("Finished starting hfile archive tracker!"); + } + + @Override + public void nodeCreated(String path) { + // if it is the archive path + if (!path.startsWith(watcher.archiveHFileZNode)) return; + + LOG.debug("Archive node: " + path + " created"); + // since we are already enabled, just update a single table + String table = path.substring(watcher.archiveHFileZNode.length()); + + // if a new table has been added + if (table.length() != 0) { + try { + addAndReWatchTable(path); + } catch (KeeperException e) { + LOG.warn("Couldn't read zookeeper data for table, not archiving", e); + } + } + // the top level node has come up, so read in all the tables + else { + checkEnabledAndUpdate(); + } + } + + @Override + public void nodeChildrenChanged(String path) { + if (!path.startsWith(watcher.archiveHFileZNode)) return; + + LOG.debug("Archive node: " + path + " children changed."); + // a table was added to the archive + try { + updateWatchedTables(); + } catch (KeeperException e) { + LOG.error("Failed to update tables to archive", e); + } + } + + /** + * Add this table to the tracker and then read a watch on that node. + *

+ * Handles situtation where table is deleted in the time between the update + * and resetting the watch by deleting the table via + * {@link #safeStopTrackingTable(String)} + * @param tableZnode full zookeeper path to the table to be added + * @throws KeeperException if an unexpected zk exception occurs + */ + private void addAndReWatchTable(String tableZnode) throws KeeperException { + getMonitor().addTable(ZKUtil.getNodeName(tableZnode)); + // re-add a watch to the table created + // and check to make sure it wasn't deleted + if (!ZKUtil.watchAndCheckExists(watcher, tableZnode)) { + safeStopTrackingTable(tableZnode); + } + } + + /** + * Stop tracking a table. Ensures that the table doesn't exist, but if it does + * attempts to add the table back via {@link #addAndReWatchTable(String)} (its + * a 'safe' removal) + * @param tableZnode full zookeeper path to the table to be added + * @throws KeeperException if an unexpected zk exception occurs + */ + private void safeStopTrackingTable(String tableZnode) throws KeeperException { + getMonitor().removeTable(ZKUtil.getNodeName(tableZnode)); + // if the table exists, then add and rewatch it + if (ZKUtil.checkExists(watcher, tableZnode) >= 0) { + addAndReWatchTable(tableZnode); + } + } + + @Override + public void nodeDeleted(String path) { + if (!path.startsWith(watcher.archiveHFileZNode)) return; + + LOG.debug("Archive node: " + path + " deleted"); + String table = path.substring(watcher.archiveHFileZNode.length()); + // if we stop archiving all tables + if (table.length() == 0) { + // make sure we have the tracker before deleting the archive + // but if we don't, we don't care about delete + clearTables(); + // watches are one-time events, so we need to renew our subscription to + // the archive node and might as well check to make sure archiving + // didn't come back on at the same time + checkEnabledAndUpdate(); + return; + } + // just stop archiving one table + // note that we don't attempt to add another watch for that table into zk. + // We have no assurances that the table will be archived again (or even + // exists for that matter), so its better not to add unnecessary load to + // zk for watches. If the table is created again, then we will get the + // notification in childrenChanaged. + getMonitor().removeTable(ZKUtil.getNodeName(path)); + } + + /** + * Sets the watch on the top-level archive znode, and then updates the montior + * with the current tables that should be archived (and ensures that those + * nodes are watched as well). + */ + private void checkEnabledAndUpdate() { + try { + if (ZKUtil.watchAndCheckExists(watcher, watcher.archiveHFileZNode)) { + LOG.debug(watcher.archiveHFileZNode + " znode does exist, checking for tables to archive"); + + // update the tables we should backup, to get the most recent state. 
+ // This is safer than also watching for children and then hoping we get + // all the updates as it makes sure we get and watch all the children + updateWatchedTables(); + } else { + LOG.debug("Archiving not currently enabling, waiting"); + } + } catch (KeeperException e) { + LOG.warn("Failed to watch for archiving znode", e); + } + } + + /** + * Read the list of children under the archive znode as table names and then + * sets those tables to the list of tables that we should archive + * @throws KeeperException if there is an unexpected zk exception + */ + private void updateWatchedTables() throws KeeperException { + // get the children and watch for new children + LOG.debug("Updating watches on tables to archive."); + // get the children and add watches for each of the children + List tables = ZKUtil.listChildrenAndWatchThem(watcher, watcher.archiveHFileZNode); + LOG.debug("Starting archive for tables:" + tables); + // if archiving is still enabled + if (tables != null && tables.size() > 0) { + getMonitor().setArchiveTables(tables); + } else { + LOG.debug("No tables to archive."); + // only if we currently have a tracker, then clear the archive + clearTables(); + } + } + + /** + * Remove the currently archived tables. + *

+ * Does some intelligent checking to make sure we don't prematurely create an + * archive tracker. + */ + private void clearTables() { + getMonitor().clearArchive(); + } + + @Override + public boolean keepHFiles(String tableName) { + return getMonitor().keepHFiles(tableName); + } + + /** + * @return the tracker for which tables should be archived. + */ + public final HFileArchiveTableMonitor getMonitor() { + return this.monitor; + } + + /** + * Set the table tracker for the overall archive tracker. + *

+ * Exposed for TESTING! + * @param tracker tracker for which tables should be archived. + */ + public void setTableMonitor(HFileArchiveTableMonitor tracker) { + this.monitor = tracker; + } + + /** + * Create an archive tracker for the passed in server + * @param zkw Parent's zookeeper to monitor/updated + * @param parent the server for which the tracker is created. + * @return ZooKeeper tracker to monitor for this server if this server should + * archive hfiles for a given table + */ + public static TableHFileArchiveTracker create(ZooKeeperWatcher zkw, Server parent) { + return create(zkw, new HFileArchiveTableMonitor(parent, zkw)); + } + + /** + * Create an archive tracker with the special passed in table monitor. Should + * only be used in special cases (eg. testing) + * @param zkw Watcher for the ZooKeeper cluster that we should track + * @param monitor Monitor for which tables need hfile archiving + * @return ZooKeeper tracker to monitor for this server if this server should + * archive hfiles for a given table + */ + public static TableHFileArchiveTracker create(ZooKeeperWatcher zkw, HFileArchiveTableMonitor monitor) { + return new TableHFileArchiveTracker(zkw, monitor); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 90d35bf..ba5befe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -24,8 +24,11 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.net.SocketTimeoutException; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; @@ -1964,6 +1967,239 @@ public class HBaseAdmin implements Abortable, Closeable { } /** + * @return A new {@link HFileArchiveManager} to manage which tables' hfiles + * should be archived rather than deleted. + * @throws ZooKeeperConnectionException + * @throws IOException + */ + private synchronized HFileArchiveManager createHFileArchiveManager() + throws ZooKeeperConnectionException, IOException { + return new HFileArchiveManager(this.getConnection(), this.conf); + } + + /** + * Turn on backups for all HFiles for the given table. + *

+ * All deleted hfiles are moved to the + * {@value HConstants#HFILE_ARCHIVE_DIRECTORY} directory under the table + * directory, rather than being deleted. + *

+ * If backups are already enabled for this table, does nothing. + *

+ * Synchronous operation. + *

+ * Assumes that offline/dead regionservers will get the update on start. + *
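+ * A minimal client-side sketch (the table name is only an example; assumes an
+ * existing HBaseAdmin instance named 'admin'):
+ * <pre>
+ *   admin.enableHFileBackup("usertable");           // synchronous
+ *   boolean enabled = admin.getArchivingEnabled("usertable");
+ *   admin.disableHFileBackup("usertable");          // asynchronous
+ * </pre>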

+ * WARNING: No guarantees are made if multiple clients simultaneously + * attempt to disable/enable hfile backup on the same table. + * @param table name of the table to start backing up + * @throws IOException + */ + public void enableHFileBackup(String table) throws IOException { + enableHFileBackup(Bytes.toBytes(table)); + } + + /** + * Enable HFile backups synchronously. Ensures that all regionservers hosting + * the table have received the notification to start archiving hfiles. + *

+ * Synchronous operation. + *

+ * Assumes that offline/dead regionservers will get the update on start. + *

+ * WARNING: No guarantees are made if multiple clients simultaneously + * attempt to disable/enable hfile backup on the same table. + * @param tableName name of the table on which to start backing up hfiles + * @throws IOException if the backup cannot be enabled + */ + public void enableHFileBackup(final byte[] tableName) throws IOException { + // this is a little inefficient since we do a read of meta to see if it + // exists, but is a lot cleaner than catching a NPE below when looking for + // online regions and the table doesn't exist. + if (!this.tableExists(tableName)) { + throw new IOException("Table: " + Bytes.toString(tableName) + + " does not exist, cannot create a backup for a non-existant table."); + } + // do the asynchronous update + enableHFileBackupAsync(tableName); + + // and then wait for it to propagate + // while all the regions have yet to receive zk update and we are not done + // retrying and backing off + CatalogTracker ct = getCatalogTracker(); + HFileArchiveManager manager = createHFileArchiveManager(); + int tries = 0; + try { + // just doing the normal amount of retries, as opposed to the multiplier + // since we are creating a new connection every time, rather than worrying + // about connection timeouts, unavailability. If ZK does go down, then we + // are pretty hosed in terms of backup and want to bail quickly. + while (!allServersArchivingTable(manager, ct, tableName) && tries < this.numRetries) { + // Sleep while we wait for the RS to get updated + try { + if (LOG.isDebugEnabled()) { + LOG.debug("try:" + tries + "/" + this.numRetries + + ", Not all regionservers for table '" + Bytes.toString(tableName) + + "' have joined the backup. Waiting..."); + } + Thread.sleep(getPauseTime(tries++)); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted when backing up table " + + Bytes.toString(tableName)); + } + } + + LOG.debug("Done waiting for table: " + Bytes.toString(tableName) + " to join archive."); + // if we couldn't get all regions we expect, bail out + if (tries >= this.numRetries) { + // disable backups + manager.disableHFileBackup(tableName); + throw new IOException("Failed to get all regions to join backup in " + tries + + " tries, for table:" + Bytes.toString(tableName)); + } + } finally { + cleanupCatalogTracker(ct); + manager.stop(); + } + } + + /** + * Turn on backups for all HFiles for the given table. + *

+ * All deleted hfiles are moved to the archive directory under the table + * directory, rather than being deleted. + *

+ * If backups are already enabled for this table, does nothing. + * @param table name of the table to start backing up + * @throws IOException if an unexpected exception occurs + */ + public void enableHFileBackupAsync(final byte[] table) + throws IOException { + createHFileArchiveManager().enableHFileBackup(table).stop(); + } + + /** + * Disable hfile backups for the given table. + *

+ * Previously backed up files are still retained (if present). + *

+ * Asynchronous operation - some extra HFiles may be retained in the archive + * directory after disable is called, depending on the latency of zookeeper updates to + * the servers. + * @param table name of the table to stop backing up + * @throws IOException if an unexpected exception occurs + */ + public void disableHFileBackup(String table) throws IOException { + disableHFileBackup(Bytes.toBytes(table)); + } + + /** + * Disable hfile backups for the given table. + *

+ * Previously backed up files are still retained (if present). + *

+ * Asynchronous operation - some extra HFiles may be retained in the archive + * directory after disable is called, depending on the latency of zookeeper updates to + * the servers. + * @param table name of the table to stop backing up + * @throws IOException if an unexpected exception occurs + */ + public void disableHFileBackup(final byte[] table) throws IOException { + createHFileArchiveManager().disableHFileBackup(table).stop(); + } + + /** + * Disable hfile backups for all tables. + *

+ * Previously backed up files are still retained (if present). + *

+ * Asynchronous operation - some extra HFiles may be retained, in the archive + * directory after disable is called, dependent on the latency in zookeeper to + * the servers. + * @throws IOException if an unexpected exception occurs + */ + public void disableHFileBackup() throws IOException { + createHFileArchiveManager().disableHFileBackup().stop(); + } + + /** + * Determine if archiving is enabled (but not necessarily fully propagated) + * for a table + * @param table name of the table to check + * @return true if it is, false otherwise + * @throws IOException if a connection to ZooKeeper cannot be established + */ + public boolean getArchivingEnabled(byte[] table) throws IOException { + HFileArchiveManager manager = createHFileArchiveManager(); + try { + return manager.isArchivingEnabled(table); + } catch (IOException e) { + return false; + } finally { + manager.stop(); + } + } + + /** + * Determine if archiving is enabled (but not necessarily fully propagated) + * for a table + * @param table name of the table to check + * @return true if it is, false otherwise + * @throws IOException if a connection to ZooKeeper cannot be established + */ + public boolean getArchivingEnabled(String table) throws IOException { + return getArchivingEnabled(Bytes.toBytes(table)); + } + + private boolean allServersArchivingTable(HFileArchiveManager manager, CatalogTracker ct, + byte[] tableName) throws IOException { + + // then get the list of RS that have confirmed archiving table + List serverNames = manager.serversArchiving(tableName); + // add the master as a server to check for + + Collections.sort(serverNames); + + if (LOG.isDebugEnabled()) { + LOG.debug("Expecting archiving from at least servers:" + serverNames); + } + + // get the regions and their servers associated with the table + List> regionAndLocations; + try { + regionAndLocations = MetaReader.getTableRegionsAndLocations(ct, Bytes.toString(tableName)); + } catch (InterruptedException e) { + throw new InterruptedIOException( + "Interrupted when getting expected regions and servers to backup table: " + + Bytes.toString(tableName)); + } + // now get the RS that should be archiving the table + Set expected = new TreeSet(); + for (Pair rl : regionAndLocations) { + // if the region is assigned + if (rl.getSecond() != null) { + expected.add(rl.getSecond().toString()); + } + } + // and add the master server as an expected server too, since that has the + // catalog janitor + expected.add(this.getClusterStatus().getMaster().toString()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Expecting archiving from at least servers:" + serverNames); + } + + // now compare the list of expected vs. those currently checked in + // now build list of the current RS + for (String expectedServer : expected) { + // if the expected RS is not in the list, then they haven't all joined + if (Collections.binarySearch(serverNames, expectedServer) < 0) return false; + } + return true; + } + + /** * @see {@link #execute} */ private abstract static class MasterCallable implements Callable{ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HFileArchiveManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HFileArchiveManager.java new file mode 100644 index 0000000..6966221 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HFileArchiveManager.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.util.HFileArchiveUtil.getTableNode; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Client-side manager for which table(s)' HFiles to archive + */ +class HFileArchiveManager { + + private static final Log LOG = LogFactory.getLog(HFileArchiveManager.class); + private final ZooKeeperWatcher zooKeeper; + private volatile boolean stopped = false; + + public HFileArchiveManager(HConnection connection, Configuration conf) + throws ZooKeeperConnectionException, IOException { + this(new ZooKeeperWatcher(conf, "hfileArchiveManger-on-" + connection.toString(), connection)); + } + + public HFileArchiveManager(ZooKeeperWatcher watcher) { + this.zooKeeper = watcher; + } + + /** + * Turn on auto-backups of HFiles on the specified table. + *
+ * <p>
+ * When HFiles would be deleted, they are instead interned to the backup + * directory specified or HConstants#DEFAULT_HFILE_ARCHIVE_DIRECTORY + * @param table name of the table to enable backups on + * @return this for chaining. + * @throws IOException + */ + public HFileArchiveManager enableHFileBackup(byte[] table) + throws IOException { + try { + enable(this.zooKeeper, table); + return this; + } catch (KeeperException e) { + throw new IOException(e); + } + } + + /** + * Table backups on HFiles for the given table + * @param table name of the table to disable backups on + * @return this for chaining. + * @throws IOException + */ + public HFileArchiveManager disableHFileBackup(byte[] table) throws IOException { + try { + disable(this.zooKeeper, table); + return this; + } catch (KeeperException e) { + throw new IOException(e); + } + } + + /** + * Disable backups on all tables in the cluster + * @return this for chaining. + * @throws IOException if the number of attempts is exceeded + */ + public HFileArchiveManager disableHFileBackup() throws IOException { + LOG.debug("Disabling backups on all tables."); + try { + ZKUtil.deleteNodeRecursively(this.zooKeeper, this.zooKeeper.archiveHFileZNode); + return this; + } catch (KeeperException e) { + throw new IOException("Unexpected ZK exception!", e); + } + } + + /** + * Get the current list of all the regionservers that are currently involved + * in archiving the table + * @param table name of table under which to check for regions that are + * archiving. + * @return the currently online regions that are archiving the table + * @throws IOException if an unexpected connection issues occurs + */ + @SuppressWarnings("unchecked") + public List serversArchiving(byte[] table) throws IOException { + try { + // build the table znode + String tableNode = getTableNode(zooKeeper, table); + List regions = ZKUtil.listChildrenNoWatch(zooKeeper, tableNode); + return (List) (regions == null ? Collections.emptyList() : regions); + } catch (KeeperException e) { + throw new IOException(e); + } + } + + /** + * Best effort enable of table backups. If a region serving a table is + * offline, it will be notified on startup. + *
+ * <p>
+ * No attempt is made to make sure that backups are successfully created - it + * is inherently an asynchronous operation. + * @param zooKeeper watcher connection to zk cluster + * @param table table name on which to enable archiving + * @throws KeeperException + */ + private void enable(ZooKeeperWatcher zooKeeper, byte[] table) + throws KeeperException { + LOG.debug("Ensuring archiving znode exists"); + ZKUtil.createAndFailSilent(zooKeeper, zooKeeper.archiveHFileZNode); + + // then add the table to the list of znodes to archive + String tableNode = getTableNode(zooKeeper, table); + LOG.debug("Creating: " + tableNode + ", data: []"); + ZKUtil.createSetData(zooKeeper, tableNode, new byte[0]); + } + + /** + * Disable all archiving of files for a given table + *
+ * <p>
+ * Note: Asynchronous + * @param zooKeeper watcher for the ZK cluster + * @param table name of the table to disable + * @throws KeeperException if an unexpected ZK connection issues occurs + */ + private void disable(ZooKeeperWatcher zooKeeper, byte[] table) throws KeeperException { + // ensure the latest state of the archive node is found + zooKeeper.sync(zooKeeper.archiveHFileZNode); + + // if the top-level archive node is gone, the we are done + if (ZKUtil.checkExists(zooKeeper, zooKeeper.archiveHFileZNode) < 0) { + return; + } + // delete the table node, from the archive - will be noticed by + // regionservers + String tableNode = getTableNode(zooKeeper, table); + // make sure the table is the latest version so the delete takes + zooKeeper.sync(tableNode); + + LOG.debug("Attempting to delete table node:" + tableNode); + ZKUtil.deleteNodeRecursively(zooKeeper, tableNode); + } + + public void stop() { + if (!this.stopped) { + this.stopped = true; + LOG.debug("Stopping HFileArchiveManager..."); + this.zooKeeper.close(); + } + } + + /** + * Check to see if the table is currently marked for archiving + * @param table name of the table to check + * @return true if the archive znode for that table exists, + * false if not + * @throws IOException if an unexpected zookeeper error occurs + */ + public boolean isArchivingEnabled(byte[] table) throws IOException { + String tableNode = HFileArchiveUtil.getTableNode(zooKeeper, table); + try { + return ZKUtil.checkExists(zooKeeper, tableNode) >= 0; + } catch (KeeperException e) { + throw new IOException("Failed to get the table znode:" + Bytes.toString(table), e); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java index 5804a84..e80b579 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/Reference.java @@ -65,7 +65,7 @@ public class Reference implements Writable { * For split HStoreFiles, it specifies if the file covers the lower half or * the upper half of the key range */ - static enum Range { + public static enum Range { /** HStoreFile contains upper half of key range */ top, /** HStoreFile contains lower half of key range */ @@ -93,7 +93,7 @@ public class Reference implements Writable { * @param splitRow This is row we are splitting around. * @param fr */ - Reference(final byte [] splitRow, final Range fr) { + public Reference(final byte[] splitRow, final Range fr) { this.splitkey = splitRow == null? 
null: KeyValue.createFirstOnRow(splitRow).getKey(); this.region = fr; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index 1492548..09f10a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -40,11 +40,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.backup.HFileArchiveMonitor; +import org.apache.hadoop.hbase.backup.HFileDisposer; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; @@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Writables; - /** * A janitor for the catalog tables. Scans the .META. catalog * table on a period looking for unused regions to garbage collect. @@ -62,14 +61,16 @@ class CatalogJanitor extends Chore { private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName()); private final Server server; private final MasterServices services; + private final HFileArchiveMonitor hfileArchiveMontior; private boolean enabled = true; - CatalogJanitor(final Server server, final MasterServices services) { + CatalogJanitor(final Server server, final MasterServices services, HFileArchiveMonitor monitor) { super(server.getServerName() + "-CatalogJanitor", server.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000), server); this.server = server; this.services = services; + this.hfileArchiveMontior = monitor; } @Override @@ -236,7 +237,7 @@ class CatalogJanitor extends Chore { if (hasNoReferences(a) && hasNoReferences(b)) { LOG.debug("Deleting region " + parent.getRegionNameAsString() + " because daughter splits no longer hold references"); - // wipe out daughter references from parent region + // wipe out daughter references from parent region in meta removeDaughtersFromParent(parent); // This latter regionOffline should not be necessary but is done for now @@ -247,8 +248,7 @@ class CatalogJanitor extends Chore { this.services.getAssignmentManager().regionOffline(parent); } FileSystem fs = this.services.getMasterFileSystem().getFileSystem(); - Path rootdir = this.services.getMasterFileSystem().getRootDir(); - HRegion.deleteRegion(fs, rootdir, parent); + HFileDisposer.disposeRegion(fs, hfileArchiveMontior, parent); MetaEditor.deleteRegion(this.server.getCatalogTracker(), parent); result = true; } @@ -347,4 +347,12 @@ class CatalogJanitor extends Chore { throws FileNotFoundException, IOException { return this.services.getTableDescriptors().get(Bytes.toString(tableName)); } + + /** + * Exposed for TESTING + * @return the current monitor for the files being managed + */ + HFileArchiveMonitor getHfileArchiveMonitor() { + return hfileArchiveMontior; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
index fba2e4e..22e48b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.backup.TableHFileArchiveTracker; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -283,6 +284,8 @@ Server { */ private ObjectName mxBean = null; + private TableHFileArchiveTracker tableHfileArchiveTracker; + /** * Initializes the HMaster. The steps are as follows: *
@@ -498,6 +501,9 @@ Server { boolean wasUp = this.clusterStatusTracker.isClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp(); + this.tableHfileArchiveTracker = TableHFileArchiveTracker.create(zooKeeper, this); + this.tableHfileArchiveTracker.start(); + LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + @@ -660,7 +666,7 @@ Server { // been assigned. status.setStatus("Starting balancer and catalog janitor"); this.balancerChore = getAndStartBalancerChore(this); - this.catalogJanitorChore = new CatalogJanitor(this, this); + this.catalogJanitorChore = new CatalogJanitor(this, this, this.tableHfileArchiveTracker); startCatalogJanitorChore(); registerMBean(); @@ -2101,6 +2107,22 @@ Server { } /** + * Exposed for TESTING only! + * @return the internal tracker for archiving hfiles + */ + TableHFileArchiveTracker getHFileArchiveMonitor() { + return this.tableHfileArchiveTracker; + } + + /** + * Exposed for TESTING! + * @return the current catalog janitor + */ + CatalogJanitor getCatalogJanitor() { + return this.catalogJanitorChore; + } + + /** * Special method, only used by hbck. */ @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f322111..f00f714 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.backup.HFileDisposer; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -938,16 +939,7 @@ public class HRegion implements HeapSize { // , Writable{ writestate.writesEnabled = false; wasFlushing = writestate.flushing; LOG.debug("Closing " + this + ": disabling compactions & flushes"); - while (writestate.compacting > 0 || writestate.flushing) { - LOG.debug("waiting for " + writestate.compacting + " compactions" + - (writestate.flushing ? " & cache flush" : "") + - " to complete for region " + this); - try { - writestate.wait(); - } catch (InterruptedException iex) { - // continue - } - } + waitForFlushesAndCompactions(); } // If we were not just flushing, is it worth doing a preflush...one // that will clear out of the bulk of the memstore before we put up @@ -1022,6 +1014,25 @@ public class HRegion implements HeapSize { // , Writable{ } } + /** + * Wait for all current flushes and compactions of the region to complete. + *
+ * <p>
+ * VISBILE FOR TESTING + */ + void waitForFlushesAndCompactions() { + synchronized (writestate) { + while (writestate.compacting > 0 || writestate.flushing) { + LOG.debug("waiting for " + writestate.compacting + " compactions" + + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this); + try { + writestate.wait(); + } catch (InterruptedException iex) { + // continue + } + } + } + } + protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); @@ -4109,8 +4120,15 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Files for new region"); listPaths(fs, dstRegion.getRegionDir()); } - deleteRegion(fs, a.getRegionDir()); - deleteRegion(fs, b.getRegionDir()); + + // delete out the 'A' region + HFileDisposer.disposeRegion(fs, a.getRegionServerServices(), + FSUtils.getRootDir(a.getBaseConf()), + a.getTableDir(), a.getRegionDir()); + // delete out the 'B' region + HFileDisposer.disposeRegion(fs, b.getRegionServerServices(), + FSUtils.getRootDir(b.getBaseConf()), + b.getTableDir(), b.getRegionDir()); LOG.info("merge completed. New region is " + dstRegion); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a548e4e..2a2cb60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; +import org.apache.hadoop.hbase.backup.HFileArchiveMonitor; +import org.apache.hadoop.hbase.backup.TableHFileArchiveTracker; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaReader; @@ -422,6 +424,8 @@ public class HRegionServer implements ClientProtocol, */ private MovedRegionsCleaner movedRegionsCleaner; + /** Store file archiving management */ + TableHFileArchiveTracker hfileArchiveTracker; /** * Starts a HRegionServer at the default location @@ -643,8 +647,9 @@ public class HRegionServer implements ClientProtocol, * Finally put up a catalog tracker. * @throws IOException * @throws InterruptedException + * @throws KeeperException */ - private void initializeZooKeeper() throws IOException, InterruptedException { + private void initializeZooKeeper() throws IOException, InterruptedException, KeeperException { // Open connection to zookeeper and set primary watcher this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" + this.isa.getPort(), this); @@ -666,6 +671,9 @@ public class HRegionServer implements ClientProtocol, this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); + + this.hfileArchiveTracker = TableHFileArchiveTracker.create(zooKeeper, this); + this.hfileArchiveTracker.start(); } /** @@ -3806,6 +3814,11 @@ public class HRegionServer implements ClientProtocol, } + @Override + public HFileArchiveMonitor getHFileArchiveMonitor() { + return this.hfileArchiveTracker; + } + // This map will containsall the regions that we closed for a move. 
// We add the time it was moved as we don't want to keep too old information protected Map> movedRegions = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 6884d53..03df8bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.backup.HFileArchiveMonitor; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -86,4 +87,10 @@ public interface RegionServerServices extends OnlineRegions { * @return Return the FileSystem object used by the regionserver */ public FileSystem getFileSystem(); + + /** + * @return the manager that keeps track of which tables should be archived and + * where they should be archived + */ + public HFileArchiveMonitor getHFileArchiveMonitor(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 632164e..7e7d66d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.backup.HFileDisposer; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -130,7 +131,7 @@ public class Store extends SchemaConfigured implements HeapSize { private final int blockingStoreFileCount; private volatile long storeSize = 0L; private volatile long totalUncompressedBytes = 0L; - private final Object flushLock = new Object(); + final Object flushLock = new Object(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final boolean verifyBulkLoads; @@ -1021,6 +1022,7 @@ public class Store extends SchemaConfigured implements HeapSize { this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId); // Move the compaction into place. if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { + LOG.debug("Completing compaction by moving files into place."); sf = completeCompaction(filesToCompact, writer); if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postCompact(this, sf); @@ -1610,10 +1612,13 @@ public class Store extends SchemaConfigured implements HeapSize { // Tell observers that list of StoreFiles has changed. notifyChangedReadersObservers(); - // Finally, delete old store files. 
- for (StoreFile hsf: compactedFiles) { - hsf.deleteReader(); - } + + // let the archive util decide if we should archive or delete the files + LOG.debug("Removing store files after compaction..."); + HFileDisposer.disposeStoreFiles(this.region.getRegionServerServices(), this.region, + this.conf, this.family.getName(), compactedFiles); + + } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); LOG.error("Failed replacing compacted files in " + this + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java new file mode 100644 index 0000000..c9c4ebe --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Helper class for all utilities related to archival/retrieval of HFiles + */ +public class HFileArchiveUtil { + public static final String DEFAULT_HFILE_ARCHIVE_DIRECTORY = ".archive"; + + private HFileArchiveUtil() { + // non-external instantiation - util class + } + + /** + * Get the directory to archive a store directory + * @param conf {@link Configuration} to read for the archive directory name + * @param region parent region information under which the store currently + * lives + * @param family name of the family in the store + * @return {@link Path} to the directory to archive the given store or + * null if it should not be archived + */ + public static Path getStoreArchivePath(Configuration conf, HRegion region, byte [] family){ + return getStoreArchivePath(conf, region.getRegionInfo(), region.getTableDir(), family); + } + + /** + * Get the directory to archive a store directory + * @param conf {@link Configuration} to read for the archive directory name + * @param region parent region information under which the store currently + * lives + * @param tabledir directory for the table under which the store currently + * lives + * @param family name of the family in the store + * @return {@link Path} to the directory to archive the given store or + * null if it should not be archived + */ + public static Path getStoreArchivePath(Configuration conf, HRegionInfo region, Path tabledir, + byte[] family) { + Path tableArchiveDir = getTableArchivePath(conf, tabledir); + 
return Store.getStoreHomedir(tableArchiveDir, + HRegionInfo.encodeRegionName(region.getRegionName()), family); + } + + /** + * Get the archive directory for a given region under the specified table + * @param conf {@link Configuration} to read the archive directory from + * @param tabledir the original table directory + * @param regiondir the path to the region directory + * @return {@link Path} to the directory to archive the given region, or + * null if it should not be archived + */ + public static Path getRegionArchiveDir(Configuration conf, Path tabledir, Path regiondir) { + // get the archive directory for a table + Path archiveDir = getTableArchivePath(conf, tabledir); + // if the monitor isn't archiving that table, then we don't specify an + // archive directory + if (archiveDir == null) return null; + + // then add on the region path under the archive + String encodedRegionName = regiondir.getName(); + return HRegion.getRegionDir(archiveDir, encodedRegionName); + } + + /** + * Get the path to the table archive directory based on the configured archive + * directory. + *
+ * <p>
+ * Assumed that the table should already be archived. + * @param conf {@link Configuration} to read the archive property from + * @param tabledir directory of the table to be archived. + * @return {@link Path} to the archive directory for the table + */ + public static Path getTableArchivePath(Configuration conf, Path tabledir) { + return getTableArchivePath(getConfiguredArchiveDir(conf), tabledir); + } + + private static Path getTableArchivePath(String archivedir, Path tabledir) { + Path root = tabledir.getParent(); + // now build the archive directory path + // first the top-level archive directory + // generally "/hbase/.archive/[table] + return new Path(new Path(root, archivedir), tabledir.getName()); + } + + /** + * Get the archive directory as per the configuration + * @param conf {@link Configuration} to read the archive directory from (can + * be null, in which case you get the default value). + * @return the configured archived directory or the default specified by + * {@value HFileArchiveUtil#DEFAULT_HFILE_ARCHIVE_DIRECTORY} + */ + public static String getConfiguredArchiveDir(Configuration conf) { + return conf == null ? HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY : conf.get( + HConstants.HFILE_ARCHIVE_DIRECTORY, HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY); + } + + /** + * Get the zookeeper node associated with archiving the given table + * @param zkw watcher for the zk cluster + * @param table name of the table to check + * @return znode for the table's archive status + */ + public static String getTableNode(ZooKeeperWatcher zkw, byte[] table) { + return ZKUtil.joinZNode(zkw.archiveHFileZNode, Bytes.toString(table)); + } + + /** + * Get the zookeeper node associated with archiving the given table + * @param zkw watcher for the zk cluster + * @param table name of the table to check + * @return znode for the table's archive status + */ + public static String getTableNode(ZooKeeperWatcher zkw, String table) { + return ZKUtil.joinZNode(zkw.archiveHFileZNode, table); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 33bc1d0..17383c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -102,6 +102,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String clusterIdZNode; // znode used for log splitting work assignment public String splitLogZNode; + // znode to track archiving hfiles + public String archiveHFileZNode; // Certain ZooKeeper nodes need to be world-readable public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE = @@ -212,6 +214,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { conf.get("zookeeper.znode.clusterId", "hbaseid")); splitLogZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); + archiveHFileZNode = ZKUtil + .joinZNode(baseZNode, conf.get("zookeeper.znode.hfile.archive", + HConstants.HFILE_ARCHIVE_ZNODE_PARENT)); } /** diff --git a/hbase-server/src/main/resources/hbase-default.xml b/hbase-server/src/main/resources/hbase-default.xml index 370b06f..325d21b 100644 --- a/hbase-server/src/main/resources/hbase-default.xml +++ b/hbase-server/src/main/resources/hbase-default.xml @@ -854,7 +854,6 @@ files when hbase.data.umask.enable is true - hbase.metrics.showTableName 
true @@ -864,7 +863,6 @@ In both cases, the aggregated metric M across tables and cfs will be reported. - hbase.metrics.exposeOperationTimes true @@ -873,5 +871,12 @@ have their times exposed through Hadoop metrics per CF and per region. - + + hbase.table.archive.directory + .archive + Per-table directory name under which to backup files for a + table. Files are moved to the same directories as they would be under the + table directory, but instead are just one level lower (under + table/.archive/... rather than table/...). Currently only applies to HFiles. + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index be932d7..24ff7e3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -950,7 +950,10 @@ public class HBaseTestingUtility { * @param tableName existing table */ public void deleteTable(byte[] tableName) throws IOException { - getHBaseAdmin().disableTable(tableName); + try { + getHBaseAdmin().disableTable(tableName); + } catch (TableNotEnabledException e) { + } getHBaseAdmin().deleteTable(tableName); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/SimpleHFileArchiveTableMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/SimpleHFileArchiveTableMonitor.java new file mode 100644 index 0000000..7ff2652 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/SimpleHFileArchiveTableMonitor.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.backup.HFileArchiveTableMonitor; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Simple monitor that does not do anything on calls to + * {@link #registerTable(String)}. + *
+ * <p>
+ * Should be used for testing only + */ +public class SimpleHFileArchiveTableMonitor extends HFileArchiveTableMonitor { + + public SimpleHFileArchiveTableMonitor(Server parent, ZooKeeperWatcher zkw) { + super(parent, zkw); + } + + @Override + public void registerTable(String table) { + // NOOP - for testing + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchivingCleanup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchivingCleanup.java new file mode 100644 index 0000000..5f96313 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchivingCleanup.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.backup.HFileArchiveCleanup; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestHFileArchivingCleanup { + + private static final Log LOG = LogFactory.getLog(TestHFileArchivingCleanup.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final String STRING_TABLE_NAME = "test"; + private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME); + private static final byte[] TEST_FAM = Bytes.toBytes("fam"); + + /** + * Setup the config for the cluster + */ + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(); + } + + private static void setupConf(Configuration conf) { + // disable the ui + conf.setInt("hbase.regionsever.info.port", -1); + } + + @Before + public void setup() throws Exception { + UTIL.createTable(TABLE_NAME, TEST_FAM); + } + + @After + public void tearDown() throws 
Exception { + UTIL.deleteTable(TABLE_NAME); + // and cleanup the archive directory + try { + UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true); + } catch (IOException e) { + LOG.warn("Failure to delete archive directory", e); + } + // make sure that backups are off for all tables + UTIL.getHBaseAdmin().disableHFileBackup(); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + // NOOP; + } + } + + /** + * Test that we cleanly remove archives using the utility + * @throws Exception + */ + @Test + public void testMasterDeletesFailedArchive() throws Exception { + final HBaseAdmin admin = UTIL.getHBaseAdmin(); + + // get the current store files for the region + List servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME); + // make sure we only have 1 region serving this table + assertEquals(1, servingRegions.size()); + final HRegion region = servingRegions.get(0); + + // turn on archiving + admin.enableHFileBackup(TABLE_NAME); + LOG.debug("----Starting test of cleanup"); + // so lets put some files in the archive that are newer than the start + FileSystem fs = UTIL.getTestFileSystem(); + Path archiveDir = HFileArchiveTestingUtil.getTableArchivePath(UTIL.getConfiguration(), region); + // write a tmp file to the archive dir + Path tmpFile = new Path(archiveDir, "toDelete"); + FSDataOutputStream out = fs.create(tmpFile); + out.write(1); + out.close(); + LOG.debug("Created toDelete"); + + // now run the cleanup util + HFileArchiveCleanup.setConfiguration(UTIL.getConfiguration()); + HFileArchiveCleanup.main(new String[0]); + // make sure the fake archived file has been cleaned up + assertFalse(fs.exists(tmpFile)); + + // now do the same create again, but make sure it still exists since we have + // an earlier end time + + // write a tmp file to the archive dir + out = fs.create(tmpFile); + out.write(1); + out.close(); + long end = fs.getFileStatus(tmpFile).getModificationTime() - 1; + LOG.debug("re-created toDelete"); + HFileArchiveCleanup.main(new String[] { "-e", Long.toString(end) }); + assertTrue(fs.exists(tmpFile)); + + LOG.debug("Still not deleting the file"); + // now bump the start time to match the end time - still should be there + HFileArchiveCleanup.main(new String[] { "-s", Long.toString(end), "-e", Long.toString(end) }); + assertTrue(fs.exists(tmpFile)); + + //now check that we can delete with start == end, but only deleting the right file + // first create another file after the tmp file + Path otherTmp = new Path(tmpFile.getParent(), tmpFile.getName() + "-1"); + out = fs.create(otherTmp); + out.write(1); + out.close(); + + FileStatus status = fs.getFileStatus(tmpFile); + long time = status.getModificationTime(); + HFileArchiveCleanup.main(new String[] { "-e", Long.toString(time), "-s", Long.toString(time) }); + assertFalse(fs.exists(tmpFile)); + assertTrue(fs.exists(otherTmp)); + + // then rename the tmpFile pointer so we don't need to recreate the file + tmpFile = otherTmp; + + // now move the start up to include the file, which should delete it + LOG.debug("Now should delete the file"); + HFileArchiveCleanup.main(new String[] { "-s", + Long.toString(fs.getFileStatus(tmpFile).getModificationTime()) }); + assertFalse(fs.exists(tmpFile)); + + // now create the files in multiple table directories, and check that we + // only delete the one in the specified directory + out = fs.create(tmpFile); + out.write(1); + out.close(); + + // create the table 
archive and put a file in there + Path tableArchive = new Path(archiveDir, STRING_TABLE_NAME); + fs.mkdirs(tableArchive); + Path tableFile = new Path(tableArchive, "table"); + out = fs.create(tableFile); + out.write(1); + out.close(); + + // now delete just that table + LOG.debug("Just cleaning up table: table, files:" + tableFile); + HFileArchiveCleanup + .main(new String[] { "-s", + Long.toString(fs.getFileStatus(tableFile).getModificationTime()), "-t", + STRING_TABLE_NAME }); + assertTrue(fs.exists(tmpFile)); + assertFalse(fs.exists(tableFile)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRegionDisposer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRegionDisposer.java new file mode 100644 index 0000000..a44c5fe --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestRegionDisposer.java @@ -0,0 +1,147 @@ +package org.apache.hadoop.hbase.backup; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +/** + * Test that the {@link HFileDisposer} correctly removes all the parts of a + * region when cleaning up a region + */ +@Category(MediumTests.class) +public class TestRegionDisposer { + + private static final String STRING_TABLE_NAME = "test_table"; + + private static final Log LOG = LogFactory.getLog(TestRegionDisposer.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME); + private static final byte[] TEST_FAM = Bytes.toBytes("fam"); + + /** + * Setup the config for the cluster + */ + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + } + + private static void setupConf(Configuration conf) { + // disable the ui + conf.setInt("hbase.regionsever.info.port", -1); + // drop the memstore size so we get flushes + conf.setInt("hbase.hregion.memstore.flush.size", 25000); + } + + @Before + public void startMinicluster() throws Exception { + UTIL.startMiniCluster(); + } + + @After + public void tearDown() throws Exception { + // cleanup the cluster if its up still + if (UTIL.getHBaseAdmin().tableExists(STRING_TABLE_NAME)) { + + UTIL.deleteTable(TABLE_NAME); + } + // and cleanup the archive directory + try { + UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true); + } catch (IOException e) { + LOG.warn("Failure to delete archive directory", e); + } + // make sure that backups are off for all tables + UTIL.getHBaseAdmin().disableHFileBackup(); + } + + @AfterClass + public static void cleanupTest() throws Exception { + 
try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + // NOOP; + } + } + + @Test + public void testRemovesRegionDirOnArchive() throws Exception { + UTIL.createTable(TABLE_NAME, TEST_FAM); + final HBaseAdmin admin = UTIL.getHBaseAdmin(); + + // get the current store files for the region + List servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME); + // make sure we only have 1 region serving this table + assertEquals(1, servingRegions.size()); + HRegion region = servingRegions.get(0); + + // turn on archiving + admin.enableHFileBackup(TABLE_NAME); + + // and load the table + UTIL.loadRegion(region, TEST_FAM); + + // shutdown the table so we can manipulate the files + admin.disableTable(STRING_TABLE_NAME); + + FileSystem fs = UTIL.getTestFileSystem(); + + // now attempt to depose the region + Path regionDir = HRegion.getRegionDir(region.getTableDir().getParent(), region.getRegionInfo()); + + HFileArchiveMonitor monitor = Mockito.mock(HFileArchiveMonitor.class); + Mockito.when(monitor.keepHFiles(STRING_TABLE_NAME)).thenReturn(true); + HFileDisposer.disposeRegion(fs, monitor, region.getRegionInfo()); + + // check for the existence of the archive directory and some files in it + Path archiveDir = HFileArchiveTestingUtil.getRegionArchiveDir(UTIL.getConfiguration(), region); + assertTrue(fs.exists(archiveDir)); + + // check to make sure the store directory was copied + FileStatus[] stores = fs.listStatus(archiveDir); + assertTrue(stores.length == 1); + + // make sure we archived the store files + FileStatus[] storeFiles = fs.listStatus(stores[0].getPath()); + assertTrue(storeFiles.length > 0); + + // then ensure the region's directory isn't present + assertFalse(fs.exists(regionDir)); + + //recreate the table + admin.deleteTable(STRING_TABLE_NAME); + UTIL.createTable(TABLE_NAME, TEST_FAM); + + // now copy back in the region + // fs.copyFromLocalFile(archive, regionDir); + + // and depose the region without archiving + Mockito.when(monitor.keepHFiles(STRING_TABLE_NAME)).thenReturn(false); + HFileDisposer.disposeRegion(fs, monitor, region.getRegionInfo()); + + assertFalse(fs.exists(regionDir)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 04ed1a3..15b6a39 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.backup.HFileArchiveMonitor; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.ClientProtocol; @@ -508,4 +509,10 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer // TODO Auto-generated method stub return null; } + + @Override + public HFileArchiveMonitor getHFileArchiveMonitor() { + // TODO Auto-generated method stub + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 2495987..b0c7146 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -19,13 +19,15 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.util.HFileArchiveTestingUtil.assertArchiveEqualToOriginal; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import static org.junit.Assert.fail; import java.io.FileNotFoundException; import java.io.IOException; @@ -36,6 +38,8 @@ import java.util.SortedMap; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -44,14 +48,20 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.backup.HFileArchiveMonitor; +import org.apache.hadoop.hbase.backup.HFileArchiveTableMonitor; +import org.apache.hadoop.hbase.backup.TableHFileArchiveTracker; +import org.apache.hadoop.hbase.backup.SimpleHFileArchiveTableMonitor; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.client.AdminProtocol; import org.apache.hadoop.hbase.client.ClientProtocol; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; @@ -65,10 +75,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Matchers; import org.mockito.Mockito; import com.google.protobuf.RpcController; @@ -86,15 +98,15 @@ public class TestCatalogJanitor { private final CatalogTracker ct; MockServer(final HBaseTestingUtility htu) - throws NotAllMetaRegionsOnlineException, IOException, InterruptedException { + throws NotAllMetaRegionsOnlineException, IOException, InterruptedException { this.c = htu.getConfiguration(); ClientProtocol ri = Mockito.mock(ClientProtocol.class); MutateResponse.Builder builder = MutateResponse.newBuilder(); builder.setProcessed(true); try { Mockito.when(ri.mutate( - (RpcController)Mockito.any(), (MutateRequest)Mockito.any())). - thenReturn(builder.build()); + (RpcController)Matchers.any(), (MutateRequest)Matchers.any())). 
+ thenReturn(builder.build()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -140,7 +152,7 @@ public class TestCatalogJanitor { public void abort(String why, Throwable e) { //no-op } - + @Override public boolean isAborted() { return false; @@ -171,6 +183,10 @@ public class TestCatalogJanitor { MockMasterServices(final Server server) throws IOException { this.mfs = new MasterFileSystem(server, this, null, false); + // this is funky, but ensures that the filesystem gets the right root + // directory when directly accessed + this.mfs.getFileSystem().getConf() + .set(HConstants.HBASE_DIR, mfs.getRootDir().toString()); this.asm = Mockito.mock(AssignmentManager.class); } @@ -229,7 +245,7 @@ public class TestCatalogJanitor { public void abort(String why, Throwable e) { //no-op } - + @Override public boolean isAborted() { return false; @@ -255,29 +271,29 @@ public class TestCatalogJanitor { // TODO Auto-generated method stub return null; } - + @Override public Map getAll() throws IOException { // TODO Auto-generated method stub return null; } - + @Override public HTableDescriptor get(byte[] tablename) - throws FileNotFoundException, IOException { + throws FileNotFoundException, IOException { return get(Bytes.toString(tablename)); } - + @Override public HTableDescriptor get(String tablename) - throws FileNotFoundException, IOException { + throws FileNotFoundException, IOException { return createHTableDescriptor(); } - + @Override public void add(HTableDescriptor htd) throws IOException { // TODO Auto-generated method stub - + } }; } @@ -322,34 +338,37 @@ public class TestCatalogJanitor { Server server = new MockServer(htu); try { MasterServices services = new MockMasterServices(server); - CatalogJanitor janitor = new CatalogJanitor(server, services); + ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class); + HFileArchiveMonitor monitor = TableHFileArchiveTracker.create(watcher, + new SimpleHFileArchiveTableMonitor(services, watcher)); + CatalogJanitor janitor = new CatalogJanitor(server, services, monitor); // Create regions. HTableDescriptor htd = new HTableDescriptor("table"); htd.addFamily(new HColumnDescriptor("f")); HRegionInfo parent = - new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), + new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), Bytes.toBytes("eee")); HRegionInfo splita = - new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), + new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), Bytes.toBytes("ccc")); HRegionInfo splitb = - new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"), + new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"), Bytes.toBytes("eee")); // Test that when both daughter regions are in place, that we do not // remove the parent. List kvs = new ArrayList(); kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY, - HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita))); + HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita))); kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY, - HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb))); + HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb))); Result r = new Result(kvs); // Add a reference under splitA directory so we don't clear out the parent. 
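// (cleanParent only removes the parent once hasNoReferences() holds for both daughters;
// the reference file created below, whose name ends in "." plus the parent's encoded
// region name per StoreFile#REF_NAME_PARSER, is what keeps the parent from being cleaned.)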
Path rootdir = services.getMasterFileSystem().getRootDir(); Path tabledir = - HTableDescriptor.getTableDir(rootdir, htd.getName()); + HTableDescriptor.getTableDir(rootdir, htd.getName()); Path storedir = Store.getStoreHomedir(tabledir, splita.getEncodedName(), - htd.getColumnFamilies()[0].getName()); - Reference ref = Reference.createTopReference(Bytes.toBytes("ccc")); + htd.getColumnFamilies()[0].getName()); + Reference ref = new Reference(Bytes.toBytes("ccc"), Reference.Range.top); long now = System.currentTimeMillis(); // Reference name has this format: StoreFile#REF_NAME_PARSER Path p = new Path(storedir, Long.toString(now) + "." + parent.getEncodedName()); @@ -372,7 +391,7 @@ public class TestCatalogJanitor { */ @Test public void testParentCleanedEvenIfDaughterGoneFirst() - throws IOException, InterruptedException { + throws IOException, InterruptedException { parentWithSpecifiedEndKeyCleanedEvenIfDaughterGoneFirst( "testParentCleanedEvenIfDaughterGoneFirst", Bytes.toBytes("eee")); } @@ -384,11 +403,215 @@ public class TestCatalogJanitor { */ @Test public void testLastParentCleanedEvenIfDaughterGoneFirst() - throws IOException, InterruptedException { + throws IOException, InterruptedException { parentWithSpecifiedEndKeyCleanedEvenIfDaughterGoneFirst( "testLastParentCleanedEvenIfDaughterGoneFirst", new byte[0]); } + @Test + public void testArchiveOldRegion() throws Exception { + String table = "table"; + HBaseTestingUtility htu = new HBaseTestingUtility(); + setRootDirAndCleanIt(htu, "testCleanParent"); + Server server = new MockServer(htu); + try { + MasterServices services = new MockMasterServices(server); + // add the test table as the one to be archived + ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class); + SimpleHFileArchiveTableMonitor monitor = new SimpleHFileArchiveTableMonitor(services, watcher); + TableHFileArchiveTracker tracker = TableHFileArchiveTracker.create(watcher, monitor); + + // create the janitor + CatalogJanitor janitor = new CatalogJanitor(server, services, tracker); + + // Create regions. + HTableDescriptor htd = new HTableDescriptor(table); + htd.addFamily(new HColumnDescriptor("f")); + HRegionInfo parent = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), + Bytes.toBytes("eee")); + HRegionInfo splita = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), + Bytes.toBytes("ccc")); + HRegionInfo splitb = new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"), + Bytes.toBytes("eee")); + // Test that when both daughter regions are in place, that we do not + // remove the parent. 
+ List kvs = new ArrayList(); + kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY, + HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita))); + kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY, + HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb))); + Result r = new Result(kvs); + + Path rootdir = services.getMasterFileSystem().getRootDir(); + Path tabledir = HTableDescriptor.getTableDir(rootdir, htd.getName()); + Path storedir = Store.getStoreHomedir(tabledir, parent.getEncodedName(), + htd.getColumnFamilies()[0].getName()); + + // first do a delete without archiving + addMockStoreFiles(2, services, storedir); + assertTrue(janitor.cleanParent(parent, r)); + + // and make sure that no files are archived + FileSystem fs = services.getMasterFileSystem().getFileSystem(); + Path storeArchive = HFileArchiveUtil.getStoreArchivePath(services.getConfiguration(), parent, + tabledir, htd.getColumnFamilies()[0].getName()); + assertEquals(0, fs.listStatus(storeArchive).length); + + // enable archiving, make sure that files get archived + monitor.addTable(table); + addMockStoreFiles(2, services, storedir); + // get the current store files for comparison + FileStatus[] storeFiles = fs.listStatus(storedir); + + // do the cleaning of the parent + assertTrue(janitor.cleanParent(parent, r)); + + // and now check to make sure that the files have actually been archived + FileStatus[] archivedStoreFiles = fs.listStatus(storeArchive); + assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs); + } finally { + server.stop("shutdown"); + } + } + + /** + * Test that if a store file with the same name is present as those already + * backed up cause the already archived files to be timestamped backup + */ + @Test + public void testDuplicateHFileResolution() throws Exception { + String table = "table"; + HBaseTestingUtility htu = new HBaseTestingUtility(); + setRootDirAndCleanIt(htu, "testCleanParent"); + Server server = new MockServer(htu); + try { + MasterServices services = new MockMasterServices(server); + // add the test table as the one to be archived + ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class); + TableHFileArchiveTracker manager = TableHFileArchiveTracker.create(watcher, + new SimpleHFileArchiveTableMonitor(services, watcher)); + HFileArchiveTableMonitor tracker = manager.getMonitor(); + tracker.addTable(table); + + // create the janitor + CatalogJanitor janitor = new CatalogJanitor(server, services, manager); + + // Create regions. + HTableDescriptor htd = new HTableDescriptor(table); + htd.addFamily(new HColumnDescriptor("f")); + HRegionInfo parent = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), + Bytes.toBytes("eee")); + HRegionInfo splita = new HRegionInfo(htd.getName(), Bytes.toBytes("aaa"), + Bytes.toBytes("ccc")); + HRegionInfo splitb = new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"), + Bytes.toBytes("eee")); + // Test that when both daughter regions are in place, that we do not + // remove the parent. 
+ List kvs = new ArrayList(); + kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY, + HConstants.SPLITA_QUALIFIER, Writables.getBytes(splita))); + kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY, + HConstants.SPLITB_QUALIFIER, Writables.getBytes(splitb))); + Result r = new Result(kvs); + + Path rootdir = services.getMasterFileSystem().getRootDir(); + Path tabledir = HTableDescriptor.getTableDir(rootdir, htd.getName()); + Path storedir = Store.getStoreHomedir(tabledir, parent.getEncodedName(), + htd.getColumnFamilies()[0].getName()); + + FileSystem fs = services.getMasterFileSystem().getFileSystem(); + Path storeArchive = HFileArchiveUtil.getStoreArchivePath(services.getConfiguration(), parent, + tabledir, htd.getColumnFamilies()[0].getName()); + + // enable archiving, make sure that files get archived + tracker.addTable(table); + addMockStoreFiles(2, services, storedir); + // get the current store files for comparison + FileStatus[] storeFiles = fs.listStatus(storedir); + + // do the cleaning of the parent + assertTrue(janitor.cleanParent(parent, r)); + + // and now check to make sure that the files have actually been archived + FileStatus[] archivedStoreFiles = fs.listStatus(storeArchive); + assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs); + + // now add store files with the same names as before to check backup + // enable archiving, make sure that files get archived + tracker.addTable(table); + addMockStoreFiles(2, services, storedir); + + // do the cleaning of the parent + assertTrue(janitor.cleanParent(parent, r)); + + // and now check to make sure that the files have actually been archived + archivedStoreFiles = fs.listStatus(storeArchive); + assertArchiveEqualToOriginal(storeFiles, archivedStoreFiles, fs, true); + } finally { + server.stop("shutdown"); + } + } + + @Category(MediumTests.class) + @Test + public void testCatalogJanitorMonitoredInArchive() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + try { + // setup the minicluster + util.startMiniCluster(); + byte[] TABLE_NAME = Bytes.toBytes("TABLE"); + HBaseAdmin admin = util.getHBaseAdmin(); + util.createTable(TABLE_NAME, Bytes.toBytes("cf")); + assertFalse(admin.getArchivingEnabled(TABLE_NAME)); + + // make sure a simple enable works + admin.enableHFileBackup(TABLE_NAME); + assertTrue(admin.getArchivingEnabled(TABLE_NAME)); + admin.disableHFileBackup(); + assertFalse(admin.getArchivingEnabled(TABLE_NAME)); + + // set the table tracker to fail on archiving + CatalogJanitor catalog = util.getMiniHBaseCluster().getMaster().getCatalogJanitor(); + TableHFileArchiveTracker tracker = (TableHFileArchiveTracker) catalog.getHfileArchiveMonitor(); + + // keep the original around + HFileArchiveTableMonitor orig = tracker.getMonitor(); + + // set a new moitor that we know will not register archiving + tracker.setTableMonitor(new SimpleHFileArchiveTableMonitor(util.getMiniHBaseCluster().getMaster(), + util.getZooKeeperWatcher())); + + try { + // try turning on archiving, but it should fail + admin.enableHFileBackup(TABLE_NAME); + fail("Shouldn't have been able to finish archiving with the catalog tracker not joining"); + } catch (IOException e) { + // reset the tracker, if we don't get an exception, then everything + // breaks + tracker.setTableMonitor(orig); + } + } finally { + util.shutdownMiniCluster(); + } + } + + private void addMockStoreFiles(int count, MasterServices services, + Path storedir) throws IOException { + // get the existing store files 
+ FileSystem fs = services.getMasterFileSystem().getFileSystem(); + fs.mkdirs(storedir); + // create the store files in the parent + for (int i = 0; i < count; i++) { + Path storeFile = new Path(storedir, "_store" + i); + FSDataOutputStream dos = fs.create(storeFile, true); + dos.writeBytes("Some data: " + i); + dos.close(); + } + // make sure the mock store files are there + FileStatus[] storeFiles = fs.listStatus(storedir); + assertEquals(count, storeFiles.length); + } + /** * Make sure parent with specified end key gets cleaned up even if daughter is cleaned up before it. * @@ -398,13 +621,17 @@ public class TestCatalogJanitor { * @throws InterruptedException */ private void parentWithSpecifiedEndKeyCleanedEvenIfDaughterGoneFirst( - final String rootDir, final byte[] lastEndKey) - throws IOException, InterruptedException { + final String rootDir, final byte[] lastEndKey) + throws IOException, InterruptedException { HBaseTestingUtility htu = new HBaseTestingUtility(); setRootDirAndCleanIt(htu, rootDir); Server server = new MockServer(htu); MasterServices services = new MockMasterServices(server); - CatalogJanitor janitor = new CatalogJanitor(server, services); + ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class); + TableHFileArchiveTracker monitor = TableHFileArchiveTracker.create(watcher, + new SimpleHFileArchiveTableMonitor( + services, watcher)); + CatalogJanitor janitor = new CatalogJanitor(server, services, monitor); final HTableDescriptor htd = createHTableDescriptor(); // Create regions: aaa->{lastEndKey}, aaa->ccc, aaa->bbb, bbb->ccc, etc. @@ -434,12 +661,12 @@ public class TestCatalogJanitor { HRegionInfo splitba = new HRegionInfo(htd.getName(), Bytes.toBytes("ccc"), Bytes.toBytes("ddd")); HRegionInfo splitbb = new HRegionInfo(htd.getName(), Bytes.toBytes("ddd"), - lastEndKey); + lastEndKey); // First test that our Comparator works right up in CatalogJanitor. // Just fo kicks. SortedMap regions = - new TreeMap(new CatalogJanitor.SplitParentFirstComparator()); + new TreeMap(new CatalogJanitor.SplitParentFirstComparator()); // Now make sure that this regions map sorts as we expect it to. regions.put(parent, createResult(parent, splita, splitb)); regions.put(splitb, createResult(splitb, splitba, splitbb)); @@ -460,7 +687,7 @@ public class TestCatalogJanitor { // Now play around with the cleanParent function. Create a ref from splita // up to the parent. Path splitaRef = - createReferences(services, htd, parent, splita, Bytes.toBytes("ccc"), false); + createReferences(services, htd, parent, splita, Bytes.toBytes("ccc"), false); // Make sure actual super parent sticks around because splita has a ref. assertFalse(janitor.cleanParent(parent, regions.get(parent))); @@ -476,9 +703,9 @@ public class TestCatalogJanitor { assertTrue(fs.delete(splitaRef, true)); // Create the refs from daughters of splita. Path splitaaRef = - createReferences(services, htd, splita, splitaa, Bytes.toBytes("bbb"), false); + createReferences(services, htd, splita, splitaa, Bytes.toBytes("bbb"), false); Path splitabRef = - createReferences(services, htd, splita, splitab, Bytes.toBytes("bbb"), true); + createReferences(services, htd, splita, splitab, Bytes.toBytes("bbb"), true); // Test splita. It should stick around because references from splitab, etc. 
assertFalse(janitor.cleanParent(splita, regions.get(splita))); @@ -538,7 +765,8 @@ public class TestCatalogJanitor { splitParents.put(parent, makeResultFromHRegionInfo(parent, splita, splitb)); splitParents.put(splita, makeResultFromHRegionInfo(splita, splitaa, splitab)); - CatalogJanitor janitor = spy(new CatalogJanitor(server, services)); + HFileArchiveMonitor monitor = Mockito.mock(HFileArchiveMonitor.class); + CatalogJanitor janitor = spy(new CatalogJanitor(server, services, monitor)); doReturn(new Pair>( 10, splitParents)).when(janitor).getSplitParents(); @@ -587,10 +815,12 @@ public class TestCatalogJanitor { private String setRootDirAndCleanIt(final HBaseTestingUtility htu, final String subdir) - throws IOException { + throws IOException { Path testdir = htu.getDataTestDir(subdir); FileSystem fs = FileSystem.get(htu.getConfiguration()); - if (fs.exists(testdir)) assertTrue(fs.delete(testdir, true)); + if (fs.exists(testdir)) { + assertTrue(fs.delete(testdir, true)); + } htu.getConfiguration().set(HConstants.HBASE_DIR, testdir.toString()); return htu.getConfiguration().get(HConstants.HBASE_DIR); } @@ -608,13 +838,13 @@ public class TestCatalogJanitor { private Path createReferences(final MasterServices services, final HTableDescriptor htd, final HRegionInfo parent, final HRegionInfo daughter, final byte [] midkey, final boolean top) - throws IOException { + throws IOException { Path rootdir = services.getMasterFileSystem().getRootDir(); Path tabledir = HTableDescriptor.getTableDir(rootdir, parent.getTableName()); Path storedir = Store.getStoreHomedir(tabledir, daughter.getEncodedName(), htd.getColumnFamilies()[0].getName()); - Reference ref = - top? Reference.createTopReference(midkey): Reference.createBottomReference(midkey); + Reference ref = new Reference(midkey, + top? Reference.Range.top: Reference.Range.bottom); long now = System.currentTimeMillis(); // Reference name has this format: StoreFile#REF_NAME_PARSER Path p = new Path(storedir, Long.toString(now) + "." + parent.getEncodedName()); @@ -625,7 +855,7 @@ public class TestCatalogJanitor { private Result createResult(final HRegionInfo parent, final HRegionInfo a, final HRegionInfo b) - throws IOException { + throws IOException { List kvs = new ArrayList(); kvs.add(new KeyValue(parent.getRegionName(), HConstants.CATALOG_FAMILY, HConstants.SPLITA_QUALIFIER, Writables.getBytes(a))); @@ -642,6 +872,6 @@ public class TestCatalogJanitor { @org.junit.Rule public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionHFileArchiving.java new file mode 100644 index 0000000..b1bdf02 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionHFileArchiving.java @@ -0,0 +1,599 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.util.HFileArchiveTestingUtil.assertArchiveEqualToOriginal; +import static org.apache.hadoop.hbase.util.HFileArchiveTestingUtil.compareArchiveToOriginal; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.backup.HFileArchiveMonitor; +import org.apache.hadoop.hbase.backup.HFileArchiveTableMonitor; +import org.apache.hadoop.hbase.backup.HFileDisposer; +import org.apache.hadoop.hbase.backup.TableHFileArchiveTracker; +import org.apache.hadoop.hbase.backup.SimpleHFileArchiveTableMonitor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; + +/** + * Spin up a small cluster and check that a region properly archives its hfiles + * when enabled. 
+ */ +@Category(MediumTests.class) +public class TestRegionHFileArchiving { + + private static final Log LOG = LogFactory.getLog(TestRegionHFileArchiving.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final String STRING_TABLE_NAME = "test"; + private static final byte[] TEST_FAM = Bytes.toBytes("fam"); + private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME); + private static final int numRS = 2; + private static final int maxTries = 5; + + /** + * Setup the config for the cluster + */ + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(numRS); + } + + private static void setupConf(Configuration conf) { + // disable the ui + conf.setInt("hbase.regionsever.info.port", -1); + // change the flush size to a small amount, regulating number of store files + conf.setInt("hbase.hregion.memstore.flush.size", 25000); + // so make sure we get a compaction when doing a load, but keep around some + // files in the store + conf.setInt("hbase.hstore.compaction.min", 10); + conf.setInt("hbase.hstore.compactionThreshold", 10); + // block writes if we get to 12 store files + conf.setInt("hbase.hstore.blockingStoreFiles", 12); + // drop the number of attempts for the hbase admin + UTIL.getConfiguration().setInt("hbase.client.retries.number", 1); + } + + @Before + public void setup() throws Exception { + UTIL.createTable(TABLE_NAME, TEST_FAM); + } + + @After + public void tearDown() throws Exception { + UTIL.deleteTable(TABLE_NAME); + // and cleanup the archive directory + try { + UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true); + } catch (IOException e) { + LOG.warn("Failure to delete archive directory", e); + } + // make sure that backups are off for all tables + UTIL.getHBaseAdmin().disableHFileBackup(); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + // NOOP; + } + } + + @Test + public void testEnableDisableArchiving() throws Exception { + // Make sure archiving is not enabled + assertFalse(UTIL.getHBaseAdmin().getArchivingEnabled(TABLE_NAME)); + + // get the RS and region serving our table + HBaseAdmin admin = UTIL.getHBaseAdmin(); + List servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME); + // make sure we only have 1 region serving this table + assertEquals(1, servingRegions.size()); + HRegion region = servingRegions.get(0); + HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME); + FileSystem fs = hrs.getFileSystem(); + + // enable archiving + admin.enableHFileBackup(TABLE_NAME); + assertTrue(UTIL.getHBaseAdmin().getArchivingEnabled(TABLE_NAME)); + + // put some load on the table and make sure that files do get archived + loadAndCompact(region); + + // check that we actually have some store files that were archived + HFileArchiveMonitor monitor = hrs.getHFileArchiveMonitor(); + Store store = region.getStore(TEST_FAM); + Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(), + region, store); + + // check the archive + assertTrue(fs.exists(storeArchiveDir)); + assertTrue(fs.listStatus(storeArchiveDir).length > 0); + + // now test that we properly stop backing up + LOG.debug("Stopping backup and testing that we don't archive"); + admin.disableHFileBackup(STRING_TABLE_NAME); + int counter = 0; + while (monitor.keepHFiles(STRING_TABLE_NAME)) { + LOG.debug("Waiting 
for archive change to propaate"); + Thread.sleep(100); + // max tries to propagate - if not, something is probably horribly + // wrong with zk/notification + assertTrue("Exceeded max tries to propagate hfile backup changes", counter++ < maxTries); + } + + // delete the existing archive files + fs.delete(storeArchiveDir, true); + + // and then put some more data in the table and ensure it compacts + loadAndCompact(region); + + // put data into the table again to make sure that we don't copy new files + // over into the archive directory + + // ensure there are no archived files + assertFalse(fs.exists(storeArchiveDir)); + + // make sure that overall backup also disables per-table + admin.enableHFileBackup(TABLE_NAME); + admin.disableHFileBackup(); + assertFalse(UTIL.getHBaseAdmin().getArchivingEnabled(TABLE_NAME)); + } + + /** + * Ensure that archiving won't be turned on but have the tracker miss the + * update to the table via incorrect ZK watches (especially because + * createWithParents is not transactional). + * @throws Exception + */ + @Test + public void testFindsTablesAfterArchivingEnabled() throws Exception { + // 1. create a tracker to track the nodes + ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(); + HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME); + TableHFileArchiveTracker tracker = hrs.hfileArchiveTracker; + + // 2. create the archiving enabled znode + ZKUtil.createAndFailSilent(zkw, zkw.archiveHFileZNode); + + // 3. now turn on archiving for the test table + HBaseAdmin admin = UTIL.getHBaseAdmin(); + admin.enableHFileBackup(TABLE_NAME); + + // 4. make sure that archiving is enabled for that tracker + assertTrue(tracker.keepHFiles(STRING_TABLE_NAME)); + } + + /** + * Test that we do synchronously start archiving and not return until we are + * done + */ + @Test + public void testSynchronousArchiving() throws Exception { + HBaseAdmin admin = UTIL.getHBaseAdmin(); + + // 1. turn on hfile backups + LOG.debug("----Starting archiving"); + admin.enableHFileBackup(TABLE_NAME); + assertTrue(UTIL.getHBaseAdmin().getArchivingEnabled(TABLE_NAME)); + + // 2. ensure that backups are kept on each RS + // get all the monitors + for (int i = 0; i < numRS; i++) { + HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i); + // make sure that at least regions hosting the table have received the + // update to start archiving + if (hrs.getOnlineRegions(TABLE_NAME).size() > 0) { + assertTrue(hrs.getHFileArchiveMonitor().keepHFiles(STRING_TABLE_NAME)); + } + } + + // 3. now attempt to archive some other table that doesn't exist + try { + admin.enableHFileBackup("other table"); + fail("Should get an IOException if a table cannot be backed up."); + } catch (IOException e) { + // this should happen + } + assertFalse("Table 'other table' should not be archived - it doesn't exist!", UTIL + .getHBaseAdmin().getArchivingEnabled(Bytes.toBytes("other table"))); + + // 4. 
now prevent one of the regionservers from archiving, which should + // cause archiving to fail + // make sure all archiving is off + admin.disableHFileBackup(); + assertFalse(admin.getArchivingEnabled(TABLE_NAME)); + + // then hack the RS to not do any registration + HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME); + HFileArchiveTableMonitor orig = hrs.hfileArchiveTracker.getMonitor(); + // set the monitor so it doesn't attempt to register the regionserver + hrs.hfileArchiveTracker.setTableMonitor(new SimpleHFileArchiveTableMonitor(hrs, UTIL + .getZooKeeperWatcher())); + + // try turning on archiving, but it should fail + try { + admin.enableHFileBackup(TABLE_NAME); + fail("Shouldn't have been able to finish archiving"); + } catch (IOException e) { + // reset the tracker, if we don't get an exception, then everything breaks + hrs.hfileArchiveTracker.setTableMonitor(orig); + } + } + + /** + * Test the advanced case where we turn on archiving and the region propagates + * the change down to the store + */ + @SuppressWarnings("null") + @Category(LargeTests.class) + @Test(timeout = 200000) + // timeout = 200 sec - if it takes longer, something is seriously borked with + // the minicluster. + public void testCompactAndArchive() throws Exception { + HBaseAdmin admin = UTIL.getHBaseAdmin(); + + // get the RS and region serving our table + List servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME); + // make sure we only have 1 region serving this table + assertEquals(1, servingRegions.size()); + HRegion region = servingRegions.get(0); + + // get the parent RS and monitor + HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME); + FileSystem fs = hrs.getFileSystem(); + Store store = region.getStores().get(TEST_FAM); + + // 1. put some data on the region + LOG.debug("-------Loading table"); + UTIL.loadRegion(region, TEST_FAM); + + // get the current store files for the region + // and that there is only one store in the region + assertEquals(1, region.getStores().size()); + + int fileCount = store.getStorefiles().size(); + assertTrue("Need more than 1 store file to compact and test archiving", fileCount > 1); + LOG.debug("Currently have: " + fileCount + " store files."); + LOG.debug("Has store files:"); + for (StoreFile sf : store.getStorefiles()) { + LOG.debug("\t" + sf.getPath()); + } + + // 2. 
make sure that table archiving is enabled + // first force a flush to make sure nothing weird happens + region.flushcache(); + + // turn on hfile backups into .archive + LOG.debug("-----Enabling backups"); + admin.enableHFileBackup(TABLE_NAME); + + Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(), + region, store); + + // make sure we block the store from compacting/flushing files in the middle + // of our copy of the store files + List origFiles = null; + FileStatus[] originals = null; + List copiedStores = null; + FileStatus[] archivedFiles = null; + // wait for all the compactions to finish + waitOnCompactions(store); + + // lock the store for compactions + boolean done = false; + int tries = 0; + // this is a couple times to ensure that it wasn't just a compaction issue + while (!done && tries++ < maxTries) { + // wait on memstore flushes to finish + region.waitForFlushesAndCompactions(); + synchronized (store.filesCompacting) { + synchronized (store.flushLock) { + // if there are files unlock it and try again + if (store.filesCompacting.size() > 0) { + LOG.debug("Got some more files, waiting on compaction to finish again."); + continue; + } + LOG.debug("Locked the store"); + + // make sure we don't have any extra archive files from random + // compactions in the middle of the test + LOG.debug("-----Initial files in store archive:"); + if (fs.exists(storeArchiveDir)) { + for (FileStatus f : fs.listStatus(storeArchiveDir)) { + LOG.debug("Deleting archive file: " + f.getPath() + + ", so we have a consistent backup view."); + } + fs.delete(storeArchiveDir, true); + } else LOG.debug("[EMPTY]"); + + // make sure that archiving is in a 'clean' state + assertNull(fs.listStatus(storeArchiveDir)); + + // get the original store files before compaction + LOG.debug("------Original store files:"); + originals = fs.listStatus(store.getHomedir()); + for (FileStatus f : originals) { + LOG.debug("\t" + f.getPath()); + } + // copy the original store files so we can use them for testing + // overwriting store files with the same name below + origFiles = store.getStorefiles(); + copiedStores = new ArrayList(origFiles.size()); + Path temproot = new Path(hrs.getRootDir(), "store_copy"); + for (StoreFile f : origFiles) { + if (!fs.exists(f.getPath())) continue; + + // do the actually copy of the file + Path tmpStore = new Path(temproot, f.getPath().getName()); + FSDataOutputStream tmpOutput = fs.create(tmpStore); + FSDataInputStream storeInput = fs.open(f.getPath()); + while (storeInput.available() > 0) { + byte[] remaining = new byte[1024]; + storeInput.read(remaining); + tmpOutput.write(remaining); + } + tmpOutput.close(); + storeInput.close(); + copiedStores.add(tmpStore); + } + LOG.debug("---------- Triggering compaction"); + compactRegion(region, TEST_FAM); + + // then get the archived store files + LOG.debug("----------Archived store files after compaction (" + storeArchiveDir + "):"); + archivedFiles = fs.listStatus(storeArchiveDir); + for (FileStatus f : archivedFiles) { + LOG.debug("\t" + f.getPath()); + } + + // compare the archive to the original, and then try again if not + // equal since a compaction may have been finishing + if (!compareArchiveToOriginal(originals, archivedFiles, fs, false)) { + LOG.debug("Archive doesn't match, trying again."); + done = false; + } else done = true; + } + } + } + LOG.debug("Unlocked the store."); + for (FileStatus f : archivedFiles) { + LOG.debug("\t" + f.getPath()); + } + assertArchiveEqualToOriginal(originals, 
archivedFiles, fs); + assertTrue("Tried too many times, something is messed up in the check logic", done); + LOG.debug("Archive matches originals."); + + // 3. Now copy back in the store files and trigger another compaction + + // lock again so we don't interfere with a compaction/flush + waitOnCompactions(store); + done = false; + tries = 0; + while (!done && tries++ < maxTries) { + // wait for flushes + region.waitForFlushesAndCompactions(); + synchronized (store.filesCompacting) { + synchronized (store.flushLock) { + LOG.debug("Locked the store"); + if (store.filesCompacting.size() > 0) { + LOG.debug("Got some more files, waiting on compaction to finish again."); + // hack sleep, but should help if we get a lot of compactions + Thread.sleep(100); + continue; + } + + // delete the store directory (just in case) + fs.delete(store.getHomedir(), true); + + // and copy back in the original store files (from before) + fs.mkdirs(store.getHomedir()); + for (int i = 0; i < copiedStores.size(); i++) { + fs.rename(copiedStores.get(i), origFiles.get(i).getPath()); + } + // now archive the files again + LOG.debug("Removing the store files again."); + HFileDisposer.disposeStoreFiles(hrs, store.getHRegion(), store.conf, store.getFamily() + .getName(), origFiles); + + // ensure the files match to originals, but with a backup directory + LOG.debug("Checking originals vs. backed up (from archived) versions"); + archivedFiles = fs.listStatus(storeArchiveDir); + + // check equality to make sure a compaction didn't mess us up + if (!compareArchiveToOriginal(originals, archivedFiles, fs, true)) { + // loop again, to check the new files + LOG.debug("Archive doesn't match, trying again."); + done = false; + } else done = true; + done = true; + } + } + } + assertArchiveEqualToOriginal(originals, archivedFiles, fs, true); + assertTrue("Tried too many times, something is messed up in the check logic", done); + } + + private void waitOnCompactions(Store store) throws Exception { + // busy loop waiting on files to compact to be empty + while (store.filesCompacting.size() > 0) { + Thread.sleep(100); + } + } + + @Test + public void testRegionSplitAndArchive() throws Exception { + // start archiving + HBaseAdmin admin = UTIL.getHBaseAdmin(); + admin.enableHFileBackup(TABLE_NAME); + + // get the current store files for the region + final List servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME); + // make sure we only have 1 region serving this table + assertEquals(1, servingRegions.size()); + HRegion region = servingRegions.get(0); + + // and that there is only one store in the region + assertEquals(1, region.getStores().size()); + Store store = region.getStores().get(TEST_FAM); + + // prep the store files so we get some files + LOG.debug("Loading store files"); + // prepStoreFiles(admin, store, 3); + UTIL.loadRegion(region, TEST_FAM); + + // get the files before compaction + FileSystem fs = region.getRegionServerServices().getFileSystem(); + + // delete out the current archive files, just for ease of comparison + // and synchronize to make sure we don't clobber another compaction + Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(), + region, store); + + synchronized (region.writestate) { + // wait for all the compactions/flushes to complete on the region + LOG.debug("Waiting on region " + region + "to complete compactions & flushes"); + region.waitForFlushesAndCompactions(); + LOG.debug("Removing archived files - general cleanup"); + 
assertTrue(fs.delete(storeArchiveDir, true)); + assertTrue(fs.mkdirs(storeArchiveDir)); + } + LOG.debug("Starting split of region"); + // now split our region + admin.split(TABLE_NAME); + while (UTIL.getHBaseCluster().getRegions(TABLE_NAME).size() < 2) { + LOG.debug("Waiting for regions to split."); + Thread.sleep(100); + } + LOG.debug("Regions finished splitting."); + // at this point the region should have split + servingRegions.clear(); + servingRegions.addAll(UTIL.getHBaseCluster().getRegions(TABLE_NAME)); + // make sure we now have 2 regions serving this table + assertEquals(2, servingRegions.size()); + + // now check to make sure that those regions will also archive + Collection regionservers = Collections2.filter(Collections2.transform(UTIL + .getMiniHBaseCluster().getRegionServerThreads(), + new Function() { + + @Override + public HRegionServer apply(RegionServerThread input) { + return input.getRegionServer(); + } + }), new Predicate() { + + @Override + public boolean apply(HRegionServer input) { + // get the names of the regions hosted by the rs + Collection regions; + regions = Collections2.transform(input.getOnlineRegions(TABLE_NAME), + new Function() { + @Override + public String apply(HRegion input) { + return input.getRegionInfo().getEncodedName(); + } + }); + + // then check to make sure this RS is serving one of the serving regions + boolean found = false; + for (HRegion region : servingRegions) { + if (regions.contains(region.getRegionInfo().getEncodedName())) { + found = true; + break; + } + } + return found; + + } + }); + + assertTrue(regionservers.size() > 0); + + // check each of the region servers to make sure it has got the update + for (HRegionServer serving : regionservers) { + assertTrue("RegionServer:" + serving + " hasn't been included in backup", + serving.hfileArchiveTracker.keepHFiles(STRING_TABLE_NAME)); + } + } + + /** + * Load the given region and then ensure that it compacts some files + */ + private void loadAndCompact(HRegion region) throws Exception { + int tries = 0; + Exception last = null; + while (tries++ <= maxTries) { + try { + // load the region with data + UTIL.loadRegion(region, TEST_FAM); + // and then trigger a compaction to be sure we try to archive + compactRegion(region, TEST_FAM); + return; + } catch (Exception e) { + // keep this around for if we fail later + last = e; + } + } + throw last; + } + + /** + * Compact all the store files in a given region. 
+ */ + private void compactRegion(HRegion region, byte[] family) throws IOException { + Store store = region.getStores().get(TEST_FAM); + store.compactRecentForTesting(store.getStorefiles().size()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileArchiveTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileArchiveTestingUtil.java new file mode 100644 index 0000000..c542a1b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileArchiveTestingUtil.java @@ -0,0 +1,220 @@ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Store; + +public class HFileArchiveTestingUtil { + + private static final Log LOG = LogFactory.getLog(HFileArchiveTestingUtil.class); + + private HFileArchiveTestingUtil() { + // NOOP private ctor since this is just a utility class + } + + public static boolean compareArchiveToOriginal(FileStatus[] previous, FileStatus[] archived, + FileSystem fs, boolean hasTimedBackup) { + + List> lists = getFileLists(previous, archived); + List original = lists.get(0); + Collections.sort(original); + + List currentFiles = lists.get(1); + Collections.sort(currentFiles); + + List backedup = lists.get(2); + Collections.sort(backedup); + + // check the backed up files versus the current (should match up, less the + // backup time in the name) + if (!hasTimedBackup == (backedup.size() > 0)) { + LOG.debug("backedup files doesn't match expected."); + return false; + } + String msg = null; + if (hasTimedBackup) { + msg = assertArchiveEquality(original, backedup); + if (msg != null) { + LOG.debug(msg); + return false; + } + } + msg = assertArchiveEquality(original, currentFiles); + if (msg != null) { + LOG.debug(msg); + return false; + } + return true; + } + + /** + * Compare the archived files to the files in the original directory + * @param previous original files that should have been archived + * @param archived files that were archived + * @param fs filessystem on which the archiving took place + * @throws IOException + */ + public static void assertArchiveEqualToOriginal(FileStatus[] previous, FileStatus[] archived, + FileSystem fs) throws IOException { + assertArchiveEqualToOriginal(previous, archived, fs, false); + } + + /** + * Compare the archived files to the files in the original directory + * @param previous original files that should have been archived + * @param archived files that were archived + * @param fs {@link FileSystem} on which the archiving took place + * @param hasTimedBackup true if we expect to find an archive backup + * directory with a copy of the files in the archive directory (and + * the original files). 
+ * @throws IOException + */ + public static void assertArchiveEqualToOriginal(FileStatus[] previous, FileStatus[] archived, + FileSystem fs, boolean hasTimedBackup) throws IOException { + + List> lists = getFileLists(previous, archived); + List original = lists.get(0); + Collections.sort(original); + + List currentFiles = lists.get(1); + Collections.sort(currentFiles); + + List backedup = lists.get(2); + Collections.sort(backedup); + + // check the backed up files versus the current (should match up, less the + // backup time in the name) + assertEquals("Didn't expect any backup files, but got: " + backedup, hasTimedBackup, + backedup.size() > 0); + String msg = null; + if (hasTimedBackup) { + assertArchiveEquality(original, backedup); + assertNull(msg, msg); + } + + // do the rest of the comparison + msg = assertArchiveEquality(original, currentFiles); + assertNull(msg, msg); + } + + private static String assertArchiveEquality(List expected, List archived) { + String compare = compareFileLists(expected, archived); + if (!(expected.size() == archived.size())) return "Not the same number of current files\n" + + compare; + if (!expected.equals(archived)) return "Different backup files, but same amount\n" + compare; + return null; + } + + /** + * @return , where each is sorted + */ + private static List> getFileLists(FileStatus[] previous, FileStatus[] archived) { + List> files = new ArrayList>(); + + // copy over the original files + List originalFileNames = convertToString(previous); + files.add(originalFileNames); + + List currentFiles = new ArrayList(previous.length); + List backedupFiles = new ArrayList(previous.length); + for (FileStatus f : archived) { + String name = f.getPath().getName(); + // if the file has been backed up + if (name.contains(".")) { + Path parent = f.getPath().getParent(); + String shortName = name.split("[.]")[0]; + Path modPath = new Path(parent, shortName); + FileStatus file = new FileStatus(f.getLen(), f.isDir(), f.getReplication(), + f.getBlockSize(), f.getModificationTime(), modPath); + backedupFiles.add(file); + } else { + // otherwise, add it to the list to compare to the original store files + currentFiles.add(name); + } + } + + files.add(currentFiles); + files.add(convertToString(backedupFiles)); + return files; + } + + private static List convertToString(FileStatus[] files) { + return convertToString(Arrays.asList(files)); + } + + private static List convertToString(List files) { + List originalFileNames = new ArrayList(files.size()); + for (FileStatus f : files) { + originalFileNames.add(f.getPath().getName()); + } + return originalFileNames; + } + + /* Get a pretty representation of the differences */ + private static String compareFileLists(List expected, List gotten) { + StringBuilder sb = new StringBuilder("Expected (" + expected.size() + "): \t\t Gotten (" + + gotten.size() + "):\n"); + List notFound = new ArrayList(); + for (String s : expected) { + if (gotten.contains(s)) sb.append(s + "\t\t" + s + "\n"); + else notFound.add(s); + } + sb.append("Not Found:\n"); + for (String s : notFound) { + sb.append(s + "\n"); + } + sb.append("\nExtra:\n"); + for (String s : gotten) { + if (!expected.contains(s)) sb.append(s + "\n"); + } + return sb.toString(); + } + + /** + * Helper method to get the archive directory for the specified region + * @param conf {@link Configuration} to check for the name of the archive + * directory + * @param region region that is being archived + * @return {@link Path} to the archive directory for the given region + */ + 
public static Path getRegionArchiveDir(Configuration conf, HRegion region) { + return HFileArchiveUtil.getRegionArchiveDir(conf, region.getTableDir(), region.getRegionDir()); + } + + /** + * Helper method to get the table archive directory for the specified region + * @param conf {@link Configuration} to check for the name of the archive + * directory + * @param region region that is being archived + * @return {@link Path} to the table archive directory for the given region + */ + public static Path getTableArchivePath(Configuration conf, HRegion region) { + return HFileArchiveUtil.getTableArchivePath(conf, region.getTableDir()); + } + + /** + * Helper method to get the store archive directory for the specified region + * @param conf {@link Configuration} to check for the name of the archive + * directory + * @param region region that is being archived + * @param store store that is archiving files + * @return {@link Path} to the store archive directory for the given region + */ + public static Path getStoreArchivePath(Configuration conf, HRegion region, Store store) { + return HFileArchiveUtil.getStoreArchivePath(conf, region, store.getFamily().getName()); + + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java index 3f61cfb..f3d2bd7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.backup.HFileArchiveMonitor; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -156,4 +156,10 @@ public class MockRegionServerServices implements RegionServerServices { public void setFileSystem(FileSystem hfs) { this.hfs = (HFileSystem)hfs; } + + @Override + public HFileArchiveMonitor getHFileArchiveMonitor() { + // TODO Implement getHFileArchiveManager + return null; + } }
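Reviewer note (illustrative, not part of the patch): the MockRegionServerServices change above leaves getHFileArchiveMonitor() returning null with a TODO, so any test that reaches the archiving code path through this mock will see a null monitor. Assuming the class and this accessor stay non-final, a small subclass that defaults to a Mockito stub and lets tests swap in a real TableHFileArchiveTracker is one way to close that gap; the subclass name below is hypothetical.

package org.apache.hadoop.hbase.util;

import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;
import org.mockito.Mockito;

/** Hypothetical test helper; not part of this patch. */
public class MockRegionServerServicesWithMonitor extends MockRegionServerServices {
  // Default to a Mockito stub so callers never see a null monitor.
  private HFileArchiveMonitor monitor = Mockito.mock(HFileArchiveMonitor.class);

  /** Lets individual tests swap in a real tracker or a custom stub. */
  public void setHFileArchiveMonitor(HFileArchiveMonitor monitor) {
    this.monitor = monitor;
  }

  @Override
  public HFileArchiveMonitor getHFileArchiveMonitor() {
    return monitor;
  }
}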
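Reviewer note (illustrative, not part of the patch): the tests above always hand CatalogJanitor an HFileArchiveMonitor as its third constructor argument, either a TableHFileArchiveTracker built over a ZooKeeperWatcher or a plain Mockito mock. A minimal sketch of that wiring is shown below; the when(...) stubbing is an assumption about how keepHFiles(table) is consulted by the janitor, and the helper class and method names are made up for the example. It sits in the same package as TestCatalogJanitor so the package-level constructor remains visible.

package org.apache.hadoop.hbase.master;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.backup.HFileArchiveMonitor;

/** Hypothetical example; not part of this patch. */
public class ArchiveMonitorStubExample {
  /** Builds a janitor whose monitor reports the given table as having archiving enabled. */
  static CatalogJanitor janitorKeepingFilesFor(Server server, MasterServices services,
      String table) {
    HFileArchiveMonitor monitor = mock(HFileArchiveMonitor.class);
    when(monitor.keepHFiles(table)).thenReturn(true);
    // Same three-argument constructor used by the tests in this patch.
    return new CatalogJanitor(server, services, monitor);
  }
}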