From b05809621e1f114e9939275aa87d8e28e8ef8300 Mon Sep 17 00:00:00 2001 From: Vladimir Rodionov Date: Wed, 10 Jan 2018 16:26:09 -0800 Subject: [PATCH] HBASE-19568: Restore of HBase table using incremental backup doesn't restore rows from an earlier incremental backup --- .../hadoop/hbase/backup/impl/BackupAdminImpl.java | 2 +- .../hadoop/hbase/backup/impl/BackupManager.java | 19 +-- .../hbase/backup/impl/BackupSystemTable.java | 135 ++++++++++++++------- .../backup/impl/IncrementalTableBackupClient.java | 59 +++++++-- .../hbase/backup/impl/RestoreTablesClient.java | 55 ++++----- .../hbase/backup/impl/TableBackupClient.java | 32 ++--- .../apache/hadoop/hbase/backup/TestBackupBase.java | 4 - .../backup/TestIncrementalBackupWithBulkLoad.java | 24 +++- 8 files changed, 213 insertions(+), 117 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index 8ba57d2df9..f27490c907 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -271,7 +271,7 @@ public class BackupAdminImpl implements BackupAdmin { LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted"); } if (success) { - sysTable.deleteBulkLoadedFiles(map); + sysTable.deleteBulkLoadedRows(new ArrayList(map.keySet())); } sysTable.deleteBackupInfo(backupInfo.getBackupId()); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index 7199fd5ab1..dc51df990f 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupObserver; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; @@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.procedure.ProcedureManagerHost; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -140,10 +142,14 @@ public class BackupManager implements Closeable { conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + "," + regionProcedureClass); } + String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); + String regionObserverClass = BackupObserver.class.getName(); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") + + regionObserverClass); if (LOG.isDebugEnabled()) { - LOG.debug("Added region procedure manager: " + regionProcedureClass); + LOG.debug("Added region procedure manager: " + regionProcedureClass + + ". Added region observer: " + regionObserverClass); } - } public static boolean isBackupEnabled(Configuration conf) { @@ -415,13 +421,8 @@ public class BackupManager implements Closeable { return systemTable.readBulkloadRows(tableList); } - public void removeBulkLoadedRows(List lst, List rows) throws IOException { - systemTable.removeBulkLoadedRows(lst, rows); - } - - public void writeBulkLoadedFiles(List sTableList, Map>[] maps) - throws IOException { - systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId()); + public void deleteBulkLoadedRows(List rows) throws IOException { + systemTable.deleteBulkLoadedRows(rows); } /** diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 6b721d4e52..cf34d14d25 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -42,8 +42,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -53,6 +51,8 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -122,7 +124,21 @@ public final class BackupSystemTable implements Closeable { } + /** + * Backup system table (main) name + */ private TableName tableName; + + /** + * Backup System table name for bulk loaded files. + * We keep all bulk loaded file references in a separate table + * because we have to isolate general backup operations: create, merge etc + * from activity of RegionObserver, which controls process of a bulk loading + * {@link org.apache.hadoop.hbase.backup.BackupObserver} + */ + + private TableName bulkLoadTableName; + /** * Stores backup sessions (contexts) */ @@ -174,20 +190,29 @@ public final class BackupSystemTable implements Closeable { public BackupSystemTable(Connection conn) throws IOException { this.connection = conn; - tableName = BackupSystemTable.getTableName(conn.getConfiguration()); + Configuration conf = this.connection.getConfiguration(); + tableName = BackupSystemTable.getTableName(conf); + bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); checkSystemTable(); } private void checkSystemTable() throws IOException { try (Admin admin = connection.getAdmin()) { verifyNamespaceExists(admin); - + Configuration conf = connection.getConfiguration(); if (!admin.tableExists(tableName)) { - HTableDescriptor backupHTD = - BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration()); + TableDescriptor backupHTD = + BackupSystemTable.getSystemTableDescriptor(conf); admin.createTable(backupHTD); } - waitForSystemTable(admin); + if (!admin.tableExists(bulkLoadTableName)) { + TableDescriptor blHTD = + BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); + admin.createTable(blHTD); + } + waitForSystemTable(admin, tableName); + waitForSystemTable(admin, bulkLoadTableName); + } } @@ -207,7 +232,7 @@ public final class BackupSystemTable implements Closeable { } } - private void waitForSystemTable(Admin admin) throws IOException { + private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { long TIMEOUT = 60000; long startTime = EnvironmentEdgeManager.currentTime(); while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { @@ -216,10 +241,11 @@ public final class BackupSystemTable implements Closeable { } catch (InterruptedException e) { } if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { - throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms"); + throw new IOException("Failed to create backup system table "+ + tableName +" after " + TIMEOUT + "ms"); } } - LOG.debug("Backup table exists and available"); + LOG.debug("Backup table "+tableName+" exists and available"); } @@ -251,7 +277,7 @@ public final class BackupSystemTable implements Closeable { */ Map readBulkLoadedFiles(String backupId) throws IOException { Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); - try (Table table = connection.getTable(tableName); + try (Table table = connection.getTable(bulkLoadTableName); ResultScanner scanner = table.getScanner(scan)) { Result res = null; Map map = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -279,7 +305,7 @@ public final class BackupSystemTable implements Closeable { throws IOException { Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); Map>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()]; - try (Table table = connection.getTable(tableName); + try (Table table = connection.getTable(bulkLoadTableName); ResultScanner scanner = table.getScanner(scan)) { Result res = null; while ((res = scanner.next()) != null) { @@ -324,18 +350,6 @@ public final class BackupSystemTable implements Closeable { } } - /* - * @param map Map of row keys to path of bulk loaded hfile - */ - void deleteBulkLoadedFiles(Map map) throws IOException { - try (Table table = connection.getTable(tableName)) { - List dels = new ArrayList<>(); - for (byte[] row : map.keySet()) { - dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY)); - } - table.delete(dels); - } - } /** * Deletes backup status from backup system table table @@ -366,7 +380,7 @@ public final class BackupSystemTable implements Closeable { LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() + " entries"); } - try (Table table = connection.getTable(tableName)) { + try (Table table = connection.getTable(bulkLoadTableName)) { List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); table.put(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); @@ -386,7 +400,7 @@ public final class BackupSystemTable implements Closeable { LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries"); } - try (Table table = connection.getTable(tableName)) { + try (Table table = connection.getTable(bulkLoadTableName)) { List puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); table.put(puts); @@ -399,8 +413,8 @@ public final class BackupSystemTable implements Closeable { * @param lst list of table names * @param rows the rows to be deleted */ - public void removeBulkLoadedRows(List lst, List rows) throws IOException { - try (Table table = connection.getTable(tableName)) { + public void deleteBulkLoadedRows(List rows) throws IOException { + try (Table table = connection.getTable(bulkLoadTableName)) { List lstDels = new ArrayList<>(); for (byte[] row : rows) { Delete del = new Delete(row); @@ -408,7 +422,7 @@ public final class BackupSystemTable implements Closeable { LOG.debug("orig deleting the row: " + Bytes.toString(row)); } table.delete(lstDels); - LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables"); + LOG.debug("deleted " + rows.size() + " original bulkload rows"); } } @@ -425,7 +439,7 @@ public final class BackupSystemTable implements Closeable { for (TableName tTable : tableList) { Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); Map>>> tblMap = map.get(tTable); - try (Table table = connection.getTable(tableName); + try (Table table = connection.getTable(bulkLoadTableName); ResultScanner scanner = table.getScanner(scan)) { Result res = null; while ((res = scanner.next()) != null) { @@ -480,7 +494,7 @@ public final class BackupSystemTable implements Closeable { */ public void writeBulkLoadedFiles(List sTableList, Map>[] maps, String backupId) throws IOException { - try (Table table = connection.getTable(tableName)) { + try (Table table = connection.getTable(bulkLoadTableName)) { long ts = EnvironmentEdgeManager.currentTime(); int cnt = 0; List puts = new ArrayList<>(); @@ -1311,21 +1325,28 @@ public final class BackupSystemTable implements Closeable { * Get backup system table descriptor * @return table's descriptor */ - public static HTableDescriptor getSystemTableDescriptor(Configuration conf) { + public static TableDescriptor getSystemTableDescriptor(Configuration conf) { + + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); + + ColumnFamilyDescriptorBuilder colBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); - HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf)); - HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY); - colSessionsDesc.setMaxVersions(1); - // Time to keep backup sessions (secs) + colBuilder.setMaxVersions(1); Configuration config = HBaseConfiguration.create(); int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); - colSessionsDesc.setTimeToLive(ttl); - tableDesc.addFamily(colSessionsDesc); - HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY); - tableDesc.addFamily(colMetaDesc); - return tableDesc; + colBuilder.setTimeToLive(ttl); + + ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); + builder.addColumnFamily(colSessionsDesc); + + colBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); + colBuilder.setTimeToLive(ttl); + builder.addColumnFamily(colBuilder.build()); + return builder.build(); } public static TableName getTableName(Configuration conf) { @@ -1344,6 +1365,38 @@ public final class BackupSystemTable implements Closeable { } /** + * Get backup system table descriptor + * @return table's descriptor + */ + public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { + + TableDescriptorBuilder builder = + TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); + + ColumnFamilyDescriptorBuilder colBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); + colBuilder.setMaxVersions(1); + Configuration config = HBaseConfiguration.create(); + int ttl = + config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); + colBuilder.setTimeToLive(ttl); + ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); + builder.addColumnFamily(colSessionsDesc); + colBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); + colBuilder.setTimeToLive(ttl); + builder.addColumnFamily(colBuilder.build()); + return builder.build(); + } + + public static TableName getTableNameForBulkLoadedData(Configuration conf) { + String name = + conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; + return TableName.valueOf(name); + } + /** * Creates Put operation for a given backup info object * @param context backup info * @return put operation diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index a966ad310d..34d713dd10 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -72,7 +72,6 @@ public class IncrementalTableBackupClient extends TableBackupClient { } protected List filterMissingFiles(List incrBackupFileList) throws IOException { - FileSystem fs = FileSystem.get(conf); List list = new ArrayList(); for (String file : incrBackupFileList) { Path p = new Path(file); @@ -110,6 +109,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { * @param sTableList list of tables to be backed up * @return map of table to List of files */ + @SuppressWarnings("unchecked") protected Map>[] handleBulkLoad(List sTableList) throws IOException { Map>[] mapForSrc = new Map[sTableList.size()]; List activeFiles = new ArrayList(); @@ -117,7 +117,6 @@ public class IncrementalTableBackupClient extends TableBackupClient { Pair>>>>, List> pair = backupManager.readBulkloadRows(sTableList); Map>>>> map = pair.getFirst(); - FileSystem fs = FileSystem.get(conf); FileSystem tgtFs; try { tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf); @@ -126,6 +125,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { } Path rootdir = FSUtils.getRootDir(conf); Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId); + for (Map.Entry>>>> tblEntry : map.entrySet()) { TableName srcTable = tblEntry.getKey(); @@ -192,26 +192,47 @@ public class IncrementalTableBackupClient extends TableBackupClient { } copyBulkLoadedFiles(activeFiles, archiveFiles); - - backupManager.writeBulkLoadedFiles(sTableList, mapForSrc); - backupManager.removeBulkLoadedRows(sTableList, pair.getSecond()); + backupManager.deleteBulkLoadedRows(pair.getSecond()); return mapForSrc; } private void copyBulkLoadedFiles(List activeFiles, List archiveFiles) - throws IOException - { + throws IOException { try { // Enable special mode of BackupDistCp conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5); // Copy active files String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId(); - if (activeFiles.size() > 0) { + int attempt = 1; + while (activeFiles.size() > 0) { + LOG.info("Copy "+ activeFiles.size() + + " active bulk loaded files. Attempt ="+ (attempt++)); String[] toCopy = new String[activeFiles.size()]; activeFiles.toArray(toCopy); - incrementalCopyHFiles(toCopy, tgtDest); + // Active file can be archived during copy operation, + // we need to handle this properly + try { + incrementalCopyHFiles(toCopy, tgtDest); + break; + } catch (IOException e) { + // Check if some files got archived + // Update active and archived lists + // When file is being moved from active to archive + // directory, the number of active files decreases + + int numOfActive = activeFiles.size(); + updateFileLists(activeFiles, archiveFiles); + if (activeFiles.size() < numOfActive) { + continue; + } + // if not - throw exception + throw e; + } } + // If incremental copy will fail for archived files + // we will have partially loaded files in backup destination (only files from active data + // directory). It is OK, because the backup will marked as FAILED and data will be cleaned up if (archiveFiles.size() > 0) { String[] toCopy = new String[archiveFiles.size()]; archiveFiles.toArray(toCopy); @@ -224,6 +245,25 @@ public class IncrementalTableBackupClient extends TableBackupClient { } + private void updateFileLists(List activeFiles, List archiveFiles) + throws IOException { + List newlyArchived = new ArrayList(); + + for (String spath : activeFiles) { + if (!fs.exists(new Path(spath))) { + newlyArchived.add(spath); + } + } + + if (newlyArchived.size() > 0) { + activeFiles.removeAll(newlyArchived); + archiveFiles.addAll(newlyArchived); + } + + LOG.debug(newlyArchived.size() + " files have been archived."); + + } + @Override public void execute() throws IOException { @@ -322,7 +362,6 @@ public class IncrementalTableBackupClient extends TableBackupClient { protected void deleteBulkLoadDirectory() throws IOException { // delete original bulk load directory on method exit Path path = getBulkOutputDir(); - FileSystem fs = FileSystem.get(conf); boolean result = fs.delete(path, true); if (!result) { LOG.warn("Could not delete " + path); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index d3d3e0600d..c6b6bad19d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -21,33 +21,31 @@ package org.apache.hadoop.hbase.backup.impl; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeSet; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.RestoreTool; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem; /** * Restore table implementation @@ -171,8 +169,10 @@ public class RestoreTablesClient { for (int i = 1; i < images.length; i++) { BackupImage im = images[i]; String fileBackupDir = - HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable); - dirList.add(new Path(fileBackupDir)); + HBackupFileSystem.getTableBackupDir(im.getRootDir(), im.getBackupId(), sTable); + List list = getFilesRecursively(fileBackupDir); + dirList.addAll(list); + } String dirs = StringUtils.join(dirList, ","); @@ -185,6 +185,20 @@ public class RestoreTablesClient { LOG.info(sTable + " has been successfully restored to " + tTable); } + private List getFilesRecursively(String fileBackupDir) + throws IllegalArgumentException, IOException { + FileSystem fs = FileSystem.get((new Path(fileBackupDir)).toUri(), new Configuration()); + List list = new ArrayList(); + RemoteIterator it = fs.listFiles(new Path(fileBackupDir), true); + while (it.hasNext()) { + Path p = it.next().getPath(); + if (HFile.isHFileFormat(fs, p)) { + list.add(p); + } + } + return list; + } + /** * Restore operation. Stage 2: resolved Backup Image dependency * @param backupManifestMap : tableName, Manifest @@ -226,27 +240,6 @@ public class RestoreTablesClient { } } } - try (BackupSystemTable table = new BackupSystemTable(conn)) { - List sTableList = Arrays.asList(sTableArray); - for (String id : backupIdSet) { - LOG.debug("restoring bulk load for " + id); - Map>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList); - Map loaderResult; - conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); - LoadIncrementalHFiles loader = BackupUtils.createLoader(conf); - for (int i = 0; i < sTableList.size(); i++) { - if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { - loaderResult = loader.run(mapForSrc[i], tTableArray[i]); - LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); - if (loaderResult.isEmpty()) { - String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]; - LOG.error(msg); - throw new IOException(msg); - } - } - } - } - } LOG.debug("restoreStage finished"); } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java index 09ab28c50d..ab24cca001 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -69,6 +69,7 @@ public abstract class TableBackupClient { protected BackupManager backupManager; protected BackupInfo backupInfo; + protected FileSystem fs; public TableBackupClient() { } @@ -90,6 +91,7 @@ public abstract class TableBackupClient { this.tableList = request.getTableList(); this.conn = conn; this.conf = conn.getConfiguration(); + this.fs = FSUtils.getCurrentFileSystem(conf); backupInfo = backupManager.createBackupInfo(backupId, request.getBackupType(), tableList, request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth()); @@ -258,22 +260,21 @@ public abstract class TableBackupClient { } } - public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo, - Configuration conf) throws IOException - { + public static void cleanupAndRestoreBackupSystem(Connection conn, BackupInfo backupInfo, + Configuration conf) throws IOException { BackupType type = backupInfo.getType(); - // if full backup, then delete HBase snapshots if there already are snapshots taken - // and also clean up export snapshot log files if exist - if (type == BackupType.FULL) { - deleteSnapshots(conn, backupInfo, conf); - cleanupExportSnapshotLog(conf); - } - BackupSystemTable.restoreFromSnapshot(conn); - BackupSystemTable.deleteSnapshot(conn); - // clean up the uncompleted data at target directory if the ongoing backup has already entered - // the copy phase - // For incremental backup, DistCp logs will be cleaned with the targetDir. - cleanupTargetDir(backupInfo, conf); + // if full backup, then delete HBase snapshots if there already are snapshots taken + // and also clean up export snapshot log files if exist + if (type == BackupType.FULL) { + deleteSnapshots(conn, backupInfo, conf); + cleanupExportSnapshotLog(conf); + } + BackupSystemTable.restoreFromSnapshot(conn); + BackupSystemTable.deleteSnapshot(conn); + // clean up the uncompleted data at target directory if the ongoing backup has already entered + // the copy phase + // For incremental backup, DistCp logs will be cleaned with the targetDir. + cleanupTargetDir(backupInfo, conf); } @@ -355,7 +356,6 @@ public abstract class TableBackupClient { */ protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException { Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent(); - FileSystem fs = FileSystem.get(rootPath.toUri(), conf); FileStatus[] files = FSUtils.listStatus(fs, rootPath); if (files == null) { return; diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 8ce1c0e93c..2be778421f 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.SecureTestUtil; @@ -297,9 +296,6 @@ public class TestBackupBase { // setup configuration SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration()); } - String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); - conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") + - BackupObserver.class.getName()); conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); BackupManager.decorateMasterConfiguration(conf1); BackupManager.decorateRegionServerConfiguration(conf1); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java index db33abe382..ed1d010119 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java @@ -113,18 +113,32 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase { request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); String backupIdIncMultiple = client.backupTables(request); assertTrue(checkSucceeded(backupIdIncMultiple)); + // #4 bulk load again + LOG.debug("bulk loading into " + testName); + int actual1 = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName, + qualName, false, null, + new byte[][][] { new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("qqq") }, + new byte[][] { Bytes.toBytes("rrr"), Bytes.toBytes("sss") }, }, + true, false, true, NB_ROWS_IN_BATCH * 2 + actual, NB_ROWS2); + // #5 - incremental backup for table1 + tables = Lists.newArrayList(table1); + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String backupIdIncMultiple1 = client.backupTables(request); + assertTrue(checkSucceeded(backupIdIncMultiple1)); + // Delete all data in table1 + TEST_UTIL.deleteTableData(table1); // #5.1 - check tables for full restore */ HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin(); // #6 - restore incremental backup for table1 TableName[] tablesRestoreIncMultiple = new TableName[] { table1 }; - TableName[] tablesMapIncMultiple = new TableName[] { table1_restore }; - client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, - false, tablesRestoreIncMultiple, tablesMapIncMultiple, true)); + //TableName[] tablesMapIncMultiple = new TableName[] { table1_restore }; + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple1, + false, tablesRestoreIncMultiple, tablesRestoreIncMultiple, true)); - HTable hTable = (HTable) conn.getTable(table1_restore); - Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2+actual); + HTable hTable = (HTable) conn.getTable(table1); + Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2 + actual + actual1); request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); backupIdFull = client.backupTables(request); -- 2.11.0 (Apple Git-81)