diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java index 6f642a443b..cc69ef01ee 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java @@ -64,12 +64,20 @@ public interface BackupAdmin extends Closeable { /** * Delete backup image command - * @param backupIds backup id list + * @param backupIds array of backup ids * @return total number of deleted sessions * @throws IOException exception */ int deleteBackups(String[] backupIds) throws IOException; + + /** + * Merge backup images command + * @param backupIds array of backup ids + * @throws IOException exception + */ + void mergeBackups(String[] backupIds) throws IOException; + /** * Show backup history command * @param n last n backup sessions @@ -113,7 +121,7 @@ public interface BackupAdmin extends Closeable { /** * Add tables to backup set command * @param name name of backup set. - * @param tables list of tables to be added to this set. + * @param tables array of tables to be added to this set. * @throws IOException exception */ void addToBackupSet(String name, TableName[] tables) throws IOException; @@ -121,7 +129,7 @@ public interface BackupAdmin extends Closeable { /** * Remove tables from backup set * @param name name of backup set. - * @param tables list of tables to be removed from this set. + * @param tables array of tables to be removed from this set. * @throws IOException exception */ void removeFromBackupSet(String name, TableName[] tables) throws IOException; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java index 21d73ccde4..be7bd9d9ff 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.client.Connection; public class BackupClientFactory { - public static TableBackupClient create (Connection conn, String backupId, BackupRequest request) + public static TableBackupClient create(Connection conn, String backupId, BackupRequest request) throws IOException { Configuration conf = conn.getConfiguration(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java index e2cdb2f4cb..9dd85317e2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java @@ -111,6 +111,8 @@ public class BackupDriver extends AbstractHBaseTool { type = BackupCommand.SET; } else if (BackupCommand.REPAIR.name().equalsIgnoreCase(cmd)) { type = BackupCommand.REPAIR; + } else if (BackupCommand.MERGE.name().equalsIgnoreCase(cmd)) { + type = BackupCommand.MERGE; } else { System.out.println("Unsupported command for backup: " + cmd); printToolUsage(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java new file mode 100644 index 0000000000..7437fb5e85 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupMergeJob.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Backup merge operation job interface. Concrete implementation is provided by backup provider, + * see {@link BackupRestoreFactory} + */ + +@InterfaceAudience.Private +public interface BackupMergeJob extends Configurable { + + /** + * Run backup merge operation + * @param backupIds backup image ids + * @throws IOException + */ + void run(String[] backupIds) throws IOException; +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java index 6d8967a06e..d72c88432f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreFactory.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob; import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.util.ReflectionUtils; @@ -32,6 +33,7 @@ public final class BackupRestoreFactory { public final static String HBASE_INCR_RESTORE_IMPL_CLASS = "hbase.incremental.restore.class"; public final static String HBASE_BACKUP_COPY_IMPL_CLASS = "hbase.backup.copy.class"; + public final static String HBASE_BACKUP_MERGE_IMPL_CLASS = "hbase.backup.merge.class"; private BackupRestoreFactory() { throw new AssertionError("Instantiating utility class..."); @@ -40,7 +42,7 @@ public final class BackupRestoreFactory { /** * Gets backup restore job * @param conf configuration - * @return backup restore task instance + * @return backup restore job instance */ public static RestoreJob getRestoreJob(Configuration conf) { Class cls = @@ -53,7 +55,7 @@ public final class BackupRestoreFactory { /** * Gets backup copy job * @param conf configuration - * @return backup copy task + * @return backup copy job instance */ public static BackupCopyJob getBackupCopyJob(Configuration conf) { Class cls = @@ -63,4 +65,18 @@ public final class BackupRestoreFactory { service.setConf(conf); return service; } + + /** + * Gets backup merge job + * @param conf configuration + * @return backup merge job instance + */ + public static BackupMergeJob getBackupMergeJob(Configuration conf) { + Class cls = + conf.getClass(HBASE_BACKUP_MERGE_IMPL_CLASS, MapReduceBackupMergeJob.class, + BackupMergeJob.class); + BackupMergeJob service = 
ReflectionUtils.newInstance(cls, conf); + service.setConf(conf); + return service; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java index 46044db61e..376ed664da 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java @@ -63,6 +63,18 @@ public class HBackupFileSystem { + Path.SEPARATOR; } + + public static String + getTableBackupDataDir(String backupRootDir, String backupId, TableName tableName) { + return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR+"data"; + } + + + public static Path getBackupPath(String backupRootDir, String backupId) + { + return new Path (backupRootDir + Path.SEPARATOR + backupId); + } + /** * Given the backup root dir, backup id and the table name, return the backup image location, * which is also where the backup manifest file is. return value look like: @@ -94,17 +106,15 @@ public class HBackupFileSystem { return new Path(getLogBackupDir(backupRootDir, backupId)); } - private static Path getManifestPath(TableName tableName, Configuration conf, Path backupRootPath, + // TODO we do not keep WAL files anymore + // Move manifest file to other place + private static Path getManifestPath(Configuration conf, Path backupRootPath, String backupId) throws IOException { - Path manifestPath = - new Path(getTableBackupPath(tableName, backupRootPath, backupId), - BackupManifest.MANIFEST_FILE_NAME); + Path manifestPath = null; FileSystem fs = backupRootPath.getFileSystem(conf); - if (!fs.exists(manifestPath)) { - // check log dir for incremental backup case manifestPath = - new Path(getLogBackupDir(backupRootPath.toString(), backupId) + Path.SEPARATOR + new Path(getBackupPath(backupRootPath.toString(), backupId) + Path.SEPARATOR + BackupManifest.MANIFEST_FILE_NAME); if (!fs.exists(manifestPath)) { String errorMsg = @@ -113,14 +123,13 @@ public class HBackupFileSystem { + " correspond to previously taken backup ?"; throw new IOException(errorMsg); } - } return manifestPath; } - public static BackupManifest getManifest(TableName tableName, Configuration conf, + public static BackupManifest getManifest(Configuration conf, Path backupRootPath, String backupId) throws IOException { BackupManifest manifest = - new BackupManifest(conf, getManifestPath(tableName, conf, backupRootPath, backupId)); + new BackupManifest(conf, getManifestPath(conf, backupRootPath, backupId)); return manifest; } @@ -134,7 +143,7 @@ public class HBackupFileSystem { TableName[] tableArray, Configuration conf, Path backupRootPath, String backupId) throws IOException { for (TableName tableName : tableArray) { - BackupManifest manifest = getManifest(tableName, conf, backupRootPath, backupId); + BackupManifest manifest = getManifest(conf, backupRootPath, backupId); backupManifestMap.put(tableName, manifest); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index d1ee8e13dd..d65493c2f1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.backup.BackupAdmin; import org.apache.hadoop.hbase.backup.BackupClientFactory; import 
org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupMergeJob;
 import org.apache.hadoop.hbase.backup.BackupRequest;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
@@ -66,7 +68,7 @@ public class BackupAdminImpl implements BackupAdmin {
   @Override
   public void close() throws IOException {
     if (conn != null) {
-      conn.close();
+      // conn.close();
     }
   }
@@ -613,4 +615,49 @@ public class BackupAdminImpl implements BackupAdmin {
     return tableList;
   }
 
+  @Override
+  public void mergeBackups(String[] backupIds) throws IOException {
+    try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
+      checkIfValidForMerge(backupIds, sysTable);
+      BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration());
+      job.run(backupIds);
+    }
+  }
+
+  /**
+   * All backups MUST be in the same destination,
+   * no FULL backups are allowed - only INCREMENTAL ones,
+   * and only in COMPLETE state.
+   * @param backupIds list of backup ids
+   * @param table backup system table
+   * @throws IOException if a backup session is not found or not eligible for merge
+   */
+  private void checkIfValidForMerge(String[] backupIds, BackupSystemTable table)
+      throws IOException {
+    String backupRoot = null;
+    for (String backupId : backupIds) {
+      BackupInfo bInfo = table.readBackupInfo(backupId);
+      if (bInfo == null) {
+        String msg = "Backup session " + backupId + " not found";
+        throw new IOException(msg);
+      }
+      if (backupRoot == null) {
+        backupRoot = bInfo.getBackupRootDir();
+      } else if (!bInfo.getBackupRootDir().equals(backupRoot)) {
+        throw new IOException("Found different backup destinations in a list of backup sessions \n"
+            + "1. " + backupRoot + "\n" + "2. " + bInfo.getBackupRootDir());
+      }
+      if (bInfo.getType() == BackupType.FULL) {
+        throw new IOException("FULL backup image can not be merged for: \n" + bInfo);
+      }
+
+      if (bInfo.getState() != BackupState.COMPLETE) {
+        throw new IOException("Backup image " + backupId
+            + " can not be merged because of its state: " + bInfo.getState());
+      }
+    }
+  }
+
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 2a5c9595e7..9dcc16732b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -114,6 +114,8 @@ public final class BackupCommands {
       + " remove remove tables from a set\n"
       + " list list all backup sets in the system\n"
       + " describe describe set\n"
       + " delete delete backup set\n";
+  public static final String MERGE_CMD_USAGE = "Usage: hbase backup merge [backup_ids]\n"
+      + " backup_ids Comma separated list of backup image ids.\n";
 
   public static final String USAGE_FOOTER = "";
@@ -155,10 +157,18 @@ public final class BackupCommands {
         String[] ids = table.getListOfBackupIdsFromDeleteOperation();
         if(ids !=null && ids.length > 0) {
-          System.err.println("Found failed backup delete coommand. ");
+          System.err.println("Found failed backup DELETE command. ");
           System.err.println("Backup system recovery is required.");
-          throw new IOException("Failed backup delete found, aborted command execution");
+          throw new IOException("Failed backup DELETE found, aborted command execution");
         }
+
+        ids = table.getListOfBackupIdsFromMergeOperation();
+        if (ids != null && ids.length > 0) {
+          System.err.println("Found failed backup MERGE command. ");
+          System.err.println("Backup system recovery is required.");
+          throw new IOException("Failed backup MERGE found, aborted command execution");
+        }
+
       }
     }
   }
@@ -220,6 +230,9 @@ public final class BackupCommands {
     case REPAIR:
       cmd = new RepairCommand(conf, cmdline);
       break;
+    case MERGE:
+      cmd = new MergeCommand(conf, cmdline);
+      break;
     case HELP:
     default:
       cmd = new HelpCommand(conf, cmdline);
@@ -586,6 +599,7 @@ public final class BackupCommands {
         System.out.println("REPAIR status: no failed sessions found."
             +" Checking failed delete backup operation ...");
         repairFailedBackupDeletionIfAny(conn, sysTable);
+        repairFailedBackupMergeIfAny(conn, sysTable);
         return;
       }
       backupInfo = list.get(0);
@@ -617,13 +631,13 @@
    {
      String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation();
      if (backupIds == null ||backupIds.length == 0) {
-       System.out.println("No failed backup delete operation found");
+       System.out.println("No failed backup DELETE operation found");
        // Delete backup table snapshot if exists
        BackupSystemTable.deleteSnapshot(conn);
        return;
      }
-     System.out.println("Found failed delete operation for: " + StringUtils.join(backupIds));
-     System.out.println("Running delete again ...");
+     System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds));
+     System.out.println("Running DELETE again ...");
      // Restore table from snapshot
      BackupSystemTable.restoreFromSnapshot(conn);
      // Finish previous failed session
@@ -631,16 +645,85 @@
      try(BackupAdmin admin = new BackupAdminImpl(conn);) {
        admin.deleteBackups(backupIds);
      }
-     System.out.println("Delete operation finished OK: "+ StringUtils.join(backupIds));
+     System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds));
    }
+
+   private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
+       throws IOException {
+     String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation();
+     if (backupIds == null || backupIds.length == 0) {
+       System.out.println("No failed backup MERGE operation found");
+       // Delete backup table snapshot if exists
+       BackupSystemTable.deleteSnapshot(conn);
+       return;
+     }
+     System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds));
+     System.out.println("Running MERGE again ...");
+     // Restore table from snapshot
+     BackupSystemTable.restoreFromSnapshot(conn);
+     // Finish previous failed session
+     sysTable.finishMergeOperation();
+     try (BackupAdmin admin = new BackupAdminImpl(conn);) {
+       admin.mergeBackups(backupIds);
+     }
+     System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds));
+   }
+
    @Override
    protected void printUsage() {
      System.out.println(REPAIR_CMD_USAGE);
    }
  }
+
+  private static class MergeCommand extends Command {
+
+    MergeCommand(Configuration conf, CommandLine cmdline) {
+      super(conf);
+      this.cmdline = cmdline;
+    }
+
+    @Override
+    protected boolean requiresNoActiveSession() {
+      return true;
+    }
+
+    @Override
+    protected boolean requiresConsistentState() {
+      return true;
+    }
+
+    @Override
+    public void execute() throws IOException {
+      super.execute();
+ String[] args = cmdline == null ? null : cmdline.getArgs(); + if (args ==null || (args.length != 2)) { + System.err.println("ERROR: wrong number of arguments: " + + (args == null? null:args.length)); + printUsage(); + throw new IOException(INCORRECT_USAGE); + } + + String[] backupIds = args[1].split(","); + Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); + try (final Connection conn = ConnectionFactory.createConnection(conf); + final BackupAdminImpl admin = new BackupAdminImpl(conn);) { + admin.mergeBackups(backupIds); + } + } + + @Override + protected void printUsage() { + System.out.println(MERGE_CMD_USAGE); + } + } + // TODO Cancel command private static class CancelCommand extends Command { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index a929700a98..d2a2891300 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -313,7 +313,7 @@ public class BackupManager implements Closeable { } } else { Path logBackupPath = - HBackupFileSystem.getLogBackupPath(backup.getBackupRootDir(), backup.getBackupId()); + HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId()); LOG.debug("Current backup has an incremental backup ancestor, " + "touching its image manifest in " + logBackupPath.toString() + " to construct the dependency."); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java index b8adac9fba..524bb79673 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -294,6 +295,16 @@ public class BackupManifest { return this.ancestors; } + public void removeAncestors(List backupIds) { + List toRemove = new ArrayList(); + for (BackupImage im: this.ancestors) { + if (backupIds.contains(im.getBackupId())) { + toRemove.add(im); + } + } + this.ancestors.removeAll(toRemove); + } + private void addAncestor(BackupImage backupImage) { this.getAncestors().add(backupImage); } @@ -464,18 +475,15 @@ public class BackupManifest { } /** - * Persist the manifest file. + * TODO: fix it. Persist the manifest file. * @throws IOException IOException when storing the manifest file. */ public void store(Configuration conf) throws BackupException { byte[] data = backupImage.toProto().toByteArray(); // write the file, overwrite if already exist - String logBackupDir = - BackupUtils.getLogBackupDir(backupImage.getRootDir(), backupImage.getBackupId()); - Path manifestFilePath = - new Path(new Path((tableBackupDir != null ? 
tableBackupDir : logBackupDir)),
-            MANIFEST_FILE_NAME);
+    Path manifestFilePath = new Path(HBackupFileSystem.getBackupPath(backupImage.getRootDir(),
+      backupImage.getBackupId()), MANIFEST_FILE_NAME);
     try (FSDataOutputStream out =
         manifestFilePath.getFileSystem(conf).create(manifestFilePath, true);) {
       out.write(data);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index e5a3daace2..4fa263b776 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -147,6 +147,8 @@ public final class BackupSystemTable implements Closeable {
   private final static String BULK_LOAD_PREFIX = "bulk:";
   private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
   private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes();
+  private final static byte[] MERGE_OP_ROW = "merge_op_row".getBytes();
+
   final static byte[] TBL_COL = Bytes.toBytes("tbl");
   final static byte[] FAM_COL = Bytes.toBytes("fam");
@@ -1765,6 +1767,86 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  private Put createPutForMergeOperation(String[] backupIdList) {
+
+    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
+    Put put = new Put(MERGE_OP_ROW);
+    put.addColumn(META_FAMILY, FAM_COL, value);
+    return put;
+  }
+
+  private Put createPutForUpdateTablesForMerge(List<TableName> tables) {
+
+    byte[] value = Bytes.toBytes(StringUtils.join(tables, ","));
+    Put put = new Put(MERGE_OP_ROW);
+    put.addColumn(META_FAMILY, PATH_COL, value);
+    return put;
+  }
+
+  private Delete createDeleteForBackupMergeOperation() {
+
+    Delete delete = new Delete(MERGE_OP_ROW);
+    delete.addFamily(META_FAMILY);
+    return delete;
+  }
+
+  private Get createGetForMergeOperation() {
+
+    Get get = new Get(MERGE_OP_ROW);
+    get.addFamily(META_FAMILY);
+    return get;
+  }
+
+  public void startMergeOperation(String[] backupIdList) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Start merge operation for backups: " + StringUtils.join(backupIdList));
+    }
+    Put put = createPutForMergeOperation(backupIdList);
+    try (Table table = connection.getTable(tableName)) {
+      table.put(put);
+    }
+  }
+
+  public void updateProcessedTablesForMerge(List<TableName> tables) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Update tables for merge : " + StringUtils.join(tables, ","));
+    }
+    Put put = createPutForUpdateTablesForMerge(tables);
+    try (Table table = connection.getTable(tableName)) {
+      table.put(put);
+    }
+  }
+
+  public void finishMergeOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Finish merge operation for backup ids");
+    }
+    Delete delete = createDeleteForBackupMergeOperation();
+    try (Table table = connection.getTable(tableName)) {
+      table.delete(delete);
+    }
+  }
+
+  public String[] getListOfBackupIdsFromMergeOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Get backup ids for merge operation");
+    }
+    Get get = createGetForMergeOperation();
+    try (Table table = connection.getTable(tableName)) {
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return null;
+      }
+      Cell cell = res.listCells().get(0);
+      byte[] val = CellUtil.cloneValue(cell);
+      if (val.length == 0) {
+        return null;
+      }
+      return new String(val).split(",");
+    }
+  }
+
   static Scan createScanForOrigBulkLoadedFiles(TableName table)
throws IOException { Scan scan = new Scan(); byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 381e9b15df..7c98b129f3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.RestoreTool; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; @@ -148,7 +148,7 @@ public class RestoreTablesClient { Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId(); // We need hFS only for full restore (see the code) - BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); + BackupManifest manifest = HBackupFileSystem.getManifest(conf, backupRoot, backupId); if (manifest.getType() == BackupType.FULL) { fullBackupId = manifest.getBackupImage().getBackupId(); LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " @@ -169,8 +169,8 @@ public class RestoreTablesClient { // full backup path comes first for (int i = 1; i < images.length; i++) { BackupImage im = images[i]; - String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(), - im.getBackupId(), sTable)+ Path.SEPARATOR+"data"; + String fileBackupDir = HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), + im.getBackupId(), sTable); dirList.add(new Path(fileBackupDir)); } @@ -196,8 +196,10 @@ public class RestoreTablesClient { TreeSet restoreImageSet = new TreeSet(); boolean truncateIfExists = isOverwrite; Set backupIdSet = new HashSet<>(); + for (int i = 0; i < sTableArray.length; i++) { TableName table = sTableArray[i]; + BackupManifest manifest = backupManifestMap.get(table); // Get the image list of this backup for restore in time order from old // to new. 
@@ -232,7 +234,7 @@ public class RestoreTablesClient { Map>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList); Map loaderResult; conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); - LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf); + LoadIncrementalHFiles loader = BackupUtils.createLoader(conf); for (int i = 0; i < sTableList.size(); i++) { if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); @@ -268,12 +270,14 @@ public class RestoreTablesClient { // case VALIDATION: // check the target tables checkTargetTables(tTableArray, isOverwrite); + // case RESTORE_IMAGES: HashMap backupManifestMap = new HashMap<>(); // check and load backup image manifest for the tables Path rootPath = new Path(targetRootDir); HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, backupId); + restore(backupManifestMap, sTableArray, tTableArray, isOverwrite); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java new file mode 100644 index 0000000000..ac5244d3ec --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo; +import org.apache.hadoop.hbase.backup.BackupMergeJob; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.Tool; + +/** + * MapReduce implementation of {@link BackupMergeJob} + * Must be initialized with configuration of a backup destination cluster + * + */ + +@InterfaceAudience.Private +public class MapReduceBackupMergeJob implements BackupMergeJob { + public static final Log LOG = LogFactory.getLog(MapReduceBackupMergeJob.class); + + private Tool player; + private Configuration conf; + + + public MapReduceBackupMergeJob() { + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public void run(String[] backupIds) throws IOException { + String bulkOutputConfKey; + + // TODO : run player on remote cluster + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file + String bids = StringUtils.join(backupIds, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Merge backup images " + bids); + } + + List> processedTableList = new ArrayList>(); + boolean finishedTables = false; + Connection conn = ConnectionFactory.createConnection(getConf()); + BackupSystemTable table = new BackupSystemTable(conn); + FileSystem fs = FileSystem.get(getConf()); + + try { + + // Start merge operation + table.startMergeOperation(backupIds); + + // Select most recent backup id + String mergedBackupId = findMostRecentBackupId(backupIds); + + TableName[] tableNames = getTableNamesInBackupImages(backupIds); + String backupRoot = null; + + BackupInfo bInfo = table.readBackupInfo(backupIds[0]); + backupRoot = bInfo.getBackupRootDir(); + + for (int i = 0; i < tableNames.length; i++) { + + LOG.info("Merge backup images for " + tableNames[i]); + + // Find input directories for table + + Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds); + String dirs = StringUtils.join(dirPaths, ","); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]), + getConf(), false); + // Delete content if exists + if (fs.exists(bulkOutputPath)) { + if (!fs.delete(bulkOutputPath, true)) { + LOG.warn("Can not delete: " + bulkOutputPath); + } + } + Configuration 
conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; + + int result = 0; + + player.setConf(getConf()); + result = player.run(playerArgs); + if (succeeded(result)) { + // Add to processed table list + processedTableList.add( new Pair(tableNames[i], bulkOutputPath)); + } else { + throw new IOException("Can not merge backup images for " + dirs + + " (check Hadoop/MR and HBase logs). Player return code =" + result); + } + LOG.debug("Merge Job finished:" + result); + } + List tableList = toTableNameList(processedTableList); + table.updateProcessedTablesForMerge(tableList); + finishedTables = true; + + // Move data + for (Pair tn : processedTableList) { + moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId); + } + + // Delete old data and update manifest + List backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId); + deleteBackupImages(backupsToDelete, conn, fs, backupRoot); + updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete); + + // Finish merge session + table.finishMergeOperation(); + } catch (Exception e) { + LOG.error(e); + if (!finishedTables) { + // cleanup bulk directories and finish merge + // merge MUST be repeated (no need for repair) + cleanupBulkLoadDirs (fs, toPathList(processedTableList)); + table.finishMergeOperation(); + throw new IOException ("Backup merge operation failed, you should try it again", e); + } else { + // backup repair must be run + throw new IOException ("Backup merge operation failed, run backup repair tool to restore system's integrity", e); + } + } finally { + if (table != null) { + table.close(); + } + if (conn !=null) { + conn.close(); + } + } + } + + private List toPathList(List> processedTableList) { + ArrayList list = new ArrayList(); + for(Pair p: processedTableList) { + list.add(p.getSecond()); + } + return list; + } + + private List toTableNameList(List> processedTableList) { + ArrayList list = new ArrayList(); + for(Pair p: processedTableList) { + list.add(p.getFirst()); + } + return list; + } + + private void cleanupBulkLoadDirs(FileSystem fs, List pathList) throws IOException { + for (Path path: pathList) { + + if (!fs.delete(path, true)) { + LOG.warn("Can't delete "+ path); + } + } + } + + private void updateBackupManifest(String backupRoot, String mergedBackupId, + List backupsToDelete) throws IllegalArgumentException, IOException { + + BackupManifest manifest = HBackupFileSystem.getManifest(conf, + new Path(backupRoot), mergedBackupId); + manifest.getBackupImage().removeAncestors(backupsToDelete); + // save back + manifest.store(conf); + + } + + private void deleteBackupImages(List backupIds, Connection conn, + FileSystem fs, String backupRoot) throws IOException + { + + + // Delete from backup system table + try (BackupSystemTable table = new BackupSystemTable(conn);) { + for (String backupId: backupIds) { + table.deleteBackupInfo(backupId); + } + } + + // Delete from file system + for (String backupId: backupIds) { + Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, + backupId); + + if (!fs.delete(backupDirPath, true)) { + LOG.warn("Could not delete "+ backupDirPath); + } + } + } + + private List getBackupIdsToDelete(String[] backupIds, String mergedBackupId) { + List list = new ArrayList(); + for (String id : backupIds) { + if (id.equals(mergedBackupId)) { + continue; + } + list.add(id); + } + return list; + } + + private void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath, + 
TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException + { + + Path dest = new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, + mergedBackupId, tableName)); + + // Delete all in dest + if(!fs.delete(dest, true)) { + throw new IOException("Could not delete "+ dest); + } + + FileStatus[] fsts = fs.listStatus(bulkOutputPath); + for (FileStatus fst: fsts) { + if (fst.isDirectory()) { + fs.rename(fst.getPath().getParent(), dest ); + } + } + + } + + private String findMostRecentBackupId(String[] backupIds) { + long recentTimestamp = Long.MIN_VALUE; + for (String backupId: backupIds) { + long ts = Long.parseLong(backupId.split("_")[1]); + if (ts > recentTimestamp) { + recentTimestamp = ts; + } + } + return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp; + } + + private TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException + { + + Set allSet = new HashSet(); + + try ( Connection conn = ConnectionFactory.createConnection(conf); + BackupSystemTable table = new BackupSystemTable(conn); ) + { + for ( String backupId: backupIds) { + BackupInfo bInfo = table.readBackupInfo(backupId); + + allSet.addAll(bInfo.getTableNames()); + } + } + + TableName[] ret = new TableName[allSet.size()]; + return allSet.toArray(ret); + } + + private Path[] findInputDirectories(FileSystem fs, String backupRoot, + TableName tableName, String[] backupIds) throws IOException { + + List dirs = new ArrayList(); + + for (String backupId: backupIds) { + Path fileBackupDirPath = new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, + backupId, tableName)); + if (fs.exists(fileBackupDirPath)) { + dirs.add(fileBackupDirPath); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("File: "+fileBackupDirPath + " does not exist."); + } + } + } + Path[] ret = new Path[dirs.size()]; + return dirs.toArray(ret); + } + + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java similarity index 94% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java index 604e5021f5..a307ea9f73 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java @@ -53,18 +53,18 @@ import org.apache.hadoop.util.ToolRunner; * for later bulk importing. 
*/ @InterfaceAudience.Private -public class HFileSplitterJob extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(HFileSplitterJob.class); +public class MapReduceHFileSplitterJob extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(MapReduceHFileSplitterJob.class); final static String NAME = "HFileSplitterJob"; public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; public final static String TABLES_KEY = "hfile.input.tables"; public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - public HFileSplitterJob() { + public MapReduceHFileSplitterJob() { } - protected HFileSplitterJob(final Configuration c) { + protected MapReduceHFileSplitterJob(final Configuration c) { super(c); } @@ -111,7 +111,7 @@ public class HFileSplitterJob extends Configured implements Tool { Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); - job.setJarByClass(HFileSplitterJob.class); + job.setJarByClass(MapReduceHFileSplitterJob.class); job.setInputFormatClass(HFileInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); @@ -164,7 +164,7 @@ public class HFileSplitterJob extends Configured implements Tool { * @throws Exception When running the job fails. */ public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new HFileSplitterJob(HBaseConfiguration.create()), args); + int ret = ToolRunner.run(new MapReduceHFileSplitterJob(HBaseConfiguration.create()), args); System.exit(ret); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java index 4161ca9cca..b6fb91d94f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java @@ -17,31 +17,31 @@ */ package org.apache.hadoop.hbase.backup.mapreduce; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed; +import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded; + import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.RestoreJob; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.mapreduce.WALPlayer; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.Tool; + /** * MapReduce implementation of {@link RestoreJob} * - * For full backup restore, it runs {@link HFileSplitterJob} job and creates + * For backup restore, it runs {@link MapReduceHFileSplitterJob} job and creates * HFiles which are aligned with a region boundaries of a table being - * restored, for incremental backup restore it runs {@link WALPlayer} in - * bulk load mode (creates 
HFiles from WAL edits). + * restored. * * The resulting HFiles then are loaded using HBase bulk load tool * {@link LoadIncrementalHFiles} @@ -62,8 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob { String bulkOutputConfKey; - player = new HFileSplitterJob(); - bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY; + player = new MapReduceHFileSplitterJob(); + bulkOutputConfKey = MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY; // Player reads all files in arbitrary directory structure and creates // a Map task for each file String dirs = StringUtils.join(dirPaths, ","); @@ -80,7 +80,9 @@ public class MapReduceRestoreJob implements RestoreJob { LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); - Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); + Path bulkOutputPath = + BackupUtils.getBulkOutputDir( + BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf()); Configuration conf = getConf(); conf.set(bulkOutputConfKey, bulkOutputPath.toString()); String[] playerArgs = @@ -96,7 +98,7 @@ public class MapReduceRestoreJob implements RestoreJob { result = player.run(playerArgs); if (succeeded(result)) { // do bulk load - LoadIncrementalHFiles loader = createLoader(getConf()); + LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf()); if (LOG.isDebugEnabled()) { LOG.debug("Restoring HFiles from directory " + bulkOutputPath); } @@ -113,59 +115,13 @@ public class MapReduceRestoreJob implements RestoreJob { } LOG.debug("Restore Job finished:" + result); } catch (Exception e) { + LOG.error(e); throw new IOException("Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs) ", e); } - } } - private String getFileNameCompatibleString(TableName table) { - return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); - } - - private boolean failed(int result) { - return result != 0; - } - - private boolean succeeded(int result) { - return result == 0; - } - - public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { - // set configuration for restore: - // LoadIncrementalHFile needs more time - // hbase.rpc.timeout 600000 - // calculates - Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(config); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); - - // By default, it is 32 and loader will fail if # of files in any region exceed this - // limit. Bad for snapshot restore. 
- conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); - conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); - LoadIncrementalHFiles loader = null; - try { - loader = new LoadIncrementalHFiles(conf); - } catch (Exception e) { - throw new IOException(e); - } - return loader; - } - - private Path getBulkOutputDir(String tableName) throws IOException { - Configuration conf = getConf(); - FileSystem fs = FileSystem.get(conf); - String tmp = - conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path path = - new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" - + EnvironmentEdgeManager.currentTime()); - fs.deleteOnExit(path); - return path; - } @Override public Configuration getConf() { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index e32853d809..37db9c2bf6 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -56,7 +56,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -68,6 +70,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; public final class BackupUtils { protected static final Log LOG = LogFactory.getLog(BackupUtils.class); public static final String LOGNAME_SEPARATOR = "."; + public static int MILLISEC_IN_HOUR = 3600000; private BackupUtils() { throw new AssertionError("Instantiating utility class..."); @@ -699,4 +702,62 @@ public final class BackupUtils { return isValid; } + public static Path getBulkOutputDir(String tableName, Configuration conf, + boolean deleteOnExit ) + throws IOException + { + FileSystem fs = FileSystem.get(conf); + String tmp = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, + HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path path = + new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" + + EnvironmentEdgeManager.currentTime()); + if (deleteOnExit) { + fs.deleteOnExit(path); + } + return path; + } + + + public static Path getBulkOutputDir(String tableName, Configuration conf) + throws IOException + { + return getBulkOutputDir(tableName, conf, true); + } + + + public static String getFileNameCompatibleString(TableName table) { + return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); + } + + public static boolean failed(int result) { + return result != 0; + } + + public static boolean succeeded(int result) { + return result == 0; + } + + + public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { + // set configuration for restore: + // LoadIncrementalHFile needs more time + // hbase.rpc.timeout 600000 + // calculates + Configuration conf = new Configuration(config); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, MILLISEC_IN_HOUR); + + // By default, it is 32 and loader will fail if # of files in 
any region exceed this + // limit. Bad for snapshot restore. + conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); + LoadIncrementalHFiles loader = null; + try { + loader = new LoadIncrementalHFiles(conf); + } catch (Exception e) { + throw new IOException(e); + } + return loader; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMerge.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMerge.java new file mode 100644 index 0000000000..78746480f2 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMerge.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import com.google.common.collect.Lists; + +@Category(LargeTests.class) +@RunWith(Parameterized.class) +public class TestIncrementalBackupMerge extends TestBackupBase { + private static final Log LOG = LogFactory.getLog(TestIncrementalBackupMerge.class); + + @Parameterized.Parameters + public static Collection data() { + provider = "multiwal"; + List params = new ArrayList(); + params.add(new Object[] { Boolean.TRUE }); + return params; + } + + public TestIncrementalBackupMerge(Boolean b) { + } + + @Test + public void TestIncBackupMergeRestore() throws Exception { + + int ADD_ROWS = 99; + // #1 - create full backup for all tables + LOG.info("create full backup image for all tables"); + + List tables = Lists.newArrayList(table1, table2); + + Connection conn = ConnectionFactory.createConnection(conf1); + + HBaseAdmin admin = null; + admin = (HBaseAdmin) conn.getAdmin(); + BackupAdminImpl client = new BackupAdminImpl(conn); + + BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); + String backupIdFull = 
client.backupTables(request); + + assertTrue(checkSucceeded(backupIdFull)); + + // #2 - insert some data to table1 + HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS); + LOG.debug("writing " + ADD_ROWS + " rows to " + table1); + + Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS); + t1.close(); + LOG.debug("written " + ADD_ROWS + " rows to " + table1); + + HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS); + + Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS); + t2.close(); + LOG.debug("written " + ADD_ROWS + " rows to " + table2); + + // #3 - incremental backup for multiple tables + tables = Lists.newArrayList(table1, table2); + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String backupIdIncMultiple = client.backupTables(request); + assertTrue(checkSucceeded(backupIdIncMultiple)); + + + t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS); + t1.close(); + + t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS); + t2.close(); + + // #3 - incremental backup for multiple tables + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String backupIdIncMultiple2 = client.backupTables(request); + assertTrue(checkSucceeded(backupIdIncMultiple2)); + + // #4 Merge backup images + try ( BackupAdmin bAdmin = new BackupAdminImpl(conn); ) + { + String[] backups = new String[] {backupIdIncMultiple, backupIdIncMultiple2}; + bAdmin.mergeBackups(backups); + } + // #6 - restore incremental backup for multiple tables, with overwrite + TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 }; + TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore }; + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, + false, tablesRestoreIncMultiple, tablesMapIncMultiple, true)); + + Table hTable = conn.getTable(table1_restore); + LOG.debug("After incremental restore: " + hTable.getTableDescriptor()); + LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows"); + Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2*ADD_ROWS); + + hTable.close(); + + hTable = conn.getTable(table2_restore); + Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); + hTable.close(); + + admin.close(); + conn.close(); + + } + +}
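
Note: a minimal client-side sketch of how the merge API added in this patch is expected to be used, mirroring MergeCommand.execute() and TestIncrementalBackupMerge (the backup ids below are illustrative placeholders, not real sessions):

    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupAdmin admin = new BackupAdminImpl(conn)) {
      // All ids must refer to INCREMENTAL backups in COMPLETE state
      // that share the same backup root directory.
      admin.mergeBackups(new String[] { "backup_1495400000000", "backup_1495500000000" });
    }

The same operation is exposed on the command line as "hbase backup merge <backup_ids>" (see MERGE_CMD_USAGE); if a merge fails after the bulk-load phase, "hbase backup repair" replays it via repairFailedBackupMergeIfAny().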