diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java index 6f642a443b..1e71626798 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java @@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; public interface BackupAdmin extends Closeable { /** - * Backup given list of tables fully. This is a synchronous operation. - * It returns backup id on success or throw exception on failure. + * Backup given list of tables fully. This is a synchronous operation. It returns backup id on + * success or throw exception on failure. * @param userRequest BackupRequest instance * @return the backup Id */ @@ -61,16 +61,22 @@ public interface BackupAdmin extends Closeable { */ BackupInfo getBackupInfo(String backupId) throws IOException; - /** * Delete backup image command - * @param backupIds backup id list + * @param backupIds array of backup ids * @return total number of deleted sessions * @throws IOException exception */ int deleteBackups(String[] backupIds) throws IOException; /** + * Merge backup images command + * @param backupIds array of backup ids + * @throws IOException exception + */ + void mergeBackups(String[] backupIds) throws IOException; + + /** * Show backup history command * @param n last n backup sessions * @return list of backup info objects @@ -113,7 +119,7 @@ public interface BackupAdmin extends Closeable { /** * Add tables to backup set command * @param name name of backup set. - * @param tables list of tables to be added to this set. + * @param tables array of tables to be added to this set. * @throws IOException exception */ void addToBackupSet(String name, TableName[] tables) throws IOException; @@ -121,7 +127,7 @@ public interface BackupAdmin extends Closeable { /** * Remove tables from backup set * @param name name of backup set. - * @param tables list of tables to be removed from this set. + * @param tables array of tables to be removed from this set. 
* @throws IOException exception */ void removeFromBackupSet(String name, TableName[] tables) throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java index 21d73ccde4..e9bbfc70ad 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java @@ -27,9 +27,8 @@ import org.apache.hadoop.hbase.client.Connection; public class BackupClientFactory { - public static TableBackupClient create (Connection conn, String backupId, BackupRequest request) - throws IOException - { + public static TableBackupClient create(Connection conn, String backupId, BackupRequest request) + throws IOException { Configuration conf = conn.getConfiguration(); try { String clsName = conf.get(TableBackupClient.BACKUP_CLIENT_IMPL_CLASS); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java index e2cdb2f4cb..9dd85317e2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java @@ -111,6 +111,8 @@ public class BackupDriver extends AbstractHBaseTool { type = BackupCommand.SET; } else if (BackupCommand.REPAIR.name().equalsIgnoreCase(cmd)) { type = BackupCommand.REPAIR; + } else if (BackupCommand.MERGE.name().equalsIgnoreCase(cmd)) { + type = BackupCommand.MERGE; } else { System.out.println("Unsupported command for backup: " + cmd); printToolUsage(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java index 6d8967a06e..d72c88432f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.util.ReflectionUtils; @@ -32,6 +33,7 @@ public final class BackupRestoreFactory { public final static String HBASE_INCR_RESTORE_IMPL_CLASS = "hbase.incremental.restore.class"; public final static String HBASE_BACKUP_COPY_IMPL_CLASS = "hbase.backup.copy.class"; + public final static String HBASE_BACKUP_MERGE_IMPL_CLASS = "hbase.backup.merge.class"; private BackupRestoreFactory() { throw new AssertionError("Instantiating utility class..."); @@ -40,7 +42,7 @@ public final class BackupRestoreFactory { /** * Gets backup restore job * @param conf configuration - * @return backup restore task instance + * @return backup restore job instance */ public static RestoreJob getRestoreJob(Configuration conf) { Class cls = @@ -53,7 +55,7 @@ public final class BackupRestoreFactory { /** * Gets backup copy job * @param conf configuration - * @return backup copy task + * @return backup copy job instance */ public static BackupCopyJob getBackupCopyJob(Configuration conf) { Class cls = @@ -63,4 +65,18 @@ public final class BackupRestoreFactory { 
service.setConf(conf); return service; } + + /** + * Gets backup merge job + * @param conf configuration + * @return backup merge job instance + */ + public static BackupMergeJob getBackupMergeJob(Configuration conf) { + Class cls = + conf.getClass(HBASE_BACKUP_MERGE_IMPL_CLASS, MapReduceBackupMergeJob.class, + BackupMergeJob.class); + BackupMergeJob service = ReflectionUtils.newInstance(cls, conf); + service.setConf(conf); + return service; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java index 46044db61e..1c43e8884f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java @@ -49,8 +49,8 @@ public class HBackupFileSystem { /** * Given the backup root dir, backup id and the table name, return the backup image location, * which is also where the backup manifest file is. return value look like: - * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", - * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory + * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where + * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory * @param backupRootDir backup root directory * @param backupId backup id * @param tableName table name @@ -63,18 +63,26 @@ public class HBackupFileSystem { + Path.SEPARATOR; } + public static String getTableBackupDataDir(String backupRootDir, String backupId, + TableName tableName) { + return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data"; + } + + public static Path getBackupPath(String backupRootDir, String backupId) { + return new Path(backupRootDir + Path.SEPARATOR + backupId); + } + /** * Given the backup root dir, backup id and the table name, return the backup image location, * which is also where the backup manifest file is. 
return value look like: - * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", - * where "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory + * "hdfs://backup.hbase.org:9000/user/biadmin/backup/backup_1396650096738/default/t1_dn/", where + * "hdfs://backup.hbase.org:9000/user/biadmin/backup" is a backup root directory * @param backupRootPath backup root path * @param tableName table name * @param backupId backup Id * @return backupPath for the particular table */ - public static Path getTableBackupPath(TableName tableName, - Path backupRootPath, String backupId) { + public static Path getTableBackupPath(TableName tableName, Path backupRootPath, String backupId) { return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName)); } @@ -94,33 +102,30 @@ public class HBackupFileSystem { return new Path(getLogBackupDir(backupRootDir, backupId)); } - private static Path getManifestPath(TableName tableName, Configuration conf, Path backupRootPath, - String backupId) throws IOException { - Path manifestPath = - new Path(getTableBackupPath(tableName, backupRootPath, backupId), - BackupManifest.MANIFEST_FILE_NAME); + // TODO we do not keep WAL files anymore + // Move manifest file to other place + private static Path getManifestPath(Configuration conf, Path backupRootPath, String backupId) + throws IOException { + Path manifestPath = null; FileSystem fs = backupRootPath.getFileSystem(conf); + manifestPath = + new Path(getBackupPath(backupRootPath.toString(), backupId) + Path.SEPARATOR + + BackupManifest.MANIFEST_FILE_NAME); if (!fs.exists(manifestPath)) { - // check log dir for incremental backup case - manifestPath = - new Path(getLogBackupDir(backupRootPath.toString(), backupId) + Path.SEPARATOR - + BackupManifest.MANIFEST_FILE_NAME); - if (!fs.exists(manifestPath)) { - String errorMsg = - "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for " - + backupId + ". File " + manifestPath + " does not exists. Did " + backupId - + " correspond to previously taken backup ?"; - throw new IOException(errorMsg); - } + String errorMsg = + "Could not find backup manifest " + BackupManifest.MANIFEST_FILE_NAME + " for " + + backupId + ". File " + manifestPath + " does not exists. 
Did " + backupId + + " correspond to previously taken backup ?"; + throw new IOException(errorMsg); } return manifestPath; } - public static BackupManifest getManifest(TableName tableName, Configuration conf, - Path backupRootPath, String backupId) throws IOException { + public static BackupManifest + getManifest(Configuration conf, Path backupRootPath, String backupId) throws IOException { BackupManifest manifest = - new BackupManifest(conf, getManifestPath(tableName, conf, backupRootPath, backupId)); + new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId)); return manifest; } @@ -134,7 +139,7 @@ public class HBackupFileSystem { TableName[] tableArray, Configuration conf, Path backupRootPath, String backupId) throws IOException { for (TableName tableName : tableArray) { - BackupManifest manifest = getManifest(tableName, conf, backupRootPath, backupId); + BackupManifest manifest = getManifest(conf, backupRootPath, backupId); backupManifestMap.put(tableName, manifest); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index d1ee8e13dd..0568efaed6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.backup.BackupAdmin; import org.apache.hadoop.hbase.backup.BackupClientFactory; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupMergeJob; import org.apache.hadoop.hbase.backup.BackupRequest; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; @@ -66,11 +68,10 @@ public class BackupAdminImpl implements BackupAdmin { @Override public void close() throws IOException { if (conn != null) { - conn.close(); + // conn.close(); } } - @Override public BackupInfo getBackupInfo(String backupId) throws IOException { BackupInfo backupInfo = null; @@ -110,7 +111,7 @@ public class BackupAdminImpl implements BackupAdmin { } catch (IOException e) { LOG.warn("You can not run delete command while active backup session is in progress. 
\n" + "If there is no active backup session running, run backup repair utility to restore \n" - +"backup system integrity."); + + "backup system integrity."); return -1; } @@ -126,7 +127,7 @@ public class BackupAdminImpl implements BackupAdmin { sysTable.startDeleteOperation(backupIds); // Step 4: Snapshot backup system table if (!BackupSystemTable.snapshotExists(conn)) { - BackupSystemTable.snapshot(conn); + BackupSystemTable.snapshot(conn); } else { LOG.warn("Backup system table snapshot exists"); } @@ -154,13 +155,13 @@ public class BackupAdminImpl implements BackupAdmin { // Fail delete operation // Step 1 if (snapshotDone) { - if(BackupSystemTable.snapshotExists(conn)) { + if (BackupSystemTable.snapshotExists(conn)) { BackupSystemTable.restoreFromSnapshot(conn); // delete snapshot BackupSystemTable.deleteSnapshot(conn); // We still have record with unfinished delete operation - LOG.error("Delete operation failed, please run backup repair utility to restore "+ - "backup system integrity", e); + LOG.error("Delete operation failed, please run backup repair utility to restore " + + "backup system integrity", e); throw e; } else { LOG.warn("Delete operation succeeded, there were some errors: ", e); @@ -206,17 +207,17 @@ public class BackupAdminImpl implements BackupAdmin { /** * Delete single backup and all related backups
* Algorithm:
- * Backup type: FULL or INCREMENTAL
- * Is this last backup session for table T: YES or NO
- * For every table T from table list 'tables':
- * if(FULL, YES) deletes only physical data (PD)
- * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,
- * until we either reach the most recent backup for T in the system or FULL backup
- * which includes T
- * if(INCREMENTAL, YES) deletes only physical data (PD)
- * if(INCREMENTAL, NO) deletes physical data and for table T scans all backup images between last
- * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists)
- * or last one for a particular table T and removes T from list of backup tables.
+ * Backup type: FULL or INCREMENTAL
+ * Is this last backup session for table T: YES or NO
+ * For every table T from table list 'tables':
+ * if(FULL, YES) deletes only physical data (PD)
+ * if(FULL, NO), deletes PD, scans all newer backups and removes T from backupInfo,
+ * until we either reach the most recent backup for T in the system or FULL backup
+ * which includes T
+ * if(INCREMENTAL, YES) deletes only physical data (PD) if(INCREMENTAL, NO) deletes physical data
+ * and for table T scans all backup images between last
+ * FULL backup, which is older than the backup being deleted and the next FULL backup (if exists)
+ * or last one for a particular table T and removes T from list of backup tables. * @param backupId backup id * @param sysTable backup system table * @return total number of deleted backup images @@ -285,8 +286,9 @@ public class BackupAdminImpl implements BackupAdmin { return totalDeleted; } - private void removeTableFromBackupImage(BackupInfo info, TableName tn, - BackupSystemTable sysTable) throws IOException { + private void + removeTableFromBackupImage(BackupInfo info, TableName tn, BackupSystemTable sysTable) + throws IOException { List tables = info.getTableNames(); LOG.debug("Remove " + tn + " from " + info.getBackupId() + " tables=" + info.getTableListAsString()); @@ -485,7 +487,7 @@ public class BackupAdminImpl implements BackupAdmin { private String[] toStringArray(TableName[] list) { String[] arr = new String[list.length]; - for(int i=0; i < list.length; i++) { + for (int i = 0; i < list.length; i++) { arr[i] = list[i].toString(); } return arr; @@ -521,7 +523,7 @@ public class BackupAdminImpl implements BackupAdmin { String targetRootDir = request.getTargetRootDir(); List tableList = request.getTableList(); - String backupId =BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); + String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); if (type == BackupType.INCREMENTAL) { Set incrTableSet = null; try (BackupSystemTable table = new BackupSystemTable(conn)) { @@ -529,19 +531,20 @@ public class BackupAdminImpl implements BackupAdmin { } if (incrTableSet.isEmpty()) { - String msg = "Incremental backup table set contains no tables. " - + "You need to run full backup first " + - (tableList != null ? "on "+StringUtils.join(tableList, ","): ""); + String msg = + "Incremental backup table set contains no tables. " + + "You need to run full backup first " + + (tableList != null ? "on " + StringUtils.join(tableList, ",") : ""); throw new IOException(msg); } - if(tableList != null) { + if (tableList != null) { tableList.removeAll(incrTableSet); if (!tableList.isEmpty()) { String extraTables = StringUtils.join(tableList, ","); - String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "+ - "Perform full backup on " + extraTables + " first, " - + "then retry the command"; + String msg = + "Some tables (" + extraTables + ") haven't gone through full backup. " + + "Perform full backup on " + extraTables + " first, " + "then retry the command"; throw new IOException(msg); } } @@ -584,14 +587,13 @@ public class BackupAdminImpl implements BackupAdmin { // update table list BackupRequest.Builder builder = new BackupRequest.Builder(); - request = builder.withBackupType(request.getBackupType()). - withTableList(tableList). - withTargetRootDir(request.getTargetRootDir()). - withBackupSetName(request.getBackupSetName()). - withTotalTasks(request.getTotalTasks()). 
- withBandwidthPerTasks((int)request.getBandwidth()).build(); - - TableBackupClient client =null; + request = + builder.withBackupType(request.getBackupType()).withTableList(tableList) + .withTargetRootDir(request.getTargetRootDir()) + .withBackupSetName(request.getBackupSetName()).withTotalTasks(request.getTotalTasks()) + .withBandwidthPerTasks((int) request.getBandwidth()).build(); + + TableBackupClient client = null; try { client = BackupClientFactory.create(conn, backupId, request); } catch (IOException e) { @@ -613,4 +615,45 @@ public class BackupAdminImpl implements BackupAdmin { return tableList; } + @Override + public void mergeBackups(String[] backupIds) throws IOException { + try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) { + checkIfValidForMerge(backupIds, sysTable); + BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration()); + job.run(backupIds); + } + } + + /** + * All backups MUST be in the same destination, no FULL backups are allowed - only INCREMENTAL + * ones only in COMPLETE state + * @param backupIds list of backup ids + * @param table backup system table + * @throws IOException + */ + private void checkIfValidForMerge(String[] backupIds, BackupSystemTable table) throws IOException { + String backupRoot = null; + for (String backupId : backupIds) { + BackupInfo bInfo = table.readBackupInfo(backupId); + if (bInfo == null) { + String msg = "Backup session " + backupId + " not found"; + throw new IOException(msg); + } + if (backupRoot == null) { + backupRoot = bInfo.getBackupRootDir(); + } else if (!bInfo.getBackupRootDir().equals(backupRoot)) { + throw new IOException("Found different backup destinations in a list of backup sessions \n" + + "1. " + backupRoot + "\n" + "2. " + bInfo.getBackupRootDir()); + } + if (bInfo.getType() == BackupType.FULL) { + throw new IOException("FULL backup image can not be merged for: \n" + bInfo); + } + + if (bInfo.getState() != BackupState.COMPLETE) { + throw new IOException("Backup image " + backupId + + " can not be merged becuase of its state: " + bInfo.getState()); + } + } + } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index 2a5c9595e7..50c8a15e46 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -68,7 +68,7 @@ import com.google.common.collect.Lists; */ @InterfaceAudience.Private -public final class BackupCommands { +public final class BackupCommands { public final static String INCORRECT_USAGE = "Incorrect usage"; @@ -78,8 +78,7 @@ public final class BackupCommands { + " describe show the detailed information of a backup image\n" + " history show history of all successful backups\n" + " progress show the progress of the latest backup request\n" - + " set backup set management\n" - + " repair repair backup system table" + + " set backup set management\n" + " repair repair backup system table" + "Run \'hbase backup COMMAND -h\' to see help message for each command\n"; public static final String CREATE_CMD_USAGE = @@ -109,17 +108,20 @@ public final class BackupCommands { public static final String SET_CMD_USAGE = "Usage: hbase backup set COMMAND [name] [tables]\n" + " name Backup set name\n" - + " tables Comma separated list of tables.\n" - + "COMMAND is one of:\n" + " add add tables to a set, create a set if needed\n" + 
+ " tables Comma separated list of tables.\n" + "COMMAND is one of:\n" + + " add add tables to a set, create a set if needed\n" + " remove remove tables from a set\n" + " list list all backup sets in the system\n" + " describe describe set\n" + " delete delete backup set\n"; + public static final String MERGE_CMD_USAGE = "Usage: hbase backup merge [backup_ids]\n" + + " backup_ids Comma separated list of backup image ids.\n"; public static final String USAGE_FOOTER = ""; public static abstract class Command extends Configured { CommandLine cmdline; Connection conn; + Command(Configuration conf) { if (conf == null) { conf = HBaseConfiguration.create(); @@ -140,7 +142,7 @@ public final class BackupCommands { try (BackupSystemTable table = new BackupSystemTable(conn);) { List sessions = table.getBackupInfos(BackupState.RUNNING); - if(sessions.size() > 0) { + if (sessions.size() > 0) { System.err.println("Found backup session in a RUNNING state: "); System.err.println(sessions.get(0)); System.err.println("This may indicate that a previous session has failed abnormally."); @@ -154,11 +156,19 @@ public final class BackupCommands { try (BackupSystemTable table = new BackupSystemTable(conn);) { String[] ids = table.getListOfBackupIdsFromDeleteOperation(); - if(ids !=null && ids.length > 0) { - System.err.println("Found failed backup delete coommand. "); + if (ids != null && ids.length > 0) { + System.err.println("Found failed backup DELETE coommand. "); + System.err.println("Backup system recovery is required."); + throw new IOException("Failed backup DELETE found, aborted command execution"); + } + + ids = table.getListOfBackupIdsFromMergeOperation(); + if (ids != null && ids.length > 0) { + System.err.println("Found failed backup MERGE coommand. "); System.err.println("Backup system recovery is required."); - throw new IOException("Failed backup delete found, aborted command execution"); + throw new IOException("Failed backup MERGE found, aborted command execution"); } + } } } @@ -178,10 +188,10 @@ public final class BackupCommands { protected boolean requiresNoActiveSession() { return false; } + /** - * Command requires consistent state of a backup system - * Backup system may become inconsistent because of an abnormal - * termination of a backup session or delete command + * Command requires consistent state of a backup system Backup system may become inconsistent + * because of an abnormal termination of a backup session or delete command * @return true, if yes */ protected boolean requiresConsistentState() { @@ -220,6 +230,9 @@ public final class BackupCommands { case REPAIR: cmd = new RepairCommand(conf, cmdline); break; + case MERGE: + cmd = new MergeCommand(conf, cmdline); + break; case HELP: default: cmd = new HelpCommand(conf, cmdline); @@ -257,7 +270,7 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } String[] args = cmdline.getArgs(); - if (args.length !=3) { + if (args.length != 3) { printUsage(); throw new IOException(INCORRECT_USAGE); } @@ -274,7 +287,6 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } - String tables = null; // Check if we have both: backup set and list of tables @@ -310,14 +322,14 @@ public final class BackupCommands { try (BackupAdminImpl admin = new BackupAdminImpl(conn);) { - BackupRequest.Builder builder = new BackupRequest.Builder(); - BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase())) - .withTableList(tables != null ? 
- Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null) - .withTargetRootDir(args[2]) - .withTotalTasks(workers) - .withBandwidthPerTasks(bandwidth) - .withBackupSetName(setName).build(); + BackupRequest.Builder builder = new BackupRequest.Builder(); + BackupRequest request = + builder + .withBackupType(BackupType.valueOf(args[1].toUpperCase())) + .withTableList( + tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null) + .withTargetRootDir(args[2]).withTotalTasks(workers) + .withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build(); String backupId = admin.backupTables(request); System.out.println("Backup session " + backupId + " finished. Status: SUCCESS"); } catch (IOException e) { @@ -544,7 +556,8 @@ public final class BackupCommands { int deleted = admin.deleteBackups(backupIds); System.out.println("Deleted " + deleted + " backups. Total requested: " + args.length); } catch (IOException e) { - System.err.println("Delete command FAILED. Please run backup repair tool to restore backup system integrity"); + System.err + .println("Delete command FAILED. Please run backup repair tool to restore backup system integrity"); throw e; } @@ -584,8 +597,9 @@ public final class BackupCommands { if (list.size() == 0) { // No failed sessions found System.out.println("REPAIR status: no failed sessions found." - +" Checking failed delete backup operation ..."); + + " Checking failed delete backup operation ..."); repairFailedBackupDeletionIfAny(conn, sysTable); + repairFailedBackupMergeIfAny(conn, sysTable); return; } backupInfo = list.get(0); @@ -607,31 +621,52 @@ public final class BackupCommands { // processed recovery already. sysTable.updateBackupInfo(backupInfo); sysTable.finishBackupSession(); - System.out.println("REPAIR status: finished repair failed session:\n "+ backupInfo); + System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo); } } private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable sysTable) - throws IOException - { + throws IOException { String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation(); - if (backupIds == null ||backupIds.length == 0) { - System.out.println("No failed backup delete operation found"); + if (backupIds == null || backupIds.length == 0) { + System.out.println("No failed backup DELETE operation found"); // Delete backup table snapshot if exists BackupSystemTable.deleteSnapshot(conn); return; } - System.out.println("Found failed delete operation for: " + StringUtils.join(backupIds)); - System.out.println("Running delete again ..."); + System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds)); + System.out.println("Running DELETE again ..."); // Restore table from snapshot BackupSystemTable.restoreFromSnapshot(conn); // Finish previous failed session sysTable.finishBackupSession(); - try(BackupAdmin admin = new BackupAdminImpl(conn);) { + try (BackupAdmin admin = new BackupAdminImpl(conn);) { admin.deleteBackups(backupIds); } - System.out.println("Delete operation finished OK: "+ StringUtils.join(backupIds)); + System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds)); + + } + + private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable) + throws IOException { + String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation(); + if (backupIds == null || backupIds.length == 0) { + System.out.println("No failed backup MERGE operation found"); + // Delete backup table 
snapshot if exists + BackupSystemTable.deleteSnapshot(conn); + return; + } + System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds)); + System.out.println("Running MEREGE again ..."); + // Restore table from snapshot + BackupSystemTable.restoreFromSnapshot(conn); + // Finish previous failed session + sysTable.finishMergeOperation(); + try (BackupAdmin admin = new BackupAdminImpl(conn);) { + admin.mergeBackups(backupIds); + } + System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds)); } @@ -641,6 +676,49 @@ public final class BackupCommands { } } + private static class MergeCommand extends Command { + + MergeCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + protected boolean requiresNoActiveSession() { + return true; + } + + @Override + protected boolean requiresConsistentState() { + return true; + } + + @Override + public void execute() throws IOException { + super.execute(); + + String[] args = cmdline == null ? null : cmdline.getArgs(); + if (args == null || (args.length != 2)) { + System.err.println("ERROR: wrong number of arguments: " + + (args == null ? null : args.length)); + printUsage(); + throw new IOException(INCORRECT_USAGE); + } + + String[] backupIds = args[1].split(","); + Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); + try (final Connection conn = ConnectionFactory.createConnection(conf); + final BackupAdminImpl admin = new BackupAdminImpl(conn);) { + admin.mergeBackups(backupIds); + } + } + + @Override + protected void printUsage() { + System.out.println(MERGE_CMD_USAGE); + } + } + // TODO Cancel command private static class CancelCommand extends Command { @@ -672,7 +750,6 @@ public final class BackupCommands { @Override public void execute() throws IOException { - int n = parseHistoryLength(); final TableName tableName = getTableName(); final String setName = getTableSetName(); @@ -883,7 +960,7 @@ public final class BackupCommands { private TableName[] toTableNames(String[] tables) { TableName[] arr = new TableName[tables.length]; - for (int i=0; i < tables.length; i++) { + for (int i = 0; i < tables.length; i++) { arr[i] = TableName.valueOf(tables[i]); } return arr; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index a929700a98..591a855af4 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -115,8 +115,8 @@ public class BackupManager implements Closeable { } if (LOG.isDebugEnabled()) { - LOG.debug("Added log cleaner: " + cleanerClass +"\n" + - "Added master procedure manager: " + masterProcedureClass); + LOG.debug("Added log cleaner: " + cleanerClass + "\n" + "Added master procedure manager: " + + masterProcedureClass); } } @@ -185,9 +185,8 @@ public class BackupManager implements Closeable { * @return BackupInfo * @throws BackupException exception */ - public BackupInfo createBackupInfo(String backupId, BackupType type, - List tableList, String targetRootDir, int workers, long bandwidth) - throws BackupException { + public BackupInfo createBackupInfo(String backupId, BackupType type, List tableList, + String targetRootDir, int workers, long bandwidth) throws BackupException { if (targetRootDir == null) { throw new BackupException("Wrong backup request parameter: 
target backup root directory"); } @@ -313,7 +312,7 @@ public class BackupManager implements Closeable { } } else { Path logBackupPath = - HBackupFileSystem.getLogBackupPath(backup.getBackupRootDir(), backup.getBackupId()); + HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId()); LOG.debug("Current backup has an incremental backup ancestor, " + "touching its image manifest in " + logBackupPath.toString() + " to construct the dependency."); @@ -382,7 +381,6 @@ public class BackupManager implements Closeable { systemTable.finishBackupSession(); } - /** * Read the last backup start code (timestamp) of last successful backup. Will return null if * there is no startcode stored in backup system table or the value is of length 0. These two @@ -413,7 +411,7 @@ public class BackupManager implements Closeable { } public Pair>>>>, List> - readBulkloadRows(List tableList) throws IOException { + readBulkloadRows(List tableList) throws IOException { return systemTable.readBulkloadRows(tableList); } @@ -448,8 +446,7 @@ public class BackupManager implements Closeable { */ public void writeRegionServerLogTimestamp(Set tables, HashMap newTimestamps) throws IOException { - systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, - backupInfo.getBackupRootDir()); + systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir()); } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java index b8adac9fba..7e3201efdb 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -61,9 +62,8 @@ public class BackupManifest { public static final String MANIFEST_FILE_NAME = ".backup.manifest"; /** - * Backup image, the dependency graph is made up by series of backup images - * BackupImage contains all the relevant information to restore the backup and - * is used during restore operation + * Backup image, the dependency graph is made up by series of backup images BackupImage contains + * all the relevant information to restore the backup and is used during restore operation */ public static class BackupImage implements Comparable { @@ -294,6 +294,16 @@ public class BackupManifest { return this.ancestors; } + public void removeAncestors(List backupIds) { + List toRemove = new ArrayList(); + for (BackupImage im : this.ancestors) { + if (backupIds.contains(im.getBackupId())) { + toRemove.add(im); + } + } + this.ancestors.removeAll(toRemove); + } + private void addAncestor(BackupImage backupImage) { this.getAncestors().add(backupImage); } @@ -464,18 +474,16 @@ public class BackupManifest { } /** - * Persist the manifest file. + * TODO: fix it. Persist the manifest file. * @throws IOException IOException when storing the manifest file. 
*/ public void store(Configuration conf) throws BackupException { byte[] data = backupImage.toProto().toByteArray(); // write the file, overwrite if already exist - String logBackupDir = - BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId()); Path manifestFilePath = - new Path(new Path((tableBackupDir != null ? tableBackupDir : logBackupDir)), - MANIFEST_FILE_NAME); + new Path(HBackupFileSystem.getBackupPath(backupImage.getRootDir(), + backupImage.getBackupId()), MANIFEST_FILE_NAME); try (FSDataOutputStream out = manifestFilePath.getFileSystem(conf).create(manifestFilePath, true);) { out.write(data); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index e5a3daace2..1476a146c6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -82,6 +82,10 @@ import org.apache.hadoop.hbase.util.Pair; * value = backupId and full WAL file name *

*/ +/** + * @author vrodionov + * + */ @InterfaceAudience.Private public final class BackupSystemTable implements Closeable { private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); @@ -118,7 +122,7 @@ public final class BackupSystemTable implements Closeable { private TableName tableName; /** - * Stores backup sessions (contexts) + * Stores backup sessions (contexts) */ final static byte[] SESSIONS_FAMILY = "session".getBytes(); /** @@ -127,11 +131,10 @@ public final class BackupSystemTable implements Closeable { final static byte[] META_FAMILY = "meta".getBytes(); final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes(); /** - * Connection to HBase cluster, shared among all instances + * Connection to HBase cluster, shared among all instances */ private final Connection connection; - private final static String BACKUP_INFO_PREFIX = "session:"; private final static String START_CODE_ROW = "startcode:"; private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes(); @@ -147,6 +150,7 @@ public final class BackupSystemTable implements Closeable { private final static String BULK_LOAD_PREFIX = "bulk:"; private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes(); private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes(); + private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes(); final static byte[] TBL_COL = Bytes.toBytes("tbl"); final static byte[] FAM_COL = Bytes.toBytes("fam"); @@ -160,7 +164,7 @@ public final class BackupSystemTable implements Closeable { private final static String SET_KEY_PREFIX = "backupset:"; // separator between BULK_LOAD_PREFIX and ordinals - protected final static String BLK_LD_DELIM = ":"; + protected final static String BLK_LD_DELIM = ":"; private final static byte[] EMPTY_VALUE = new byte[] {}; // Safe delimiter in a string @@ -187,19 +191,19 @@ public final class BackupSystemTable implements Closeable { } private void verifyNamespaceExists(Admin admin) throws IOException { - String namespaceName = tableName.getNamespaceAsString(); - NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); - NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); - boolean exists = false; - for( NamespaceDescriptor nsd: list) { - if (nsd.getName().equals(ns.getName())) { - exists = true; - break; - } - } - if (!exists) { - admin.createNamespace(ns); + String namespaceName = tableName.getNamespaceAsString(); + NamespaceDescriptor ns = NamespaceDescriptor.create(namespaceName).build(); + NamespaceDescriptor[] list = admin.listNamespaceDescriptors(); + boolean exists = false; + for (NamespaceDescriptor nsd : list) { + if (nsd.getName().equals(ns.getName())) { + exists = true; + break; } + } + if (!exists) { + admin.createNamespace(ns); + } } private void waitForSystemTable(Admin admin) throws IOException { @@ -211,15 +215,13 @@ public final class BackupSystemTable implements Closeable { } catch (InterruptedException e) { } if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { - throw new IOException("Failed to create backup system table after "+ TIMEOUT+"ms"); + throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms"); } } LOG.debug("Backup table exists and available"); } - - @Override public void close() { // do nothing @@ -257,7 +259,7 @@ public final class BackupSystemTable implements Closeable { byte[] row = CellUtil.cloneRow(res.listCells().get(0)); for (Cell cell : res.listCells()) { if (CellComparator.compareQualifiers(cell, 
BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); } } @@ -286,13 +288,13 @@ public final class BackupSystemTable implements Closeable { String path = null; for (Cell cell : res.listCells()) { if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, - BackupSystemTable.TBL_COL.length) == 0) { + BackupSystemTable.TBL_COL.length) == 0) { tbl = TableName.valueOf(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, - BackupSystemTable.FAM_COL.length) == 0) { + BackupSystemTable.FAM_COL.length) == 0) { fam = CellUtil.cloneValue(cell); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { path = Bytes.toString(CellUtil.cloneValue(cell)); } } @@ -313,9 +315,10 @@ public final class BackupSystemTable implements Closeable { } files.add(new Path(path)); if (LOG.isDebugEnabled()) { - LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); + LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); } - }; + } + ; return mapForSrc; } } @@ -359,16 +362,16 @@ public final class BackupSystemTable implements Closeable { public void writePathsPostBulkLoad(TableName tabName, byte[] region, Map> finalPaths) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + - finalPaths.size() + " entries"); + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() + + " entries"); } try (Table table = connection.getTable(tableName)) { - List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, - finalPaths); + List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); table.put(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } } + /* * For preCommitStoreFile() hook * @param tabName table name @@ -376,15 +379,15 @@ public final class BackupSystemTable implements Closeable { * @param family column family * @param pairs list of paths for hfiles */ - public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, - final byte[] family, final List> pairs) throws IOException { + public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, + final List> pairs) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + - pairs.size() + " entries"); + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size() + + " entries"); } try (Table table = connection.getTable(tableName)) { - List puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region, - family, pairs); + List puts = + BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); table.put(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); } @@ -411,11 +414,11 @@ public final class BackupSystemTable implements Closeable { /* * Reads the rows from backup table recording bulk loaded hfiles * @param tableList list of table names - * @return The keys of the Map are table, region and column family. 
- * Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true) + * @return The keys of the Map are table, region and column family. Value of the map reflects + * whether the hfile was recorded by preCommitStoreFile hook (true) */ public Pair>>>>, List> - readBulkloadRows(List tableList) throws IOException { + readBulkloadRows(List tableList) throws IOException { Map>>>> map = new HashMap<>(); List rows = new ArrayList<>(); for (TableName tTable : tableList) { @@ -437,13 +440,13 @@ public final class BackupSystemTable implements Closeable { String rowStr = Bytes.toString(row); region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, - BackupSystemTable.FAM_COL.length) == 0) { + BackupSystemTable.FAM_COL.length) == 0) { fam = Bytes.toString(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, - BackupSystemTable.PATH_COL.length) == 0) { + BackupSystemTable.PATH_COL.length) == 0) { path = Bytes.toString(CellUtil.cloneValue(cell)); } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, - BackupSystemTable.STATE_COL.length) == 0) { + BackupSystemTable.STATE_COL.length) == 0) { byte[] state = CellUtil.cloneValue(cell); if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) { raw = true; @@ -484,12 +487,13 @@ public final class BackupSystemTable implements Closeable { Map> map = maps[idx]; TableName tn = sTableList.get(idx); if (map == null) continue; - for (Map.Entry> entry: map.entrySet()) { + for (Map.Entry> entry : map.entrySet()) { byte[] fam = entry.getKey(); List paths = entry.getValue(); for (Path p : paths) { - Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), - backupId, ts, cnt++); + Put put = + BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts, + cnt++); puts.add(put); } } @@ -570,7 +574,7 @@ public final class BackupSystemTable implements Closeable { } try (Table table = connection.getTable(tableName)) { Put put = createPutForStartBackupSession(); - //First try to put if row does not exist + // First try to put if row does not exist if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) { // Row exists, try to put if value == ACTIVE_SESSION_NO if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, @@ -587,16 +591,14 @@ public final class BackupSystemTable implements Closeable { return put; } - public void finishBackupSession() throws IOException - { + public void finishBackupSession() throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Stop backup session"); } try (Table table = connection.getTable(tableName)) { Put put = createPutForStopBackupSession(); - if(!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, - ACTIVE_SESSION_YES, put)) - { + if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, + ACTIVE_SESSION_YES, put)) { throw new IOException("There is no active backup session"); } } @@ -630,8 +632,7 @@ public final class BackupSystemTable implements Closeable { res.advance(); Cell cell = res.current(); byte[] row = CellUtil.cloneRow(cell); - String server = - getServerNameForReadRegionServerLastLogRollResult(row); + String server = getServerNameForReadRegionServerLastLogRollResult(row); byte[] data = CellUtil.cloneValue(cell); rsTimestampMap.put(server, Bytes.toLong(data)); } @@ -652,8 +653,7 @@ public 
final class BackupSystemTable implements Closeable { LOG.trace("write region server last roll log result to backup system table"); } try (Table table = connection.getTable(tableName)) { - Put put = - createPutForRegionServerLastLogRollResult(server, ts, backupRoot); + Put put = createPutForRegionServerLastLogRollResult(server, ts, backupRoot); table.put(put); } } @@ -852,9 +852,7 @@ public final class BackupSystemTable implements Closeable { List puts = new ArrayList(); for (TableName table : tables) { byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray(); - Put put = - createPutForWriteRegionServerLogTimestamp(table, smapData, - backupRoot); + Put put = createPutForWriteRegionServerLogTimestamp(table, smapData, backupRoot); puts.add(put); } try (Table table = connection.getTable(tableName)) { @@ -1018,8 +1016,7 @@ public final class BackupSystemTable implements Closeable { } } try (Table table = connection.getTable(tableName)) { - List puts = - createPutsForAddWALFiles(files, backupId, backupRoot); + List puts = createPutsForAddWALFiles(files, backupId, backupRoot); table.put(puts); } } @@ -1271,12 +1268,12 @@ public final class BackupSystemTable implements Closeable { if (disjoint.length > 0 && disjoint.length != tables.length) { Put put = createPutForBackupSet(name, disjoint); table.put(put); - } else if(disjoint.length == tables.length) { + } else if (disjoint.length == tables.length) { LOG.warn("Backup set '" + name + "' does not contain tables [" + StringUtils.join(toRemove, " ") + "]"); } else { // disjoint.length == 0 and tables.length >0 - // Delete backup set - LOG.info("Backup set '"+name+"' is empty. Deleting."); + // Delete backup set + LOG.info("Backup set '" + name + "' is empty. Deleting."); deleteBackupSet(name); } } finally { @@ -1356,7 +1353,7 @@ public final class BackupSystemTable implements Closeable { } public static String getSnapshotName(Configuration conf) { - return "snapshot_"+getTableNameAsString(conf).replace(":", "_"); + return "snapshot_" + getTableNameAsString(conf).replace(":", "_"); } /** @@ -1589,17 +1586,16 @@ public final class BackupSystemTable implements Closeable { for (Path path : entry.getValue()) { String file = path.toString(); int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash+1); - Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); + String filename = file.substring(lastSlash + 1); + Put put = + new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, + Bytes.toString(region), BLK_LD_DELIM, filename)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, - file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); puts.add(put); - LOG.debug("writing done bulk path " + file + " for " + table + " " + - Bytes.toString(region)); + LOG.debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); } } return puts; @@ -1607,19 +1603,16 @@ public final class BackupSystemTable implements Closeable { public static void snapshot(Connection conn) throws IOException { - try (Admin admin = conn.getAdmin();){ + try (Admin admin = conn.getAdmin();) { Configuration conf = conn.getConfiguration(); - 
admin.snapshot(BackupSystemTable.getSnapshotName(conf), - BackupSystemTable.getTableName(conf)); + admin.snapshot(BackupSystemTable.getSnapshotName(conf), BackupSystemTable.getTableName(conf)); } } - public static void restoreFromSnapshot(Connection conn) - throws IOException { + public static void restoreFromSnapshot(Connection conn) throws IOException { Configuration conf = conn.getConfiguration(); - LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + - " from snapshot"); + LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot"); try (Admin admin = conn.getAdmin();) { String snapshotName = BackupSystemTable.getSnapshotName(conf); if (snapshotExists(admin, snapshotName)) { @@ -1631,8 +1624,8 @@ public final class BackupSystemTable implements Closeable { // Snapshot does not exists, i.e completeBackup failed after // deleting backup system table snapshot // In this case we log WARN and proceed - LOG.warn("Could not restore backup system table. Snapshot " + snapshotName+ - " does not exists."); + LOG.warn("Could not restore backup system table. Snapshot " + snapshotName + + " does not exists."); } } } @@ -1640,7 +1633,7 @@ public final class BackupSystemTable implements Closeable { protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { List list = admin.listSnapshots(); - for (SnapshotDescription desc: list) { + for (SnapshotDescription desc : list) { if (desc.getName().equals(snapshotName)) { return true; } @@ -1648,26 +1641,25 @@ public final class BackupSystemTable implements Closeable { return false; } - public static boolean snapshotExists (Connection conn) throws IOException { + public static boolean snapshotExists(Connection conn) throws IOException { return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration())); } - public static void deleteSnapshot(Connection conn) - throws IOException { + public static void deleteSnapshot(Connection conn) throws IOException { Configuration conf = conn.getConfiguration(); - LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + - " from the system"); + LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system"); try (Admin admin = conn.getAdmin();) { String snapshotName = BackupSystemTable.getSnapshotName(conf); if (snapshotExists(admin, snapshotName)) { admin.deleteSnapshot(snapshotName); LOG.debug("Done deleting backup system table snapshot"); } else { - LOG.error("Snapshot "+snapshotName+" does not exists"); + LOG.error("Snapshot " + snapshotName + " does not exists"); } } } + /* * Creates Put's for bulk load resulting from running LoadIncrementalHFiles */ @@ -1678,17 +1670,16 @@ public final class BackupSystemTable implements Closeable { Path path = pair.getSecond(); String file = path.toString(); int lastSlash = file.lastIndexOf("/"); - String filename = file.substring(lastSlash+1); - Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); + String filename = file.substring(lastSlash + 1); + Put put = + new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region), + BLK_LD_DELIM, filename)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); - put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, - file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); 
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE); puts.add(put); - LOG.debug("writing raw bulk path " + file + " for " + table + " " + - Bytes.toString(region)); + LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region)); } return puts; } @@ -1725,7 +1716,6 @@ public final class BackupSystemTable implements Closeable { return get; } - public void startDeleteOperation(String[] backupIdList) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList)); @@ -1765,6 +1755,85 @@ public final class BackupSystemTable implements Closeable { } } + private Put createPutForMergeOperation(String[] backupIdList) { + + byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ",")); + Put put = new Put(MERGE_OP_ROW); + put.addColumn(META_FAMILY, FAM_COL, value); + return put; + } + + private Put createPutForUpdateTablesForMerge(List tables) { + + byte[] value = Bytes.toBytes(StringUtils.join(tables, ",")); + Put put = new Put(MERGE_OP_ROW); + put.addColumn(META_FAMILY, PATH_COL, value); + return put; + } + + private Delete createDeleteForBackupMergeOperation() { + + Delete delete = new Delete(MERGE_OP_ROW); + delete.addFamily(META_FAMILY); + return delete; + } + + private Get createGetForMergeOperation() { + + Get get = new Get(MERGE_OP_ROW); + get.addFamily(META_FAMILY); + return get; + } + + public void startMergeOperation(String[] backupIdList) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList)); + } + Put put = createPutForMergeOperation(backupIdList); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void updateProcessedTablesForMerge(List tables) throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Update tables for merge : " + StringUtils.join(tables, ",")); + } + Put put = createPutForUpdateTablesForMerge(tables); + try (Table table = connection.getTable(tableName)) { + table.put(put); + } + } + + public void finishMergeOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Finsih merge operation for backup ids "); + } + Delete delete = createDeleteForBackupMergeOperation(); + try (Table table = connection.getTable(tableName)) { + table.delete(delete); + } + } + + public String[] getListOfBackupIdsFromMergeOperation() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Get backup ids for merge operation"); + } + Get get = createGetForMergeOperation(); + try (Table table = connection.getTable(tableName)) { + Result res = table.get(get); + if (res.isEmpty()) { + return null; + } + Cell cell = res.listCells().get(0); + byte[] val = CellUtil.cloneValue(cell); + if (val.length == 0) { + return null; + } + return new String(val).split(","); + } + } + static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException { Scan scan = new Scan(); byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); @@ -1776,10 +1845,12 @@ public final class BackupSystemTable implements Closeable { scan.setMaxVersions(1); return scan; } + static String getTableNameFromOrigBulkLoadRow(String rowStr) { String[] parts = rowStr.split(BLK_LD_DELIM); return parts[1]; } + static String getRegionNameFromOrigBulkLoadRow(String rowStr) { // format is bulk : namespace : table : region : file String[] parts = rowStr.split(BLK_LD_DELIM); @@ -1791,6 +1862,7 @@ public final class BackupSystemTable implements 
Closeable { LOG.debug("bulk row string " + rowStr + " region " + parts[idx]); return parts[idx]; } + /* * Used to query bulk loaded hfiles which have been copied by incremental backup * @param backupId the backup Id. It can be null when querying for all tables @@ -1798,13 +1870,14 @@ public final class BackupSystemTable implements Closeable { */ static Scan createScanForBulkLoadedFiles(String backupId) throws IOException { Scan scan = new Scan(); - byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES : - rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM); + byte[] startRow = + backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId + + BLK_LD_DELIM); byte[] stopRow = Arrays.copyOf(startRow, startRow.length); stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); scan.setStartRow(startRow); scan.setStopRow(stopRow); - //scan.setTimeRange(lower, Long.MAX_VALUE); + // scan.setTimeRange(lower, Long.MAX_VALUE); scan.addFamily(BackupSystemTable.META_FAMILY); scan.setMaxVersions(1); return scan; @@ -1812,12 +1885,13 @@ public final class BackupSystemTable implements Closeable { static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, long ts, int idx) { - Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx)); + Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes()); return put; } + /** * Creates put list for list of WAL files * @param files list of WAL file paths @@ -1825,8 +1899,9 @@ public final class BackupSystemTable implements Closeable { * @return put list * @throws IOException exception */ - private List createPutsForAddWALFiles(List files, String backupId, - String backupRoot) throws IOException { + private List + createPutsForAddWALFiles(List files, String backupId, String backupRoot) + throws IOException { List puts = new ArrayList(); for (String file : files) { @@ -1957,5 +2032,4 @@ public final class BackupSystemTable implements Closeable { return sb.toString().getBytes(); } - } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 381e9b15df..ea7a7b8b93 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.RestoreTool; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; @@ -58,7 +58,6 @@ public class RestoreTablesClient { private Configuration conf; private Connection conn; private String backupId; - private String fullBackupId; private TableName[] sTableArray; private TableName[] tTableArray; private String targetRootDir; @@ -107,8 +106,7 @@ public class RestoreTablesClient { if (existTableList.size() 
> 0) { if (!isOverwrite) { - LOG.error("Existing table (" - + existTableList + LOG.error("Existing table (" + existTableList + ") found in the restore target, please add " + "\"-overwrite\" option in the command if you mean" + " to restore to these existing tables"); @@ -148,9 +146,8 @@ public class RestoreTablesClient { Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId(); // We need hFS only for full restore (see the code) - BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); + BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId); if (manifest.getType() == BackupType.FULL) { - fullBackupId = manifest.getBackupImage().getBackupId(); LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " + tableBackupPath.toString()); restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, @@ -169,8 +166,8 @@ public class RestoreTablesClient { // full backup path comes first for (int i = 1; i < images.length; i++) { BackupImage im = images[i]; - String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(), - im.getBackupId(), sTable)+ Path.SEPARATOR+"data"; + String fileBackupDir = + HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable); dirList.add(new Path(fileBackupDir)); } @@ -196,8 +193,10 @@ public class RestoreTablesClient { TreeSet restoreImageSet = new TreeSet(); boolean truncateIfExists = isOverwrite; Set backupIdSet = new HashSet<>(); + for (int i = 0; i < sTableArray.length; i++) { TableName table = sTableArray[i]; + BackupManifest manifest = backupManifestMap.get(table); // Get the image list of this backup for restore in time order from old // to new. 
@@ -213,11 +212,8 @@ public class RestoreTablesClient { if (restoreImageSet != null && !restoreImageSet.isEmpty()) { LOG.info("Restore includes the following image(s):"); for (BackupImage image : restoreImageSet) { - LOG.info("Backup: " - + image.getBackupId() - + " " - + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), - table)); + LOG.info("Backup: " + image.getBackupId() + " " + + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table)); if (image.getType() == BackupType.INCREMENTAL) { backupIdSet.add(image.getBackupId()); LOG.debug("adding " + image.getBackupId() + " for bulk load"); @@ -232,13 +228,13 @@ public class RestoreTablesClient { Map>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList); Map loaderResult; conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); - LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf); + LoadIncrementalHFiles loader = BackupUtils.createLoader(conf); for (int i = 0; i < sTableList.size(); i++) { if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); if (loaderResult.isEmpty()) { - String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i]; + String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]; LOG.error(msg); throw new IOException(msg); } @@ -253,7 +249,7 @@ public class RestoreTablesClient { if (backupId == null) { return 0; } - return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1)); + return Long.parseLong(backupId.substring(backupId.lastIndexOf("_") + 1)); } static boolean withinRange(long a, long lower, long upper) { @@ -268,15 +264,15 @@ public class RestoreTablesClient { // case VALIDATION: // check the target tables checkTargetTables(tTableArray, isOverwrite); + // case RESTORE_IMAGES: HashMap backupManifestMap = new HashMap<>(); // check and load backup image manifest for the tables Path rootPath = new Path(targetRootDir); HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, backupId); + restore(backupManifestMap, sTableArray, tTableArray, isOverwrite); } - - } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java similarity index 94% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java index 604e5021f5..a307ea9f73 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -53,18 +53,18 @@ import org.apache.hadoop.util.ToolRunner; * for later bulk importing. 
*/ @InterfaceAudience.Private -public class HFileSplitterJob extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class); +public class MapReduceHFileSplitterJob extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class); final static String NAME = "HFileSplitterJob"; public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; public final static String TABLES_KEY = "hfile.input.tables"; public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - public HFileSplitterJob() { + public MapReduceHFileSplitterJob() { } - protected HFileSplitterJob(final Configuration c) { + protected MapReduceHFileSplitterJob(final Configuration c) { super(c); } @@ -111,7 +111,7 @@ public class HFileSplitterJob extends Configured implements Tool { Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); - job.setJarByClass(HFileSplitterJob.class); + job.setJarByClass(MapReduceHFileSplitterJob.class); job.setInputFormatClass(HFileInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); @@ -164,7 +164,7 @@ public class HFileSplitterJob extends Configured implements Tool { * @throws Exception When running the job fails. */ public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args); + int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args); System.exit(ret); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java index 4161ca9cca..1209e7c31b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java @@ -17,31 +17,31 @@ */ package org.apache.hadoop.hbase.backup.mapreduce; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; + import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.mapreduce.WALPlayer; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.Tool; + /** * MapReduce implementation of {@link RestoreJob} * - * For full backup restore, it runs {@link HFileSplitterJob} job and creates + * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates * HFiles which are aligned with a region boundaries of a table being - * restored, for incremental backup restore it runs {@link WALPlayer} in - * bulk load mode (creates 
HFiles from WAL edits). + * restored. * * The resulting HFiles then are loaded using HBase bulk load tool * {@link LoadIncrementalHFiles} @@ -62,8 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob { String bulkOutputConfKey; - player = new HFileSplitterJob(); - bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY; + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; // Player reads all files in arbitrary directory structure and creates // a Map task for each file String dirs = StringUtils.join(dirPaths, ","); @@ -71,8 +71,8 @@ public class MapReduceRestoreJob implements RestoreJob { if (LOG.isDebugEnabled()) { LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental") + " backup from directory " + dirs + " from hbase tables " - + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) + - " to tables " + + StringUtils.join(tableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND) + + " to tables " + StringUtils.join(newTableNames, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)); } @@ -80,13 +80,16 @@ public class MapReduceRestoreJob implements RestoreJob { LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); - Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), + getConf()); Configuration conf = getConf(); conf.set(bulkOutputConfKey, bulkOutputPath.toString()); String[] playerArgs = - { dirs, - fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString() - }; + { + dirs, + fullBackupRestore ? newTableNames[i].getNameAsString() : tableNames[i] + .getNameAsString() }; int result = 0; int loaderResult = 0; @@ -96,7 +99,7 @@ public class MapReduceRestoreJob implements RestoreJob { result = player.run(playerArgs); if (succeeded(result)) { // do bulk load - LoadIncrementalHFiles loader = createLoader(getConf()); + LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf()); if (LOG.isDebugEnabled()) { LOG.debug("Restoring HFiles from directory " + bulkOutputPath); } @@ -113,58 +116,11 @@ public class MapReduceRestoreJob implements RestoreJob { } LOG.debug("Restore Job finished:" + result); } catch (Exception e) { + LOG.error(e); throw new IOException("Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs) ", e); } - - } - } - - private String getFileNameCompatibleString(TableName table) { - return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); - } - - private boolean failed(int result) { - return result != 0; - } - - private boolean succeeded(int result) { - return result == 0; - } - - public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { - // set configuration for restore: - // LoadIncrementalHFile needs more time - // hbase.rpc.timeout 600000 - // calculates - Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(config); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); - - // By default, it is 32 and loader will fail if # of files in any region exceed this - // limit. Bad for snapshot restore. 
- conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); - conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); - LoadIncrementalHFiles loader = null; - try { - loader = new LoadIncrementalHFiles(conf); - } catch (Exception e) { - throw new IOException(e); } - return loader; - } - - private Path getBulkOutputDir(String tableName) throws IOException { - Configuration conf = getConf(); - FileSystem fs = FileSystem.get(conf); - String tmp = - conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path path = - new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" - + EnvironmentEdgeManager.currentTime()); - fs.deleteOnExit(path); - return path; } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index e32853d809..ce77645e34 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -68,14 +70,15 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; public final class BackupUtils { protected static final Log LOG = LogFactory.getLog(BackupUtils.class); public static final String LOGNAME_SEPARATOR = "."; + public static final int MILLISEC_IN_HOUR = 3600000; private BackupUtils() { throw new AssertionError("Instantiating utility class..."); } /** - * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp - * value for the RS among the tables. + * Loop through the RS log timestamp map for the tables, for each RS, find the min timestamp value + * for the RS among the tables. 
* @param rsLogTimestampMap timestamp map * @return the min timestamp of each RS */ @@ -114,16 +117,17 @@ public final class BackupUtils { } /** - * copy out Table RegionInfo into incremental backup image need to consider move this - * logic into HBackupFileSystem + * copy out Table RegionInfo into incremental backup image need to consider move this logic into + * HBackupFileSystem * @param conn connection * @param backupInfo backup info * @param conf configuration * @throws IOException exception * @throws InterruptedException exception */ - public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, - Configuration conf) throws IOException, InterruptedException { + public static void + copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf) + throws IOException, InterruptedException { Path rootDir = FSUtils.getRootDir(conf); FileSystem fs = rootDir.getFileSystem(conf); @@ -152,10 +156,8 @@ public final class BackupUtils { LOG.debug("Starting to write region info for table " + table); for (HRegionInfo regionInfo : regions) { Path regionDir = - HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), - regionInfo); - regionDir = - new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); + HRegion.getRegionDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo); + regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); } LOG.debug("Finished writing region info for table " + table); @@ -301,7 +303,6 @@ public final class BackupUtils { return ret; } - /** * Check whether the backup path exist * @param backupStr backup @@ -431,8 +432,7 @@ public final class BackupUtils { * @param conf configuration * @throws IOException exception */ - private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) - throws IOException { + private static void cleanupHLogDir(BackupInfo backupInfo, Configuration conf) throws IOException { String logDir = backupInfo.getHLogTargetDir(); if (logDir == null) { @@ -452,7 +452,6 @@ public final class BackupUtils { } } - private static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { try { // clean up the data at target directory @@ -498,8 +497,8 @@ public final class BackupUtils { * @param tableName table name * @return backupPath String for the particular table */ - public static String getTableBackupDir(String backupRootDir, String backupId, - TableName tableName) { + public static String + getTableBackupDir(String backupRootDir, String backupId, TableName tableName) { return backupRootDir + Path.SEPARATOR + backupId + Path.SEPARATOR + tableName.getNamespaceAsString() + Path.SEPARATOR + tableName.getQualifierAsString() + Path.SEPARATOR; @@ -523,7 +522,6 @@ public final class BackupUtils { return list; } - /** * Calls fs.listStatus() and treats FileNotFoundException as non-fatal This accommodates * differences between hadoop versions, where hadoop 1 does not throw a FileNotFoundException, and @@ -655,19 +653,16 @@ public final class BackupUtils { * @param backupId backup id * @param check check only * @param fromTables table list from - * @param toTables table list to + * @param toTables table list to * @param isOverwrite overwrite data * @return request obkect */ public static RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { RestoreRequest.Builder builder = new 
RestoreRequest.Builder(); - RestoreRequest request = builder.withBackupRootDir(backupRootDir) - .withBackupId(backupId) - .withCheck(check) - .withFromTables(fromTables) - .withToTables(toTables) - .withOvewrite(isOverwrite).build(); + RestoreRequest request = + builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) + .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); return request; } @@ -699,4 +694,54 @@ public final class BackupUtils { return isValid; } + public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit) + throws IOException { + FileSystem fs = FileSystem.get(conf); + String tmp = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path path = + new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" + + EnvironmentEdgeManager.currentTime()); + if (deleteOnExit) { + fs.deleteOnExit(path); + } + return path; + } + + public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException { + return getBulkOutputDir(tableName, conf, true); + } + + public static String getFileNameCompatibleString(TableName table) { + return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); + } + + public static boolean failed(int result) { + return result != 0; + } + + public static boolean succeeded(int result) { + return result == 0; + } + + public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { + // set configuration for restore: + // LoadIncrementalHFile needs more time + // hbase.rpc.timeout 600000 + // calculates + Configuration conf = new Configuration(config); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR); + + // By default, it is 32 and loader will fail if # of files in any region exceed this + // limit. Bad for snapshot restore. + conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); + LoadIncrementalHFiles loader = null; + try { + loader = new LoadIncrementalHFiles(conf); + } catch (Exception e) { + throw new IOException(e); + } + return loader; + } }
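As an aside for reviewers, here is a minimal usage sketch (not part of the patch) of the merge-operation bookkeeping that this change adds to BackupSystemTable. It assumes the existing BackupSystemTable(Connection) constructor and that updateProcessedTablesForMerge takes a List<TableName>; the backup ids, table name, and the wrapper class MergeBookkeepingSketch are made up for illustration only.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MergeBookkeepingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupSystemTable sysTable = new BackupSystemTable(conn)) {
      // Hypothetical backup ids; real ids come from earlier full/incremental sessions.
      String[] backupIds = { "backup_1497011138958", "backup_1497012049567" };

      // Mark a merge of these images as in progress; the marker row survives a client crash.
      sysTable.startMergeOperation(backupIds);

      // Record which tables have already been processed, so a later repair knows the state.
      sysTable.updateProcessedTablesForMerge(Arrays.asList(TableName.valueOf("t1")));

      // A repair path can read back the ids of an in-flight merge, if any.
      String[] inFlight = sysTable.getListOfBackupIdsFromMergeOperation();
      System.out.println("In-flight merge: " + Arrays.toString(inFlight));

      // Clear the merge marker once all images have been merged successfully.
      sysTable.finishMergeOperation();
    }
  }
}

Because start/finish write and delete a single well-known marker row, an interrupted merge can be detected simply by checking whether getListOfBackupIdsFromMergeOperation() returns a non-null list of ids.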