Index: src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java (revision 1296010) +++ src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java (working copy) @@ -240,6 +240,20 @@ return status == null? false: fs.exists(status.getPath()); } + /** + * Checks if .tableinfo exists for given table + * + * @param fs file system + * @param tabledir the table's directory + * @return true if exists + * @throws IOException + */ + public static boolean isTableInfoExists(FileSystem fs, Path tabledir) + throws IOException { + FileStatus status = getTableInfoPath(fs, tabledir); + return status == null? false: fs.exists(status.getPath()); + } + private static FileStatus getTableInfoPath(final FileSystem fs, final Path rootdir, final String tableName) throws IOException { Index: src/main/java/org/apache/hadoop/hbase/backups/SnapshotUtilities.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/backups/SnapshotUtilities.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/backups/SnapshotUtilities.java (revision 0) @@ -0,0 +1,216 @@ +package org.apache.hadoop.hbase.backups; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.StringUtils; + +public class SnapshotUtilities { + public static final Log LOG = LogFactory.getLog(SnapshotUtilities.class); + + public static boolean checkAndClose(java.io.Closeable io) { + if (io != null) { + try { + io.close(); + } + catch(IOException ioe) { + LOG.warn(StringUtils.stringifyException(ioe)); + return false; + } + } + return true; + } + + /** + * Check whether the contents of src and dst are the same. + * + * Return false if dstpath does not exist + * + * If the files have different sizes, return false. + * + * If the files have the same sizes, the file checksums will be compared. + * + * When file checksum is not supported in any of file systems, + * two files are considered as the same if they have the same size. + */ + public static boolean sameFile(FileSystem srcfs, FileStatus srcstatus, + FileSystem dstfs, Path dstpath, boolean skipCRCCheck) throws IOException { + FileStatus dststatus; + try { + dststatus = dstfs.getFileStatus(dstpath); + } catch(FileNotFoundException fnfe) { + return false; + } + + //same length? 
+ if (srcstatus.getLen() != dststatus.getLen()) { + return false; + } + + if (skipCRCCheck) { + LOG.debug("Skipping CRC Check"); + return true; + } + + //get src checksum + final FileChecksum srccs; + try { + srccs = srcfs.getFileChecksum(srcstatus.getPath()); + } catch(FileNotFoundException fnfe) { + /* + * Two possible cases: + * (1) src existed once but was deleted between the time period that + * srcstatus was obtained and the try block above. + * (2) srcfs does not support file checksum and (incorrectly) throws + * FNFE, e.g. some previous versions of HftpFileSystem. + * For case (1), it is okay to return true since src was already deleted. + * For case (2), true should be returned. + */ + return true; + } + + //compare checksums + try { + final FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath()); + //return true if checksum is not supported + //(i.e. some of the checksums is null) + return srccs == null || dstcs == null || srccs.equals(dstcs); + } catch(FileNotFoundException fnfe) { + return false; + } + } + + /** + * Looks for a file in .Trash of the given user. + * + * @param path the path of the file to look for + * @param hbaseUser the user whose .Trash needs to be checked + * @param srcFileSys the FileSystem on which to look for the file + * + * @return the new path of the file in .Trash or null if not found + */ + public static Path getPathInTrash(Path path, String hbaseUser, + FileSystem srcFileSys) throws IOException { + String trashPrefix = "/user/" + hbaseUser + "/.Trash"; + Path trashPath = new Path(trashPrefix); + FileStatus[] checkpoints = srcFileSys.listStatus(trashPath); + // Go through all checkpoints in .Trash (including Current) to look for our + // missing file. + for (int i = 0; i < checkpoints.length; i++) { + Path probablePath = new Path(checkpoints[i].getPath().toString() + + path.toUri().getPath()); + LOG.info("Probable Path : " + probablePath); + if (srcFileSys.exists(probablePath)) { + return probablePath; + } + } + return null; + } + + /** + * Gets the store file list for a region, retries for a while and then gives + * up. The retries are used to cope with region in transition errors. + * + * @param hRegionInfo + * the region info + * @param flushRegions + * whether or not to flush the regions before asking the regionserver + * for a list of files + * @param conf + * the configuration for the job + * @param snapshot_start_time + * the start time for the snapshot, used only when + * flushRegions is set to true + * @param families + * a comma separated list of column families for which we need to + * retrieve store files (Use {@link Snapshot#SNAPSHOT_ALL_CF} for all + * column families + * @return a {@link Pair} of the regionserver hosting the region and the + * list of store files + * @throws IOException + * if it fails to get a list of store files for the region + */ + public static Pair > getStoreFileList( + HRegionInfo hRegionInfo, boolean flushRegions, Configuration conf, + long snapshot_start_time, String families) throws IOException { + String encodedRegionName = HRegionInfo.encodeRegionName(hRegionInfo + .getRegionName()); + // The maximum amount of time (in seconds) to retry till we get all the + // files for a region. 
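+    // Note: despite the variable name, the value computed below is in
+    // milliseconds -- the configured number of minutes is multiplied by
+    // 60 * 1000 before being compared against elapsed wall-clock time.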
+ long retryTimeInMins = + conf.getInt("hbase.backups.region.retryTimeInMins", 5) * 60 * 1000L; + long retryInterval = + conf.getLong("hbase.backups.region.retryIntervalInMillis", 10000); + long startTime = System.currentTimeMillis(); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 360000); + HBaseAdmin admin = new HBaseAdmin(conf); + HTable table = new HTable(conf, hRegionInfo.getTableName()); + HConnection connection = admin.getConnection(); + IOException lastException = new IOException( + "Could not get store file list for : " + encodedRegionName); + + while (System.currentTimeMillis() - startTime <= retryTimeInMins) { + try { + HServerAddress rsAddress = table.getRegionLocation( + hRegionInfo.getStartKey(), true).getServerAddress(); + HRegionInterface server = connection.getHRegionConnection(rsAddress); + if (flushRegions) { + server.flushRegion(hRegionInfo.getRegionName(), snapshot_start_time); + } + return new Pair(server,getFileList(server, hRegionInfo.getRegionName(), families)); + } catch (Exception e) { + String msg = "Could not get store file list for region : " + + encodedRegionName; + LOG.warn(msg + " will retry in " + retryInterval + "ms"); + lastException = new IOException(msg, e); + // Sleep for 10 seconds, then retry. + try { + Thread.sleep(retryInterval); + } catch (InterruptedException ie) { + throw new IOException("gestStoreFileList() was interrupted"); + } + } + } + throw lastException; + } + + /** + * Retrieve the list of store files given the region name and list of column + * families + * @param server + * the regionserver to contact + * @param rName + * the region name + * @param families + * the comma separated list of column families + * @return + * the list of store files + */ + private static List getFileList(HRegionInterface server, + byte[] rName, + String families) { + if (families.equals(Snapshot.SNAPSHOT_ALL_CF)) { + return (server.getStoreFileList(rName)); + } + byte[][] family = Bytes.toByteArrays(families.split(",")); + return (server.getStoreFileList(rName, family)); + } + +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/backups/SnapshotMR.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/backups/SnapshotMR.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/backups/SnapshotMR.java (revision 0) @@ -0,0 +1,392 @@ +package org.apache.hadoop.hbase.backups; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.StringUtils; + +/** + * Mapper that actually copies the HFiles for a given region. 
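+ * Input records are (regionserver, HRegionInfo) pairs written by the Snapshot
+ * driver and split up by SnapshotInputFormat. For each region the mapper asks
+ * the hosting regionserver for its store files, writes the region's
+ * .regioninfo file, and copies every HFile into the backup destination,
+ * falling back to the HBase user's .Trash when a file has disappeared from
+ * its original location in the meantime.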
+ * + */ +public class SnapshotMR implements Mapper { + public static final Log LOG = LogFactory.getLog(SnapshotMR.class); + + static enum Counter { FILES_COPIED, FILES_SKIPPED, FILES_FAILED, BYTESCOPIED, + BYTESEXPECTED, REGIONS_RETRIED, REGIONS_DONE, FILES_IN_TRASH } + + private String familyNames; + + // config + private int sizeBuf = 128 * 1024; + private FileSystem destFileSys = null; + private FileSystem srcFileSys = null; + private boolean ignoreReadFailures; + private boolean overwrite; + private boolean update; + private Path destPath = null; + private JobConf job; + private boolean skipCRCCheck = false; + private Short replicationFactor = 2; + private boolean flushRegions; + private long snapshot_start_time; + private Path srcPath = null; + private String hbaseUser = "hadoop"; + private byte[] buffer = null; + + // stats + private int failcount = 0; + private int skipcount = 0; + private int copycount = 0; + + private class FileReporter implements Runnable { + private final Path dst; + private final Reporter reporter; + private static final int SLEEP_TIME = 60 * 1000; // 1 minute. + private boolean running; + + public FileReporter(Path dst, Reporter reporter) { + this.dst = dst; + this.reporter = reporter; + this.running = true; + } + + public void run() { + while (running) { + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + // Ignore. + } + } + } + + public void shutdownReporter() { + this.running = false; + } + } + + private String getCountString() { + return "Copied: " + copycount + " Skipped: " + skipcount + + " Failed: " + failcount; + } + private void updateStatus(Reporter reporter) { + reporter.setStatus(getCountString()); + } + + /** + * Return true if dst should be replaced by src and the update flag is set. + * Right now, this merely checks that the src and dst len are not equal. + * This should be improved on once modification times, CRCs, etc. can + * be meaningful in this context. 
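+ * (The actual comparison is delegated to SnapshotUtilities.sameFile(), which
+ * checks file lengths and, unless CRC checking is skipped, file checksums.)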
+ * @throws IOException + */ + private boolean needsUpdate(FileStatus srcstatus, + FileSystem dstfs, Path dstpath) throws IOException { + return update && !SnapshotUtilities.sameFile(srcstatus.getPath().getFileSystem(job), + srcstatus, dstfs, dstpath, skipCRCCheck); + } + + private FSDataOutputStream create(Path f, Reporter reporter, + FileStatus srcstat) throws IOException { + if (destFileSys.exists(f)) { + destFileSys.delete(f, false); + } + + FsPermission permission = srcstat.getPermission(); + short replication = this.replicationFactor; + long blockSize = srcstat.getBlockSize(); + return destFileSys.create(f, permission, true, sizeBuf, replication, + blockSize, reporter); + } + + private long regularCopy(FileStatus srcstat, Reporter reporter, Path tmpfile, + Path absdst) throws IOException { + long cbcopied = 0L; + FSDataInputStream in = null; + FSDataOutputStream out = null; + try { + // open src file + in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath()); + reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); + // open tmp file + out = create(tmpfile, reporter, srcstat); + // copy file + for (int cbread; (cbread = in.read(buffer)) >= 0;) { + out.write(buffer, 0, cbread); + cbcopied += cbread; + reporter.setStatus(String.format("%.2f ", + cbcopied * 100.0 / srcstat.getLen()) + + absdst + + " [ " + + StringUtils.humanReadableInt(cbcopied) + + " / " + + StringUtils.humanReadableInt(srcstat.getLen()) + " ]"); + } + } finally { + SnapshotUtilities.checkAndClose(in); + SnapshotUtilities.checkAndClose(out); + } + return cbcopied; + } + + /** + * Copy a file to a destination. + * @param srcstat src path and metadata + * @param dstpath dst path + * @param reporter + */ + private void copy(FileStatus srcstat, Path relativedst, + OutputCollector outc, Reporter reporter, int totfiles) + throws Exception { + Path absdst = new Path(destPath, relativedst); + assert totfiles >= 0 : "Invalid file count " + totfiles; + + // if a directory, ensure created even if empty + if (srcstat.isDir()) { + if (destFileSys.exists(absdst)) { + if (!destFileSys.getFileStatus(absdst).isDir()) { + throw new IOException("Failed to mkdirs: " + absdst+" is a file."); + } + } + else if (!destFileSys.mkdirs(absdst)) { + throw new IOException("Failed to mkdirs " + absdst); + } + // TODO: when modification times can be set, directories should be + // emitted to reducers so they might be preserved. 
Also, mkdirs does + // not currently return an error when the directory already exists; + // if this changes, all directory work might as well be done in reduce + return; + } + + if (destFileSys.exists(absdst) && !overwrite + && !needsUpdate(srcstat, destFileSys, absdst)) { + outc.collect(null, new Text("SKIP: " + srcstat.getPath())); + ++skipcount; + reporter.incrCounter(Counter.FILES_SKIPPED, 1); + updateStatus(reporter); + return; + } + + Path tmpfile = new Path(job.get(Snapshot.SNAPSHOT_TMP_PATH), relativedst); + + long cbcopied = 0L; + cbcopied = regularCopy(srcstat, reporter, tmpfile, absdst); + + cbcopied = destFileSys.getFileStatus(tmpfile).getLen(); + if (cbcopied != srcstat.getLen()) { + throw new IOException("File size not matched: copied " + + bytesString(cbcopied) + " to tmpfile (=" + tmpfile + + ") but expected " + bytesString(srcstat.getLen()) + + " from " + srcstat.getPath()); + } else { + if (destFileSys.exists(absdst) && + destFileSys.getFileStatus(absdst).isDir()) { + throw new IOException(absdst + " is a directory"); + } + if (!destFileSys.mkdirs(absdst.getParent())) { + throw new IOException("Failed to craete parent dir: " + absdst.getParent()); + } + rename(tmpfile, absdst); + + FileStatus dststat = destFileSys.getFileStatus(absdst); + if (dststat.getLen() != srcstat.getLen()) { + destFileSys.delete(absdst, false); + throw new IOException("File size not matched: copied " + + bytesString(dststat.getLen()) + " to dst (=" + absdst + + ") but expected " + bytesString(srcstat.getLen()) + + " from " + srcstat.getPath()); + } + updateDestStatus(srcstat, dststat); + } + + // report at least once for each file + ++copycount; + reporter.incrCounter(Counter.BYTESCOPIED, cbcopied); + reporter.incrCounter(Counter.FILES_COPIED, 1); + updateStatus(reporter); + } + + /** rename tmp to dst, delete dst if already exists */ + private void rename(Path tmp, Path dst) throws IOException { + try { + if (destFileSys.exists(dst)) { + destFileSys.delete(dst, true); + } + if (!destFileSys.rename(tmp, dst)) { + throw new IOException(); + } + } + catch(IOException cause) { + throw (IOException)new IOException("Fail to rename tmp file (=" + tmp + + ") to destination file (=" + dst + ")").initCause(cause); + } + } + + /* + * Preserves the "-pugpt" options of distcp + */ + private void updateDestStatus(FileStatus src, FileStatus dst) throws IOException { + String owner = null; + String group = null; + if (!src.getOwner().equals(dst.getOwner())) { + owner = src.getOwner(); + } + if (!src.getGroup().equals(dst.getGroup())) { + group = src.getGroup(); + } + if (owner != null || group != null) { + destFileSys.setOwner(dst.getPath(), owner, group); + } + + if (!src.getPermission().equals(dst.getPermission())) { + destFileSys.setPermission(dst.getPath(), src.getPermission()); + } + + try { + destFileSys.setTimes(dst.getPath(), src.getModificationTime(), src.getAccessTime()); + } catch (IOException exc) { + if (!dst.isDir()) { //hadoop 0.20 doesn't allow setTimes on dirs + throw exc; + } + } + + } + + static String bytesString(long b) { + return b + " bytes (" + StringUtils.humanReadableInt(b) + ")"; + } + + /** Mapper configuration. + * Extracts source and destination file system, as well as + * top-level paths on source and destination directories. + * Gets the named file systems, to be used later in map. 
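+ * The copy buffer size, replication factor, flush flag, snapshot start time
+ * and source table user are all read from the job configuration populated by
+ * Snapshot.runMRJob().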
+ */ + public void configure(JobConf job) + { + familyNames = job.get(Snapshot.SNAPSHOT_SRC_CF, ""); + + destPath = new Path(job.get(Snapshot.SNAPSHOT_DST_PATH, "/")); + LOG.info("This is destPath:" + destPath); + srcPath = new Path(job.get(Snapshot.SNAPSHOT_SRC_PATH, "/")); + hbaseUser = job.get(Snapshot.SNAPSHOT_SRC_TABLE_USER, "hadoop"); + try { + destFileSys = destPath.getFileSystem(job); + srcFileSys = srcPath.getFileSystem(job); + } catch (IOException ex) { + throw new RuntimeException("Unable to get the named file system.", ex); + } + sizeBuf = job.getInt("copy.buf.size", 128 * 1024); + buffer = new byte[sizeBuf]; + ignoreReadFailures = false; + update = false; + overwrite = true; + skipCRCCheck = false; + replicationFactor = Short.parseShort(job.get(Snapshot.SNAPSHOT_REPLICATION_FACTOR, "")); + flushRegions = job.getBoolean(Snapshot.SNAPSHOT_FLUSH_REGIONS, false); + snapshot_start_time = job.getLong(Snapshot.SNAPSHOT_START_TIME, System.currentTimeMillis()); + this.job = job; + } + + /** + * Map method. Copies one file from source file system to destination. + * + * @param key + * src len + * @param value + * FilePair (FileStatus src, Path dst) + * @param out + * Log of failed copies + * @param reporter + */ + public void map(Text key, HRegionInfo hRegionInfo, + OutputCollector out, Reporter reporter) throws IOException { + + // input is of the form "region server HRegionInfo" + + String encodedRegionName = HRegionInfo.encodeRegionName(hRegionInfo.getRegionName()); + LOG.info("Backing up files from region : " + encodedRegionName); + + List files = SnapshotUtilities.getStoreFileList(hRegionInfo, + flushRegions, job, snapshot_start_time, familyNames).getSecond(); + + // Write out the .regioninfo file to the respective region. + Path regionInfoPath = new Path(new Path(destPath, encodedRegionName), + ".regioninfo"); + FSDataOutputStream regionInfoOut = destFileSys.create(regionInfoPath); + hRegionInfo.write(regionInfoOut); + regionInfoOut.close(); + + Path absDstDir = new Path(destPath, encodedRegionName); + + for(String file : files) { + Path filePath = new Path("/", file); + Path relativeDstDir = new Path(encodedRegionName, filePath.getParent().getName()); + final Path relativedst = new Path(relativeDstDir, filePath.getName()); + try { + final FileStatus srcstat = filePath.getFileSystem(job).getFileStatus(filePath); + // copy 1 file at a time + copy(srcstat, relativedst, out, reporter, 1); + } catch (FileNotFoundException e){ + if (!srcFileSys.exists(filePath)) { + LOG.info("Source file not found : " + filePath); + LOG.info("Looking in .Trash for : " + filePath); + Path srcPathInTrash = SnapshotUtilities.getPathInTrash(filePath, + hbaseUser, srcFileSys); + if (srcPathInTrash != null) { + LOG.info("File found in .Trash : " + srcPathInTrash); + FileStatus srcInTrashStat = srcFileSys.getFileStatus(srcPathInTrash); + try { + copy(srcInTrashStat, relativedst, out, reporter, 1); + } catch (Exception ex) { + throw new IOException(ex.getMessage()); + } + reporter.incrCounter(Counter.FILES_IN_TRASH, 1); + } else { + LOG.info("File NOT found in .Trash : " + filePath); + throw e; + } + } else { + throw e; + } + } catch (Exception e) { + ++failcount; + reporter.incrCounter(Counter.FILES_FAILED, 1); + reporter.incrCounter(Counter.REGIONS_RETRIED, 1); + updateStatus(reporter); + final String sfailure = "FAIL " + relativedst + " : " + + StringUtils.stringifyException(e); + out.collect(null, new Text(sfailure)); + LOG.info(sfailure); + if (destFileSys.exists(absDstDir)) { + 
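+        // A failed copy leaves the region's output in an unknown state:
+        // remove any partially copied files so the retried region starts clean.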
destFileSys.delete(absDstDir, true); + } + throw new IOException(e.getMessage()); + } finally { + updateStatus(reporter); + } + } + reporter.incrCounter(Counter.REGIONS_DONE, 1); + } + + @Override + public void close() throws IOException { + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/backups/Snapshot.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/backups/Snapshot.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/backups/Snapshot.java (revision 0) @@ -0,0 +1,344 @@ +package org.apache.hadoop.hbase.backups; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapred.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; + +public class Snapshot { + public static final Log LOG = LogFactory.getLog(Snapshot.class); + + public final static String NAME = "hbase-snapshotter"; + public static final String SNAPSHOT_SRC_TABLE = NAME + ".table.src.name"; + public static final String SNAPSHOT_DST_TABLE = NAME + ".table.dst.name"; + public static final String SNAPSHOT_SRC_CF = NAME + ".table.cf"; + public static final String SNAPSHOT_SRC_PATH = NAME + ".table.src.path"; + public static final String SNAPSHOT_DST_PATH = NAME + ".table.dest.path"; + public static final String SNAPSHOT_INFO_FILE = NAME + ".input.info"; + public static final String SNAPSHOT_TMP_PATH = NAME + ".output.dir"; + public static final String SNAPSHOT_REPLICATION_FACTOR = NAME + + ".replication.factor"; + public static final String SNAPSHOT_FLUSH_REGIONS = NAME + ".flush.regions"; + public static final String SNAPSHOT_SRC_TABLE_USER = NAME + ".table.src.user"; + public static final String SNAPSHOT_START_TIME = NAME + ".start.time"; + public static final String SNAPSHOT_ALL_CF = "all.cf"; + public static final String SNAPSHOT_USE_FASTCOPY = "snapshot.use.fastcopy"; + + // zookeeper nodes of the HBase cluster to backup + static String zkNodeName = null; + // hdfs uri to backup to + static String backup_dfs = null; + // hdfs uri the HBase cluster is running on + static String hbase_dfs = null; + // table to be backed up + static String tableName = null; + static String family = null; + static String dstDir = null; + static boolean flushRegions = false; + static String dfs = null; + static Short replication = 2; + static Configuration conf = null; + static String hbaseUser = "hadoop"; + static String pool = null; + static long snapshot_start_time; + static boolean useFastCopy = true; + + /** + * Main function which does the backup. The algorithm is as follows: + * 1. Get a Map of regions to regionservers for the running cluster + * 2. Run an MR job to: + * 2.1 list files for each region + * 2.2 copy them to a temp location + * 3. 
Move the successful backups into the correct location + * 4. Clean up the temporary working directory + */ + public static void main(String[] args) throws Exception { + parseArgs(args); + + hbase_dfs = hbase_dfs.toLowerCase(); + // if the backup dfs is not specified, default to the HBase's HDFS + if (backup_dfs == null) { + backup_dfs = hbase_dfs; + } + LOG.info("Backing up to dfs: " + backup_dfs); + LOG.info("These are the cluster's zkNodes: " + zkNodeName); + + // create the HBase conf + conf = HBaseConfiguration.create(); + + // Get a Map of regions to regionservers for the running cluster + Map region_rs_map = listRegionsRS(); + snapshot_start_time = System.currentTimeMillis(); + + // create dfs paths for temporary output + int attemptId = 1; + String jobDirectory = "/" + NAME + "/" + tableName + "_" + family + "_" + + snapshot_start_time; + String inputFile = "_snapshot_input-" + attemptId; + String outputFile = "_snapshot_output-" + attemptId; + Path jobInput = new Path(jobDirectory, inputFile); + Path jobOutput = new Path(jobDirectory, outputFile); + + // write the map information into a temp file in HDFS + writeRegionserverRegionsMap(jobInput, region_rs_map); + LOG.info("Wrote list of regions to : " + jobInput.toUri().getPath()); + + // Start the MR job + Path tmpDstPath = new Path(backup_dfs + jobDirectory, "tmp"); + boolean result = runMRJob(conf, jobInput, jobOutput, snapshot_start_time, + tmpDstPath); + FileSystem fs = tmpDstPath.getFileSystem(conf); + + // we should not exit cleanly if MR fails. + if (!result) { + cleanupAndExit(jobDirectory, 1); + } + // if there is nothing to copy we are done + if (!fs.exists(tmpDstPath)) { + LOG.info("There is nothing to copy."); + cleanupAndExit(jobDirectory, 0); + } + + // copy table descriptor + Path srcTableDir = new Path(new Path(hbase_dfs, conf.get(HConstants.HBASE_DIR)), tableName); + LOG.info("Copying tableinfo from "+srcTableDir+" to "+tmpDstPath); + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(srcTableDir.getFileSystem(conf), srcTableDir); + FSTableDescriptors.createTableDescriptor(fs, tmpDstPath, htd); + + long snapshot_end_time = System.currentTimeMillis(); + // construct the name of the backup directory. It has the following format: + // --- + // Note that we need the start and end times for point in time recovery. 
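+    // i.e. <tableName>-<family>-<snapshot_start_time>-<snapshot_end_time>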
+ String snapshotName = tableName + "-" + family + "-" + snapshot_start_time + + "-" + snapshot_end_time; + // construct the Path for the final backup location + Path dstPath = new Path(backup_dfs + dstDir, snapshotName); + FileSystem fsd = dstPath.getFileSystem(conf); + if (!fsd.exists(dstPath.getParent())) { + fsd.mkdirs(dstPath.getParent()); + LOG.info("Created directory: " + dstPath.getParent()); + } + + // move the backed up data from the temp location to th final location + boolean x = fs.rename(tmpDstPath, dstPath); + if (!x) { + LOG.error("Rename failed, exiting: rename(" + tmpDstPath + ", " + dstPath + + ")"); + cleanupAndExit(jobDirectory, 1); + } + + cleanupAndExit(jobDirectory, 0); + } + + private static void cleanupAndExit(String dir, int exitCode) + throws IOException { + Path p = new Path(dir); + FileSystem fs = p.getFileSystem(conf); + fs.delete(p, true); + System.exit(exitCode); + } + + + /** + * Configure and run the MR job + * + * @param conf - configuration object for the HBase backup MR job + * @param jobInput + * @param jobOutput + * @param snapshotStartTime + * @param dstPath + * @return + * @throws IOException + */ + private static boolean runMRJob(Configuration conf, Path jobInput, + Path jobOutput, long snapshotStartTime, Path dstPath) throws IOException { + // get the table directory + String tableBaseDir = conf.get(HConstants.HBASE_DIR); + LOG.info("tableBaseDir : " + tableBaseDir); + LOG.info("tableName : " + tableName); + LOG.info("jobOutput : " + jobOutput.toUri().getPath()); + + Path tableSrcPath = new Path(tableBaseDir, tableName); + Path tableDstPath = new Path(dstPath, tableName); + + JobConf jobconf = new JobConf(conf, Snapshot.class); + jobconf.setJobName(NAME); + + // turn off speculative execution, because DFS doesn't handle + // multiple writers to the same file. + jobconf.setMapSpeculativeExecution(false); + // custom input format to deal with locality if needed + jobconf.setInputFormat(SnapshotInputFormat.class); + // output is text - regions and if the backup for them succeeded or failed + jobconf.setOutputKeyClass(Text.class); + jobconf.setOutputValueClass(Text.class); + // set the mapper + jobconf.setMapperClass(SnapshotMR.class); + // map only task, no reducers + jobconf.setNumReduceTasks(0); + + // TODO: set preserve status label + // job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes); + + // set the copy destination location + jobconf.set(SNAPSHOT_SRC_TABLE, tableName); + jobconf.set(SNAPSHOT_SRC_CF, family); + jobconf.set(SNAPSHOT_INFO_FILE, jobInput.toUri().getPath()); + jobconf.set(SNAPSHOT_TMP_PATH, jobOutput.toUri().getPath() + "-tmp"); + jobconf.set(SNAPSHOT_SRC_PATH, tableSrcPath.toUri().getPath() + "/" + + family); + jobconf.set(SNAPSHOT_DST_PATH, tableDstPath.toUri().toString()); + jobconf.setBoolean(SNAPSHOT_FLUSH_REGIONS, flushRegions); + jobconf.setLong(SNAPSHOT_START_TIME, snapshotStartTime); + jobconf.set(SNAPSHOT_SRC_TABLE_USER, hbaseUser); + + jobconf.set("mapred.input.dir", jobInput.toUri().getPath()); + jobconf.set("mapred.output.dir", jobOutput.toUri().getPath()); + jobconf.set(SNAPSHOT_REPLICATION_FACTOR, replication.toString()); + jobconf.set("hbase.zookeeper.quorum", zkNodeName); + jobconf.setBoolean(SNAPSHOT_USE_FASTCOPY, useFastCopy); + TableMapReduceUtil.addDependencyJars(jobconf); + + if (pool != null) { + jobconf.set("mapred.fairscheduler.pool", pool); + } + + boolean success = true; + try { + JobClient.runJob(jobconf); + // TODO: whats this for? 
+ // finalize(conf, jobconf); + } catch (IOException e) { + LOG.error("Running SnapshotMR job failed", e); + success = false; + } + + return success; + } + + /** + * Writes the map of region server names to list of regions to a HDFS file. + * Format is: + * + * The RS is currently hosting the HRegionInfo. + * + * @param rsToRegionsMap + * @throws IOException + */ + public static void writeRegionserverRegionsMap(Path jobInput, + Map region_rs_map) throws IOException { + // init the fs variables + SequenceFile.Writer src_writer = null; + FileSystem fs = FileSystem.get(conf); + + // write the data into HDFS + try { + src_writer = SequenceFile.createWriter(fs, conf, jobInput, Text.class, + HRegionInfo.class, SequenceFile.CompressionType.NONE); + // write the map into it + for (HRegionInfo hRegion : region_rs_map.keySet()) { + String rs = region_rs_map.get(hRegion).toString(); + LOG.info("Appending " + rs + ", " + hRegion.getEncodedName()); + src_writer.append(new Text(rs), new HRegionInfo(hRegion)); + } + src_writer.sync(); + } finally { + SnapshotUtilities.checkAndClose(src_writer); + } + } + + public static Map listRegionsRS() + throws Exception { + Configuration config = HBaseConfiguration.create(); + config.set("hbase.zookeeper.quorum", zkNodeName); + // Let us get to the table that we are looking for. + // Once we get it, we are going to + // - get a list of regions that are present in the table. + // - for each region, fetch and print the list of store files. + + // An alternative approach would have been to go to the master, + // get the list of servers serving data (across all tables) and then + // go to each of the servers, and get a list of regions that they store + // (across all tables) and filter it out for the table we need. + HTable table = new HTable(config, Bytes.toBytes(tableName)); + java.util.Map regions_servers_map = table + .getRegionsInfo(); + + return regions_servers_map; + } + + protected static void parseArgs(String[] args) { + try { + for (int i = 0; i < args.length; i++) { + if (args[i].equals("--hbase-dfs")) { + hbase_dfs = args[++i]; + } else if (args[i].equals("--zookeeper")) { + zkNodeName = args[++i]; + } else if (args[i].equals("--table")) { + tableName = args[++i]; + } else if (args[i].equals("--family")) { + family = args[++i]; + } else if (args[i].equals("--backup-dir")) { + dstDir = args[++i]; + } else if (args[i].equals("--flush-region-flag")) { + flushRegions = Boolean.parseBoolean(args[++i]); + } else if (args[i].equals("--backup-dfs")) { + backup_dfs = args[++i]; + } else if (args[i].equals("--replication")) { + replication = Short.parseShort(args[++i]); + } else if (args[i].equals("--user")) { + hbaseUser = args[++i]; + } else if (args[i].equals("--pool")) { + pool = args[++i]; + } else if (args[i].equals("--useFastCopy")) { + useFastCopy = Boolean.parseBoolean(args[++i]); + } else { + printUsageAndExit("Unrecognized option: " + args[i]); + } + } + if (family == null) + family = SNAPSHOT_ALL_CF; + if (family == null || tableName == null || hbase_dfs == null + || dstDir == null) { + printUsageAndExit("Too few options, or option not set!"); + } + } catch (ArrayIndexOutOfBoundsException e) { + printUsageAndExit("Too few options, or option not set!"); + } + } + + protected static void printUsageAndExit(String msg) { + System.out.println(msg); + System.out + .println("Usage: Subclass-of-HBaseLoader " + + "\n--hbase-dfs " + + "\n--zookeeper " + + "\n--table " + + "\n--backup-dir " + + "\n[--backup-dfs , defaults to hbase-dfs]" + + "\n[--family ]" + + 
"\n[--replication ]" + + "\n[--flush-region-flag ]" + + "\n[--pool ]" + + "\n[--useFastCopy , whether or not to use fast copy for copying, default is true]" + + "\n[--user ]"); + System.exit(1); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/backups/ImportTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/backups/ImportTable.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/backups/ImportTable.java (revision 0) @@ -0,0 +1,201 @@ +package org.apache.hadoop.hbase.backups; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.Writables; + +/** + * Imports an HBase table from a backup + */ +public class ImportTable { + public static final Log LOG = LogFactory.getLog(ImportTable.class); + + static Configuration conf = null; + static String tableName = null; + static String zkNodeName = null; + static String backupDir = null; + static String dfs = null; + static HBaseAdmin admin = null; + + public static void main(String[] args) throws Exception { + parseArgs(args); + + conf = HBaseConfiguration.create(); + conf.set("fs.default.name", dfs); + if (zkNodeName != null) { + conf.set("hbase.zookeeper.quorum", zkNodeName); + admin = new HBaseAdmin(conf); + } + importTable(); + + } + + public static void importTable() throws IOException { + LOG.info("In import for table: " + tableName); + + FileSystem fs = FileSystem.get(conf); + String hbaseDir = conf.get(HConstants.HBASE_DIR); + LOG.info("The hbase base dir: " + hbaseDir); + Path tableDir = new Path(hbaseDir, tableName); + + if (fs.exists(tableDir)) { + // Not sure what to do if table exists - maybe move it aside + LOG.error("Table already exists."); + System.exit(1); + } + + Path backupDirPath = new Path(dfs + backupDir); + if (!FSTableDescriptors.isTableInfoExists(fs, backupDirPath)) { + LOG.error("No .tableinfo '"+backupDir+"' is not a valid backup directory"); + System.exit(1); + } + if (tableName.compareTo(backupDirPath.getName()) != 0) { + LOG.info("Rename - Fix region names / .regioninfo files"); + backupDirPath = fixDir(backupDirPath, tableName); + } + + fs.rename(backupDirPath, tableDir); + LOG.info("Renamed dir: " + backupDirPath.toString() + " to dir: " + tableDir.toString()); + + LOG.info("Copying/fixing tableinfo from "+new Path(dfs + backupDir)+" to "+hbaseDir+" ("+tableName+")"); + HTableDescriptor htd = FSTableDescriptors.getTableDescriptor(fs, new Path(dfs + backupDir)); + htd.setName(Bytes.toBytes(tableName)); + FSTableDescriptors.createTableDescriptor(fs, new Path(hbaseDir), htd); + + addToMeta(tableDir); + } + + protected static void addToMeta(Path tableDir) throws IOException { + 
FileSystem fs = FileSystem.get(conf); + HTable metaTable = new HTable(conf, HConstants.META_TABLE_NAME); + FileStatus[] files = fs.listStatus(tableDir); + for (FileStatus file : files) { + LOG.debug("File: " + file.getPath().toString()); + if (! file.isDir()) continue; + if (file.getPath().getName().compareTo(HConstants.HREGION_COMPACTIONDIR_NAME) == 0) continue; + if (file.getPath().getName().compareTo(".tmp") == 0) continue; + Path regionInfoPath = new Path(file.getPath(), HRegion.REGIONINFO_FILE); + if (! fs.exists(regionInfoPath)) { + LOG.warn("Missing regioninfo file for: " + file.getPath().toString()); + System.exit(1); + } + FSDataInputStream regionInfoIn = fs.open(regionInfoPath); + HRegionInfo hRegionInfo = new HRegionInfo(); + hRegionInfo.readFields(regionInfoIn); + regionInfoIn.close(); + + Put put = new Put(hRegionInfo.getRegionName()); + put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, Writables.getBytes(hRegionInfo)); + metaTable.put(put); + + if (admin != null) { + // anything more "bulky" one can do here? + LOG.info("Assigning: " + Bytes.toStringBinary(hRegionInfo.getRegionName())); + admin.assign(hRegionInfo.getRegionName()); + } + } + } + + protected static Path fixDir(Path oldTableDir, String newTableName) throws IOException { + FileSystem fs = FileSystem.get(conf); + Path newTableDir = new Path(oldTableDir.getParent(), tableName); + if (!fs.exists(newTableDir)) { + if (!fs.mkdirs(newTableDir)) { + LOG.error("Mkdir failed for: " + newTableDir); + System.exit(1); + } + } + + FileStatus[] oldFiles = fs.listStatus(oldTableDir); + for (FileStatus oldFile : oldFiles) { + LOG.debug("File: " + oldFile.getPath().toString()); + + if (! oldFile.isDir()) continue; + if (oldFile.getPath().getName().compareTo(HConstants.HREGION_COMPACTIONDIR_NAME) == 0) continue; + if (oldFile.getPath().getName().compareTo(".tmp") == 0) continue; + + Path oldRegionInfoPath = new Path(oldFile.getPath(), HRegion.REGIONINFO_FILE); + if (! 
fs.exists(oldRegionInfoPath)) { + LOG.info("Missing regioninfo file for: " + oldFile.getPath().toString()); + System.exit(1); + } + // read .regioninfo file + FSDataInputStream regionInfoIn = fs.open(oldRegionInfoPath); + HRegionInfo oldHRegionInfo = new HRegionInfo(); + oldHRegionInfo.readFields(regionInfoIn); + regionInfoIn.close(); + fs.delete(oldRegionInfoPath, false); + + // get new RegionName + HRegionInfo tableHRegionInfo = new HRegionInfo(Bytes.toBytes(tableName), oldHRegionInfo.getStartKey(), oldHRegionInfo.getEndKey()); + String tableEncodedRegionName = tableHRegionInfo.getEncodedName(); + Path tableFilePath = new Path(newTableDir, tableEncodedRegionName); + if (!fs.rename(oldFile.getPath(), tableFilePath)) { + LOG.error("Rename failed for : " + oldFile.getPath().toString()); + System.exit(1); + } + LOG.info("Renamed file: " + oldFile.getPath().toString() + " to file: " + tableFilePath.toString()); + + // write out new regioninfo file + Path tableRegionInfoPath = new Path(tableFilePath, HRegion.REGIONINFO_FILE); + FSDataOutputStream regionInfoOut = fs.create(tableRegionInfoPath); + tableHRegionInfo.write(regionInfoOut); + regionInfoOut.close(); + } + return newTableDir; + } + + protected static void parseArgs(String[] args) { + try { + for (int i = 0; i < args.length; i++) { + if (args[i].equals("--table")) { + tableName = args[++i]; + } else if (args[i].equals("--zookeeper")) { + zkNodeName = args[++i]; + } else if (args[i].equals("--backup-dir")) { + backupDir = args[++i]; + } else if (args[i].equals("--hdfs")) { + dfs = args[++i]; + } else { + printUsageAndExit("Unrecognized option: " + args[i]); + System.exit(1); + } + } + if (tableName == null || backupDir == null) { + printUsageAndExit("Too few options, or option not set!"); + } + } catch (ArrayIndexOutOfBoundsException e) { + printUsageAndExit("Too few options, or option not set!"); + } + } + + protected static void printUsageAndExit(String msg) { + System.out.println("Usage: Subclass-of-HBaseLoader" + + "Note that the backup files are *moved* into position under /hbase." + + "\nIf this is not desired the back directory should be copied first." 
+ + "\n--table " + "\n--hdfs :9000>" + + "\n--backup-dir " + + "\n[--zookeeper ] " + + "(used to assign the regions in a life HBase cluster)"); + System.exit(1); + } + +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/backups/SnapshotInputFormat.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/backups/SnapshotInputFormat.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/backups/SnapshotInputFormat.java (revision 0) @@ -0,0 +1,179 @@ +package org.apache.hadoop.hbase.backups; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileRecordReader; + +/** + * This class uses the Region to RS mapping information to compute the ideal + * mappers to place the backups for each region. The backup job is run on the + * node with the maximum locality. + */ +public class SnapshotInputFormat implements InputFormat { + public static final Log LOG = LogFactory.getLog(SnapshotInputFormat.class); + /** + * The format of the input is: + * + * The RS is currently hosting the HRegionInfo. + * Produce splits such that each is no greater than the quotient of the + * total size and the number of splits requested. Split locations are + * assigned by the node that has the maximum locality so that the HDFS + * copy is efficient. 
+ * + * @param job The handle to the JobConf object + * @param numSplits Number of splits requested + */ + public InputSplit[] getSplits(JobConf job, int numSplits) + throws IOException { + // set up reading the input file + String srcfilelist = job.get(Snapshot.SNAPSHOT_INFO_FILE, ""); + if ("".equals(srcfilelist)) { + throw new RuntimeException("Invalid metadata: listuri(" + srcfilelist + ")"); + } + Path src = new Path(srcfilelist); + FileSystem fs = src.getFileSystem(job); + + ArrayList splits = new ArrayList(numSplits); + Text key = new Text(); + String cfs = job.get(Snapshot.SNAPSHOT_SRC_CF); + long pos = 0L; + long last = 0L; + + SequenceFile.Reader sl = null; + try { + LOG.info("Opening file to read : " + src + ", fs = " + fs.getName()); + sl = new SequenceFile.Reader(fs, src, job); + + // input is of the format "regionserver HRegionInfo", set splits accordingly + for (; sl.next(key); ) { + HRegionInfo value = new HRegionInfo(); + sl.getCurrentValue(value); + last = sl.getPosition(); + LOG.info("Read entry: " + key + ", region = " + value.getEncodedName()); + long splitsize = last - pos; + + // Get the list of files to backup for each region by making an API + // call into the RS + HRegionInfo hRegionInfo = value; + Pair > pair = + SnapshotUtilities.getStoreFileList(hRegionInfo, false, job, 0, cfs); + HRegionInterface server = pair.getFirst(); + List files = pair.getSecond(); + + // nothing to be done if there are no files + if (files.size() == 0) { + String host = server.getHServerInfo().getHostname(); + LOG.info("No files to copy for region: " + + hRegionInfo.getEncodedName() + " copying HRI on: " + host); + String[] hosts = { host }; + splits.add(new FileSplit(src, pos, splitsize, hosts)); + pos = last; + continue; + } + + // make sure the files exist + HashMap blockCountMap = new HashMap(); + int totalBlocks = 0; + for (String file : files) { + Path filePath = new Path("/", file); + FileStatus srcstat = null; + try { + srcstat = filePath.getFileSystem(job).getFileStatus(filePath); + } catch (FileNotFoundException e) { + if (!fs.exists(filePath)) { + LOG.info("Source file not found : " + filePath); + LOG.info("Looking in .Trash for : " + filePath); + String hbaseUser = job.get(Snapshot.SNAPSHOT_SRC_TABLE_USER, + "hadoop"); + Path srcPathInTrash = SnapshotUtilities.getPathInTrash(filePath, + hbaseUser, fs); + if (srcPathInTrash != null) { + LOG.info("File found in .Trash : " + srcPathInTrash); + srcstat = fs.getFileStatus(srcPathInTrash); + } else { + LOG.info("File NOT found in .Trash : " + filePath); + throw e; + } + } else { + throw e; + } + } + + // for every block in the region HFiles, get the block location and + // compute the preferred nodes + BlockLocation[] blockLocs = filePath.getFileSystem(job) + .getFileBlockLocations(srcstat, 0, srcstat.getLen()); + for (BlockLocation blk : blockLocs) { + totalBlocks++; + for (String host : blk.getHosts()) { + Integer count = blockCountMap.get(host); + if (count == null) { + count = new Integer(0); + } + blockCountMap.put(host, new Integer(count + 1)); + } + } + } + + // get the nodes with the max locality of blocks to schedule the mappers on + int largestBlkCount = 0; + String hostToRun = null; + for (String host : blockCountMap.keySet()) { + if (blockCountMap.get(host) > largestBlkCount) { + largestBlkCount = blockCountMap.get(host); + hostToRun = host; + } + } + LOG.info("Copying Region: " + hRegionInfo.getEncodedName() + + " deployed on " + key + " on " + hostToRun + "(" + + largestBlkCount + "/" + totalBlocks + ")"); + String[] 
hosts = { hostToRun };
+        splits.add(new FileSplit(src, pos, splitsize, hosts));
+        pos = last;
+      }
+    }
+    finally {
+      SnapshotUtilities.checkAndClose(sl);
+    }
+
+    return splits.toArray(new FileSplit[splits.size()]);
+  }
+
+  public List<String> getFileList(HRegionInterface server, byte[] rName, String cfs) {
+    if (cfs.equals(Snapshot.SNAPSHOT_ALL_CF)) {
+      return (server.getStoreFileList(rName));
+    }
+    byte[][] family = Bytes.toByteArrays(cfs.split(","));
+    return (server.getStoreFileList(rName, family));
+  }
+
+  /**
+   * Returns a reader for this split of the src file list.
+   */
+  public RecordReader<Text, HRegionInfo> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter) throws IOException {
+    return new SequenceFileRecordReader<Text, HRegionInfo>(job, (FileSplit) split);
+  }
+}
\ No newline at end of file
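
For reference, a minimal sketch of how the two new entry points might be invoked, based only on the options accepted by Snapshot.parseArgs() and ImportTable.parseArgs() above. The wrapper class and every host name, ZooKeeper quorum, table name, path and timestamp in it are placeholders, not values taken from the patch.

// Hypothetical driver; all concrete values below are placeholders.
public class SnapshotBackupExample {

  /** Step 1: back up table "usertable" (all column families by default). */
  public static void takeBackup() throws Exception {
    // Snapshot.main() ends with System.exit(), so in practice this runs as its
    // own command rather than in the same JVM as the import.
    org.apache.hadoop.hbase.backups.Snapshot.main(new String[] {
        "--hbase-dfs",  "hdfs://hbase-namenode:9000",
        "--zookeeper",  "zk1.example.com,zk2.example.com,zk3.example.com",
        "--table",      "usertable",
        "--backup-dir", "/backups/usertable",
        "--flush-region-flag", "true"   // flush regions before listing store files
    });
  }

  /** Step 2: restore the backup, run later as a separate command. */
  public static void importBackup() throws Exception {
    // The backup directory name follows <table>-<family>-<start_ts>-<end_ts>
    // and must live on the filesystem given by --hdfs.
    org.apache.hadoop.hbase.backups.ImportTable.main(new String[] {
        "--table",      "usertable",
        "--hdfs",       "hdfs://hbase-namenode:9000",
        "--backup-dir", "/backups/usertable/usertable-all.cf-<start_ts>-<end_ts>",
        "--zookeeper",  "zk1.example.com"   // optional: assign regions in a live cluster
    });
  }
}

As the ImportTable usage text warns, the import moves the backup directory into place under the HBase root directory, so the backup should be copied first if it needs to be preserved.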