diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 395ed6d..77ed6da 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -22,13 +22,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -39,7 +38,11 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
 
 /**
  * Incremental backup implementation.
@@ -68,92 +71,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     return list;
   }
 
-  private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    List<String> list = new ArrayList<String>();
-    for (String file : incrBackupFileList) {
-      if (!fs.exists(new Path(file))) {
-        list.add(file);
-      }
-    }
-    return list;
-
-  }
-
-  /**
-   * Do incremental copy.
-   * @param backupInfo backup info
-   */
-  private void incrementalCopy(BackupInfo backupInfo) throws Exception {
-
-    LOG.info("Incremental copy is starting.");
-    // set overall backup phase: incremental_copy
-    backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
-    // get incremental backup file list and prepare parms for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
-    strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-
-    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
-    int counter = 0;
-    int MAX_ITERAIONS = 2;
-    while (counter++ < MAX_ITERAIONS) {
-      // We run DistCp maximum 2 times
-      // If it fails on a second time, we throw Exception
-      int res =
-          copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
-
-      if (res != 0) {
-        LOG.error("Copy incremental log files failed with return code: " + res + ".");
-        throw new IOException("Failed of Hadoop Distributed Copy from "
-            + StringUtils.join(incrBackupFileList, ",") + " to "
-            + backupInfo.getHLogTargetDir());
-      }
-      List<String> missingFiles = getMissingFiles(incrBackupFileList);
-
-      if (missingFiles.isEmpty()) {
-        break;
-      } else {
-        // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
-        // update backupInfo and strAttr
-        if (counter == MAX_ITERAIONS) {
-          String msg =
-              "DistCp could not finish the following files: " + StringUtils.join(missingFiles, ",");
-          LOG.error(msg);
-          throw new IOException(msg);
-        }
-        List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
-        incrBackupFileList.removeAll(missingFiles);
-        incrBackupFileList.addAll(converted);
-        backupInfo.setIncrBackupFileList(incrBackupFileList);
-
-        // Run DistCp only for missing files (which have been moved from WALs to oldWALs
-        // during previous run)
-        strArr = converted.toArray(new String[converted.size() + 1]);
-        strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-      }
-    }
-
-    LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
-        + backupInfo.getHLogTargetDir() + " finished.");
-  }
-
-  private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
-    List<String> list = new ArrayList<String>();
-    for (String path : missingFiles) {
-      if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
-        LOG.error("Copy incremental log files failed, file is missing : " + path);
-        throw new IOException("Failed of Hadoop Distributed Copy to "
-            + backupInfo.getHLogTargetDir() + ", file is missing " + path);
-      }
-      list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR
-          + HConstants.HREGION_OLDLOGDIR_NAME));
-    }
-    return list;
-  }
-
   @Override
   public void execute() throws IOException {
@@ -175,7 +92,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     try {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
-      incrementalCopy(backupInfo);
+      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
+      convertWALsAndCopy(backupInfo);
+      incrementalCopyHFiles(backupInfo);
       // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
     } catch (Exception e) {
@@ -213,4 +132,110 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
   }
+
+  private void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
+    LOG.info("Incremental copy of HFiles is starting.");
+    // set overall backup phase: incremental_copy
+    backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
+    // get incremental backup file list and prepare params for DistCp
+    List<String> incrBackupFileList = new ArrayList<String>();
+    // add the bulk output directory as the only DistCp source
+    incrBackupFileList.add(getBulkOutputDir().toString());
+    // the last element of the argument array is the DistCp target: the backup root
+    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
+    strArr[strArr.length - 1] = backupInfo.getBackupRootDir();
+
+    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+    int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
+
+    if (res != 0) {
+      LOG.error("Copy of incremental HFiles failed with return code: " + res + ".");
+      throw new IOException("Failed Hadoop Distributed Copy from "
+          + StringUtils.join(",", incrBackupFileList) + " to " + backupInfo.getBackupRootDir());
+    }
+    deleteBulkLoadDirectory();
+    LOG.info("Incremental copy of HFiles from " + StringUtils.join(",", incrBackupFileList)
+        + " to " + backupInfo.getBackupRootDir() + " finished.");
+  }
+
+  private void deleteBulkLoadDirectory() throws IOException {
+    // delete the original bulk load (WAL conversion) directory on method exit
+    Path path = getBulkOutputDir();
+    FileSystem fs = FileSystem.get(conf);
+    boolean result = fs.delete(path, true);
+    if (!result) {
+      LOG.warn("Could not delete " + path);
+    }
+  }
+
+  private void convertWALsAndCopy(BackupInfo backupInfo) throws IOException {
+    // get the list of WAL files for this incremental backup
+    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+    // filter missing files out (they have been copied by previous backups)
+    incrBackupFileList = filterMissingFiles(incrBackupFileList);
+    // get the list of tables in the incremental backup set
+    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+    for (TableName table : tableSet) {
+      // check if table exists
+      if (tableExists(table, conn)) {
+        convertWALToHFiles(incrBackupFileList, table);
+      } else {
+        LOG.warn("Table " + table + " does not exist. Skipping it in WAL converter");
+      }
+    }
+  }
+
+  private boolean tableExists(TableName table, Connection conn) throws IOException {
+    try (Admin admin = conn.getAdmin()) {
+      return admin.tableExists(table);
+    }
+  }
+
+  private void convertWALToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
+    Tool player = new WALPlayer();
+    String bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
+
+    // Player reads all files in an arbitrary directory structure and creates
+    // a Map task for each file.
+    // We use ';' as the separator because WAL file names contain ','.
+    String dirs = StringUtils.join(";", dirPaths);
+
+    Path bulkOutputPath = getBulkOutputDirForTable(tableName);
+    conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+    String[] playerArgs = { dirs, tableName.getNameAsString() };
+
+    try {
+      // TODO Player must tolerate missing files or exceptions during conversion
+      player.setConf(conf);
+      player.run(playerArgs);
+      // TODO Check missing files and repeat
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+    } catch (Exception e) {
+      throw new IOException("Cannot convert WALs from directory " + dirs
+          + " (check Hadoop and HBase logs)", e);
+    }
+  }
+
+  private Path getBulkOutputDirForTable(TableName table) {
+    Path tablePath = getBulkOutputDir();
+    tablePath = new Path(tablePath, table.getNamespaceAsString());
+    tablePath = new Path(tablePath, table.getQualifierAsString());
+    return new Path(tablePath, "data");
+  }
+
+  private Path getBulkOutputDir() {
+    String backupId = backupInfo.getBackupId();
+    Path path = new Path(backupInfo.getBackupRootDir());
+    path = new Path(path, ".tmp");
+    path = new Path(path, backupId);
+    return path;
+  }
+
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index f418305..e174694 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -159,8 +159,9 @@ public class RestoreTablesClient {
     // full backup path comes first
     for (int i = 1; i < images.length; i++) {
       BackupImage im = images[i];
-      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
-      dirList.add(new Path(logBackupDir));
+      String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(),
+          im.getBackupId(), sTable) + Path.SEPARATOR + "data";
+      dirList.add(new Path(fileBackupDir));
     }
 
     String dirs = StringUtils.join(dirList, ",");
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
index 5641720..77730b9 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java
@@ -107,11 +107,12 @@ public class HFileSplitterJob extends Configured implements Tool {
     String inputDirs = args[0];
     String tabName = args[1];
     conf.setStrings(TABLES_KEY, tabName);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job =
         Job.getInstance(conf,
           conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
     job.setJarByClass(HFileSplitterJob.class);
-    FileInputFormat.addInputPaths(job, inputDirs);
+    //FileInputFormat.addInputPaths(job, inputDirs);
     job.setInputFormatClass(HFileInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
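
Note (illustration only, not part of the patch): the sketch below mirrors how convertWALToHFiles() above is expected to drive WALPlayer with the new INPUT_FILES_SEPARATOR_KEY and the bulk-output layout built by getBulkOutputDirForTable(). The cluster paths, backup id, and table name are hypothetical placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.util.ToolRunner;

    public class WalToHFileConversionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // WAL directories are joined with ';' because WAL file names may contain ','
        String dirs = "hdfs:///hbase/WALs/host1,16020,1490000000000;"
            + "hdfs:///hbase/WALs/host2,16020,1490000000000";
        // bulk output goes to <backupRoot>/.tmp/<backupId>/<namespace>/<table>/data,
        // matching getBulkOutputDirForTable() in the patch above
        conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY,
            "hdfs:///backup/.tmp/backup_1490000000000/default/usertable/data");
        conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
        int rc = ToolRunner.run(conf, new WALPlayer(), new String[] { dirs, "usertable" });
        System.exit(rc);
      }
    }
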
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index ffb61ec..abe9341 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -62,13 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
 
     String bulkOutputConfKey;
 
-    if (fullBackupRestore) {
-      player = new HFileSplitterJob();
-      bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
-    } else {
-      player = new WALPlayer();
-      bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
-    }
+    player = new HFileSplitterJob();
+    bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file
     String dirs = StringUtils.join(dirPaths, ",");
@@ -88,7 +83,10 @@ public class MapReduceRestoreJob implements RestoreJob {
       Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
       Configuration conf = getConf();
       conf.set(bulkOutputConfKey, bulkOutputPath.toString());
-      String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+      String[] playerArgs =
+          { dirs,
+            fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i].getNameAsString()
+          };
 
       int result = 0;
       int loaderResult = 0;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index a130c21..302200b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -281,47 +279,6 @@ public class RestoreTool {
     return tableDescriptor;
   }
 
-  /**
-   * Duplicate the backup image if it's on local cluster
-   * @see HStore#bulkLoadHFile(String, long)
-   * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
-   * @param tableArchivePath archive path
-   * @return the new tableArchivePath
-   * @throws IOException exception
-   */
-  Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
-    // Move the file if it's on local cluster
-    boolean isCopyNeeded = false;
-
-    FileSystem srcFs = tableArchivePath.getFileSystem(conf);
-    FileSystem desFs = FileSystem.get(conf);
-    if (tableArchivePath.getName().startsWith("/")) {
-      isCopyNeeded = true;
-    } else {
-      // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
-      // long)
-      if (srcFs.getUri().equals(desFs.getUri())) {
-        LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
-            + desFs.getUri());
-        isCopyNeeded = true;
-      }
-    }
-    if (isCopyNeeded) {
-      LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
-      if (desFs.exists(restoreTmpPath)) {
-        try {
-          desFs.delete(restoreTmpPath, true);
-        } catch (IOException e) {
-          LOG.debug("Failed to delete path: " + restoreTmpPath
-              + ", need to check whether restore target DFS cluster is healthy");
-        }
-      }
-      FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf);
-      LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath);
-      tableArchivePath = restoreTmpPath;
-    }
-    return tableArchivePath;
-  }
 
   private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
       String lastIncrBackupId) throws IOException {
@@ -403,33 +360,13 @@ public class RestoreTool {
       // the regions in fine grain
       checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList,
         tableDescriptor, truncateIfExists);
-      if (tableArchivePath != null) {
-        // start real restore through bulkload
-        // if the backup target is on local cluster, special action needed
-        Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
-        if (tempTableArchivePath.equals(tableArchivePath)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
-          }
-        } else {
-          regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
-          }
-        }
+      RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
+      Path[] paths = new Path[regionPathList.size()];
+      regionPathList.toArray(paths);
+      restoreService.run(paths, new TableName[] { tableName }, new TableName[] { newTableName },
+        true);
-        LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
-        for (Path regionPath : regionPathList) {
-          String regionName = regionPath.toString();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Restoring HFiles from directory " + regionName);
-          }
-          String[] args = { regionName, newTableName.getNameAsString() };
-          loader.run(args);
-        }
-      }
-      // we do not recovered edits
     } catch (Exception e) {
+      LOG.error(e);
       throw new IllegalStateException("Cannot restore hbase table", e);
     }
   }
@@ -453,28 +390,6 @@ public class RestoreTool {
   }
 
   /**
-   * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
-   * backup.
-   * @return the {@link LoadIncrementalHFiles} instance
-   * @throws IOException exception
-   */
-  private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
-      throws IOException {
-
-    // By default, it is 32 and loader will fail if # of files in any region exceed this
-    // limit. Bad for snapshot restore.
-    this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
-    this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
-    LoadIncrementalHFiles loader = null;
-    try {
-      loader = new LoadIncrementalHFiles(this.conf);
-    } catch (Exception e1) {
-      throw new IOException(e1);
-    }
-    return loader;
-  }
-
-  /**
    * Calculate region boundaries and add all the column families to the table descriptor
    * @param regionDirList region dir list
    * @return a set of keys to store the boundaries
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 0ca78b4..2c20987 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -253,8 +254,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
   }
 
   private Path[] getInputPaths(Configuration conf) {
-    String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir");
-    return StringUtils.stringToPath(inpDirs.split(","));
+    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
+    return StringUtils.stringToPath(
+      inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
   }
 
   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index d16dcf5..0d728cb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -73,6 +73,8 @@ public class WALPlayer extends Configured implements Tool {
   public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
   public final static String TABLES_KEY = "wal.input.tables";
   public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
+  public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
+
 
   // This relies on Hadoop Configuration to handle warning about deprecated configs and
   // to set the correct non-deprecated configs when an old one shows up.
@@ -280,11 +282,10 @@ public class WALPlayer extends Configured implements Tool {
     }
     conf.setStrings(TABLES_KEY, tables);
     conf.setStrings(TABLE_MAP_KEY, tableMap);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job = Job.getInstance(conf,
         conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
     job.setJarByClass(WALPlayer.class);
-    FileInputFormat.addInputPaths(job, inputDirs);
-
     job.setInputFormatClass(WALInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
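
Note (illustration only, not part of the patch): the fragment below shows the intent of the new wal.input.separator handling in WALInputFormat.getInputPaths(): a ';'-joined list of input directories survives even when the WAL directory names themselves contain ','. The directory names are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SeparatorSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // two WAL directories whose names contain ',' (hypothetical), joined with ';'
        conf.set(FileInputFormat.INPUT_DIR,
            "hdfs:///hbase/WALs/host1,16020,1490000000000;hdfs:///hbase/WALs/host2,16020,1490000000000");
        conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
        // same split logic as the patched WALInputFormat.getInputPaths()
        String[] dirs = conf.get(FileInputFormat.INPUT_DIR)
            .split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","));
        // prints 2; the old ','-only split would have broken each path apart
        System.out.println(dirs.length);
      }
    }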