diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index cc588f7..165c5a7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -22,13 +22,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -40,7 +39,11 @@ import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
 import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
 
 /**
  * Incremental backup implementation.
@@ -68,92 +71,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     return list;
   }
 
-  private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    List<String> list = new ArrayList<String>();
-    for (String file : incrBackupFileList) {
-      if (!fs.exists(new Path(file))) {
-        list.add(file);
-      }
-    }
-    return list;
-
-  }
-
-  /**
-   * Do incremental copy.
-   * @param backupInfo backup info
-   */
-  private void incrementalCopy(BackupInfo backupInfo) throws Exception {
-
-    LOG.info("Incremental copy is starting.");
-    // set overall backup phase: incremental_copy
-    backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
-    // get incremental backup file list and prepare parms for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
-    strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-
-    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
-    int counter = 0;
-    int MAX_ITERAIONS = 2;
-    while (counter++ < MAX_ITERAIONS) {
-      // We run DistCp maximum 2 times
-      // If it fails on a second time, we throw Exception
-      int res =
-          copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
-
-      if (res != 0) {
-        LOG.error("Copy incremental log files failed with return code: " + res + ".");
-        throw new IOException("Failed of Hadoop Distributed Copy from "
-            + StringUtils.join(incrBackupFileList, ",") + " to "
-            + backupInfo.getHLogTargetDir());
-      }
-      List<String> missingFiles = getMissingFiles(incrBackupFileList);
-
-      if (missingFiles.isEmpty()) {
-        break;
-      } else {
-        // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
-        // update backupInfo and strAttr
-        if (counter == MAX_ITERAIONS) {
-          String msg =
-              "DistCp could not finish the following files: " + StringUtils.join(missingFiles, ",");
-          LOG.error(msg);
-          throw new IOException(msg);
-        }
-        List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
-        incrBackupFileList.removeAll(missingFiles);
-        incrBackupFileList.addAll(converted);
-        backupInfo.setIncrBackupFileList(incrBackupFileList);
-
-        // Run DistCp only for missing files (which have been moved from WALs to oldWALs
-        // during previous run)
-        strArr = converted.toArray(new String[converted.size() + 1]);
-        strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-      }
-    }
-
-    LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
-        + backupInfo.getHLogTargetDir() + " finished.");
-  }
-
-  private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
-    List<String> list = new ArrayList<String>();
-    for (String path : missingFiles) {
-      if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
-        LOG.error("Copy incremental log files failed, file is missing : " + path);
-        throw new IOException("Failed of Hadoop Distributed Copy to "
-            + backupInfo.getHLogTargetDir() + ", file is missing " + path);
-      }
-      list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR
-          + HConstants.HREGION_OLDLOGDIR_NAME));
-    }
-    return list;
-  }
-
   @Override
   public void execute() throws IOException {
 
@@ -175,8 +92,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     try {
       // copy out the table and region info files for each table
       BackupServerUtil.copyTableRegionInfo(conn, backupInfo, conf);
-      incrementalCopy(backupInfo);
-      // Save list of WAL files copied
+      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
+      convertWALsAndCopy(backupInfo);
+      incrementalCopyHFiles(backupInfo); // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
     } catch (Exception e) {
       String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
incremental copy " + backupId; @@ -213,4 +131,110 @@ public class IncrementalTableBackupClient extends TableBackupClient { } } + + private void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception { + + LOG.info("Incremental copy HFiles is starting."); + // set overall backup phase: incremental_copy + backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY); + // get incremental backup file list and prepare parms for DistCp + List incrBackupFileList = new ArrayList(); + // Add Bulk output + incrBackupFileList.add(getBulkOutputDir().toString()); + // filter missing files out (they have been copied by previous backups) + String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]); + strArr[strArr.length - 1] = backupInfo.getBackupRootDir(); + + BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); + + int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr); + + if (res != 0) { + LOG.error("Copy incremental HFile files failed with return code: " + res + "."); + throw new IOException("Failed of Hadoop Distributed Copy from " + + StringUtils.join(",", incrBackupFileList) + " to " + backupInfo.getHLogTargetDir()); + } + deleteBulkLoadDirectory(); + LOG.info("Incremental copy HFiles from " + StringUtils.join(",", incrBackupFileList) + " to " + + backupInfo.getBackupRootDir() + " finished."); + } + + private void deleteBulkLoadDirectory() throws IOException { + // delete original bulk load directory on method exit + Path path = getBulkOutputDir(); + FileSystem fs = FileSystem.get(conf); + boolean result = fs.delete(path, true); + if (!result) { + LOG.warn ("Could not delete " + path); + } + + } + private void convertWALsAndCopy(BackupInfo backupInfo) throws IOException { + // get incremental backup file list and prepare parms for DistCp + List incrBackupFileList = backupInfo.getIncrBackupFileList(); + // filter missing files out (they have been copied by previous backups) + incrBackupFileList = filterMissingFiles(incrBackupFileList); + // Get list of tables in incremental backup set + Set tableSet = backupManager.getIncrementalBackupTableSet(); + for(TableName table : tableSet) { + // Check if table exists + if(tableExists(table, conn)) { + convertWALToHFiles(incrBackupFileList, table); + } else { + LOG.warn("Table "+ table+" does not exists. Skipping in WAL converter"); + } + } + + } + + private boolean tableExists(TableName table, Connection conn) throws IOException { + try (Admin admin = conn.getAdmin();) { + return admin.tableExists(table); + } + } + + private void convertWALToHFiles(List dirPaths, TableName tableName) throws IOException { + + String bulkOutputConfKey; + Tool player = new WALPlayer(); + + bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY; + + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file. 
+    // because WAL file names contain ','
+    String dirs = StringUtils.join(";", dirPaths);
+
+    Path bulkOutputPath = getBulkOutputDirForTable(tableName);
+    conf.set(bulkOutputConfKey, bulkOutputPath.toString());
+    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+    String[] playerArgs = { dirs, tableName.getNameAsString() };
+
+    try {
+      // TODO Player must tolerate missing files or exceptions during conversion
+      player.setConf(conf);
+      player.run(playerArgs);
+      // TODO Check missing files and repeat
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+    } catch (Exception e) {
+      throw new IOException("Cannot convert WAL files from directory " + dirs
+          + " (check Hadoop and HBase logs) ", e);
+    }
+  }
+
+  private Path getBulkOutputDirForTable(TableName table) {
+    Path tablePath = getBulkOutputDir();
+    tablePath = new Path(tablePath, table.getNamespaceAsString());
+    tablePath = new Path(tablePath, table.getQualifierAsString());
+    return new Path(tablePath, "data");
+  }
+
+  private Path getBulkOutputDir() {
+    String backupId = backupInfo.getBackupId();
+    Path path = new Path(backupInfo.getBackupRootDir());
+    path = new Path(path, ".tmp");
+    path = new Path(path, backupId);
+    return path;
+  }
+
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index a66cd17..4ce7a96 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -159,8 +159,9 @@ public class RestoreTablesClient {
     // full backup path comes first
     for (int i = 1; i < images.length; i++) {
       BackupImage im = images[i];
-      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
-      dirList.add(new Path(logBackupDir));
+      String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(),
+          im.getBackupId(), sTable) + Path.SEPARATOR + "data";
+      dirList.add(new Path(fileBackupDir));
     }
 
     String dirs = StringUtils.join(dirList, ",");
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
index 79b9f5e..6d20aba 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java
@@ -107,11 +107,13 @@ public class HFileSplitter extends Configured implements Tool {
     String inputDirs = args[0];
     String tabName = args[1];
     conf.setStrings(TABLES_KEY, tabName);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+    LOG.debug("Input directories: " + inputDirs);
     Job job =
         Job.getInstance(conf,
           conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
     job.setJarByClass(HFileSplitter.class);
-    FileInputFormat.addInputPaths(job, inputDirs);
+    // input paths are passed through FileInputFormat.INPUT_DIR (set above), not addInputPaths()
     job.setInputFormatClass(HFileInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 0b89252..9760855 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -62,13 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
 
     String bulkOutputConfKey;
 
-    if (fullBackupRestore) {
-      player = new HFileSplitter();
-      bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY;
-    } else {
-      player = new WALPlayer();
-      bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
-    }
+    player = new HFileSplitter();
+    bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY;
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file
     String dirs = StringUtils.join(dirPaths, ",");
@@ -89,7 +84,7 @@ public class MapReduceRestoreJob implements RestoreJob {
       conf.set(bulkOutputConfKey, bulkOutputPath.toString());
       String[] playerArgs =
         { dirs,
-          fullBackupRestore?newTableNames[i].getNameAsString():tableNames[i].getNameAsString()
+          fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i].getNameAsString()
         };
 
       int result = 0;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index da950f4..9d26eac 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -255,8 +256,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
   }
 
   private Path[] getInputPaths(Configuration conf) {
-    String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir");
-    return StringUtils.stringToPath(inpDirs.split(","));
+    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
+    return StringUtils.stringToPath(
+        inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
   }
 
   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index b821cb4..37eca44 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -73,6 +73,8 @@ public class WALPlayer extends Configured implements Tool {
   public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
   public final static String TABLES_KEY = "wal.input.tables";
   public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
 
+  public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
+
   // This relies on Hadoop Configuration to handle warning about deprecated configs and
   // to set the correct non-deprecated configs when an old one shows up.
@@ -280,11 +282,10 @@
     }
     conf.setStrings(TABLES_KEY, tables);
     conf.setStrings(TABLE_MAP_KEY, tableMap);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job =
         Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
     job.setJarByClass(WALPlayer.class);
-    FileInputFormat.addInputPaths(job, inputDirs);
-
     job.setInputFormatClass(WALInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);