From 3e33bd6a88e2a0c8edcde770a5748b9a49eebb39 Mon Sep 17 00:00:00 2001 From: Vladimir Rodionov Date: Wed, 3 Jan 2018 14:35:31 -0800 Subject: [PATCH] HBASE-19568: Restore of HBase table using incremental backup doesn't restore rows from an earlier incremental backup --- .../hadoop/hbase/backup/impl/BackupAdminImpl.java | 4 +- .../hadoop/hbase/backup/impl/BackupManager.java | 24 ++-- .../hbase/backup/impl/BackupSystemTable.java | 135 ++++++++++++++------- .../backup/impl/IncrementalTableBackupClient.java | 59 +++++++-- .../hbase/backup/impl/RestoreTablesClient.java | 55 ++++----- .../hbase/backup/impl/TableBackupClient.java | 37 +++--- .../backup/mapreduce/MapReduceRestoreJob.java | 2 +- .../apache/hadoop/hbase/backup/TestBackupBase.java | 4 - .../backup/TestIncrementalBackupWithBulkLoad.java | 27 ++++- 9 files changed, 222 insertions(+), 125 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index 9a20b7b749..4b27f356c5 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -46,11 +46,11 @@ import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.util.BackupSet; import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class BackupAdminImpl implements BackupAdmin { @@ -271,7 +271,7 @@ public class BackupAdminImpl implements BackupAdmin { LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted"); } if (success) { - sysTable.deleteBulkLoadedFiles(map); + sysTable.deleteBulkLoadedRows(new ArrayList(map.keySet())); } sysTable.deleteBackupInfo(backupInfo.getBackupId()); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index 7ac94d8e4a..726fb51967 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.BackupObserver; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; @@ -43,13 +44,13 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.backup.master.BackupLogCleaner; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; 
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.procedure.ProcedureManagerHost; -import org.apache.hadoop.hbase.util.Pair; - import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; /** * Handles backup requests, creates backup info records in backup system table to @@ -140,10 +141,14 @@ public class BackupManager implements Closeable { conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + "," + regionProcedureClass); } + String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); + String regionObserverClass = BackupObserver.class.getName(); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") + + regionObserverClass); if (LOG.isDebugEnabled()) { - LOG.debug("Added region procedure manager: " + regionProcedureClass); + LOG.debug("Added region procedure manager: " + regionProcedureClass+". Added region observer: " + + regionObserverClass); } - } public static boolean isBackupEnabled(Configuration conf) { @@ -415,13 +420,8 @@ public class BackupManager implements Closeable { return systemTable.readBulkloadRows(tableList); } - public void removeBulkLoadedRows(List lst, List rows) throws IOException { - systemTable.removeBulkLoadedRows(lst, rows); - } - - public void writeBulkLoadedFiles(List sTableList, Map>[] maps) - throws IOException { - systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId()); + public void deleteBulkLoadedRows(List rows) throws IOException { + systemTable.deleteBulkLoadedRows(rows); } /** diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index ebfc9f3333..c785d97efe 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -42,8 +42,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -53,6 +51,8 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -119,7 +121,21 @@ public final class 
BackupSystemTable implements Closeable { } + /** + * Backup system table (main) name + */ private TableName tableName; + + /** + * Backup system table name for bulk loaded files. + * We keep all bulk loaded file references in a separate table + * because we have to isolate general backup operations (create, merge, etc.) + * from the activity of the RegionObserver, which controls the bulk loading process: + * {@link org.apache.hadoop.hbase.backup.BackupObserver} + */ + + private TableName bulkLoadTableName; + /** * Stores backup sessions (contexts) */ @@ -171,20 +187,29 @@ public final class BackupSystemTable implements Closeable { public BackupSystemTable(Connection conn) throws IOException { this.connection = conn; - tableName = BackupSystemTable.getTableName(conn.getConfiguration()); + Configuration conf = this.connection.getConfiguration(); + tableName = BackupSystemTable.getTableName(conf); + bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf); checkSystemTable(); } private void checkSystemTable() throws IOException { try (Admin admin = connection.getAdmin()) { verifyNamespaceExists(admin); - + Configuration conf = connection.getConfiguration(); if (!admin.tableExists(tableName)) { - HTableDescriptor backupHTD = - BackupSystemTable.getSystemTableDescriptor(connection.getConfiguration()); + TableDescriptor backupHTD = + BackupSystemTable.getSystemTableDescriptor(conf); admin.createTable(backupHTD); } - waitForSystemTable(admin); + if (!admin.tableExists(bulkLoadTableName)) { + TableDescriptor blHTD = + BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); + admin.createTable(blHTD); + } + waitForSystemTable(admin, tableName); + waitForSystemTable(admin, bulkLoadTableName); + } } @@ -204,7 +229,7 @@ public final class BackupSystemTable implements Closeable { } } - private void waitForSystemTable(Admin admin) throws IOException { + private void waitForSystemTable(Admin admin, TableName tableName) throws IOException { long TIMEOUT = 60000; long startTime = EnvironmentEdgeManager.currentTime(); while (!admin.tableExists(tableName) || !admin.isTableAvailable(tableName)) { @@ -213,10 +238,11 @@ public final class BackupSystemTable implements Closeable { } catch (InterruptedException e) { } if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { - throw new IOException("Failed to create backup system table after " + TIMEOUT + "ms"); + throw new IOException("Failed to create backup system table " + + tableName + " after " + TIMEOUT + "ms"); } } - LOG.debug("Backup table exists and available"); + LOG.debug("Backup table " + tableName + " exists and is available"); } @@ -248,7 +274,7 @@ public final class BackupSystemTable implements Closeable { */ Map readBulkLoadedFiles(String backupId) throws IOException { Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); - try (Table table = connection.getTable(tableName); + try (Table table = connection.getTable(bulkLoadTableName); ResultScanner scanner = table.getScanner(scan)) { Result res = null; Map map = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -276,7 +302,7 @@ public final class BackupSystemTable implements Closeable { throws IOException { Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); Map>[] mapForSrc = new Map[sTableList == null ?
1 : sTableList.size()]; - try (Table table = connection.getTable(tableName); + try (Table table = connection.getTable(bulkLoadTableName); ResultScanner scanner = table.getScanner(scan)) { Result res = null; while ((res = scanner.next()) != null) { @@ -321,18 +347,6 @@ public final class BackupSystemTable implements Closeable { } } - /* - * @param map Map of row keys to path of bulk loaded hfile - */ - void deleteBulkLoadedFiles(Map map) throws IOException { - try (Table table = connection.getTable(tableName)) { - List dels = new ArrayList<>(); - for (byte[] row : map.keySet()) { - dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY)); - } - table.delete(dels); - } - } /** * Deletes backup status from backup system table table @@ -363,7 +377,7 @@ public final class BackupSystemTable implements Closeable { LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() + " entries"); } - try (Table table = connection.getTable(tableName)) { + try (Table table = connection.getTable(bulkLoadTableName)) { List puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); table.put(puts); LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); @@ -383,7 +397,7 @@ public final class BackupSystemTable implements Closeable { LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries"); } - try (Table table = connection.getTable(tableName)) { + try (Table table = connection.getTable(bulkLoadTableName)) { List puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs); table.put(puts); @@ -396,8 +410,8 @@ public final class BackupSystemTable implements Closeable { * @param lst list of table names * @param rows the rows to be deleted */ - public void removeBulkLoadedRows(List lst, List rows) throws IOException { - try (Table table = connection.getTable(tableName)) { + public void deleteBulkLoadedRows(List rows) throws IOException { + try (Table table = connection.getTable(bulkLoadTableName)) { List lstDels = new ArrayList<>(); for (byte[] row : rows) { Delete del = new Delete(row); @@ -405,7 +419,7 @@ public final class BackupSystemTable implements Closeable { LOG.debug("orig deleting the row: " + Bytes.toString(row)); } table.delete(lstDels); - LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables"); + LOG.debug("deleted " + rows.size() + " original bulkload rows"); } } @@ -422,7 +436,7 @@ public final class BackupSystemTable implements Closeable { for (TableName tTable : tableList) { Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); Map>>> tblMap = map.get(tTable); - try (Table table = connection.getTable(tableName); + try (Table table = connection.getTable(bulkLoadTableName); ResultScanner scanner = table.getScanner(scan)) { Result res = null; while ((res = scanner.next()) != null) { @@ -477,7 +491,7 @@ public final class BackupSystemTable implements Closeable { */ public void writeBulkLoadedFiles(List sTableList, Map>[] maps, String backupId) throws IOException { - try (Table table = connection.getTable(tableName)) { + try (Table table = connection.getTable(bulkLoadTableName)) { long ts = EnvironmentEdgeManager.currentTime(); int cnt = 0; List puts = new ArrayList<>(); @@ -1267,21 +1281,28 @@ public final class BackupSystemTable implements Closeable { * Get backup system table descriptor * @return table's descriptor */ - public static HTableDescriptor getSystemTableDescriptor(Configuration 
conf) { + public static TableDescriptor getSystemTableDescriptor(Configuration conf) { + + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(getTableName(conf)); + + ColumnFamilyDescriptorBuilder colBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); - HTableDescriptor tableDesc = new HTableDescriptor(getTableName(conf)); - HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY); - colSessionsDesc.setMaxVersions(1); - // Time to keep backup sessions (secs) + colBuilder.setMaxVersions(1); Configuration config = HBaseConfiguration.create(); int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); - colSessionsDesc.setTimeToLive(ttl); - tableDesc.addFamily(colSessionsDesc); - HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY); - tableDesc.addFamily(colMetaDesc); - return tableDesc; + colBuilder.setTimeToLive(ttl); + + ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); + builder.addColumnFamily(colSessionsDesc); + + colBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); + colBuilder.setTimeToLive(ttl); + builder.addColumnFamily(colBuilder.build()); + return builder.build(); } public static TableName getTableName(Configuration conf) { @@ -1300,6 +1321,38 @@ } /** + * Get backup system table descriptor for bulk loaded data + * @return table's descriptor + */ + public static TableDescriptor getSystemTableForBulkLoadedDataDescriptor(Configuration conf) { + + TableDescriptorBuilder builder = + TableDescriptorBuilder.newBuilder(getTableNameForBulkLoadedData(conf)); + + ColumnFamilyDescriptorBuilder colBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(SESSIONS_FAMILY); + colBuilder.setMaxVersions(1); + Configuration config = HBaseConfiguration.create(); + int ttl = + config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); + colBuilder.setTimeToLive(ttl); + ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); + builder.addColumnFamily(colSessionsDesc); + colBuilder = + ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); + colBuilder.setTimeToLive(ttl); + builder.addColumnFamily(colBuilder.build()); + return builder.build(); + } + + public static TableName getTableNameForBulkLoadedData(Configuration conf) { + String name = + conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; + return TableName.valueOf(name); + } + /** * Creates Put operation for a given backup info object * @param context backup info * @return put operation diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 37c45e016b..455f6b5bc8 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -72,7 +72,6 @@ public class IncrementalTableBackupClient extends TableBackupClient { } protected List filterMissingFiles(List incrBackupFileList) throws IOException { - FileSystem fs = FileSystem.get(conf); List list = new ArrayList(); for (String file : incrBackupFileList) { Path p = new Path(file); @@ -110,6 +109,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { *
@param sTableList list of tables to be backed up * @return map of table to List of files */ + @SuppressWarnings("unchecked") protected Map>[] handleBulkLoad(List sTableList) throws IOException { Map>[] mapForSrc = new Map[sTableList.size()]; List activeFiles = new ArrayList(); @@ -117,7 +117,6 @@ public class IncrementalTableBackupClient extends TableBackupClient { Pair>>>>, List> pair = backupManager.readBulkloadRows(sTableList); Map>>>> map = pair.getFirst(); - FileSystem fs = FileSystem.get(conf); FileSystem tgtFs; try { tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf); @@ -126,6 +125,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { } Path rootdir = FSUtils.getRootDir(conf); Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId); + for (Map.Entry>>>> tblEntry : map.entrySet()) { TableName srcTable = tblEntry.getKey(); @@ -192,26 +192,47 @@ public class IncrementalTableBackupClient extends TableBackupClient { } copyBulkLoadedFiles(activeFiles, archiveFiles); - - backupManager.writeBulkLoadedFiles(sTableList, mapForSrc); - backupManager.removeBulkLoadedRows(sTableList, pair.getSecond()); + backupManager.deleteBulkLoadedRows(pair.getSecond()); return mapForSrc; } private void copyBulkLoadedFiles(List activeFiles, List archiveFiles) - throws IOException - { + throws IOException { try { // Enable special mode of BackupDistCp conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5); // Copy active files String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId(); - if (activeFiles.size() > 0) { + int attempt = 1; + while (activeFiles.size() > 0) { + LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt = " + (attempt++)); String[] toCopy = new String[activeFiles.size()]; activeFiles.toArray(toCopy); - incrementalCopyHFiles(toCopy, tgtDest); + // An active file can be archived during the copy operation; + // we need to handle this properly + try { + incrementalCopyHFiles(toCopy, tgtDest); + break; + } catch (IOException e) { + // Check if some files got archived + // and update the active and archived lists. + // When a file is moved from the active to the archive + // directory, the number of active files decreases + + int numOfActive = activeFiles.size(); + updateFileLists(activeFiles, archiveFiles); + if (activeFiles.size() < numOfActive) { + continue; + } + // if not - throw exception + throw e; + } } + // If the incremental copy fails for archived files, + // we will have partially loaded files in the backup destination (only files from the active data + // directory).
That is OK, because the backup will be marked as FAILED and the data will be cleaned up if (archiveFiles.size() > 0) { String[] toCopy = new String[archiveFiles.size()]; archiveFiles.toArray(toCopy); @@ -224,6 +245,25 @@ public class IncrementalTableBackupClient extends TableBackupClient { } + private void updateFileLists(List activeFiles, List archiveFiles) + throws IOException { + List newlyArchived = new ArrayList(); + + for (String spath : activeFiles) { + if (!fs.exists(new Path(spath))) { + newlyArchived.add(spath); + } + } + + if (newlyArchived.size() > 0) { + activeFiles.removeAll(newlyArchived); + archiveFiles.addAll(newlyArchived); + } + + LOG.debug(newlyArchived.size() + " files have been archived."); + + } + @Override public void execute() throws IOException { @@ -322,7 +362,6 @@ public class IncrementalTableBackupClient extends TableBackupClient { protected void deleteBulkLoadDirectory() throws IOException { // delete original bulk load directory on method exit Path path = getBulkOutputDir(); - FileSystem fs = FileSystem.get(conf); boolean result = fs.delete(path, true); if (!result) { LOG.warn("Could not delete " + path); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 099a70da39..6d178259e0 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -21,13 +21,10 @@ package org.apache.hadoop.hbase.backup.impl; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -35,19 +32,20 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.RestoreTool; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.yetus.audience.InterfaceAudience; /** * Restore table implementation @@ -171,8 +169,10 @@ public class RestoreTablesClient { for (int i = 1; i < images.length; i++) { BackupImage im = images[i]; String fileBackupDir = - HBackupFileSystem.getTableBackupDataDir(im.getRootDir(), im.getBackupId(), sTable); - dirList.add(new Path(fileBackupDir)); + HBackupFileSystem.getTableBackupDir(im.getRootDir(), im.getBackupId(), sTable); + List list = getFilesRecursively(fileBackupDir); +
dirList.addAll(list); + } String dirs = StringUtils.join(dirList, ","); @@ -185,6 +185,22 @@ public class RestoreTablesClient { LOG.info(sTable + " has been successfully restored to " + tTable); } + private List getFilesRecursively(String fileBackupDir) throws IllegalArgumentException, + IOException + { + FileSystem fs = FileSystem.get((new Path(fileBackupDir)).toUri(), + new Configuration()); + List list = new ArrayList(); + RemoteIterator it = fs.listFiles(new Path(fileBackupDir), true); + while (it.hasNext()) { + Path p = it.next().getPath(); + if (HFile.isHFileFormat(fs, p)) { + list.add(p); + } + } + return list; + } + /** * Restore operation. Stage 2: resolved Backup Image dependency * @param backupManifestMap : tableName, Manifest @@ -226,27 +242,6 @@ public class RestoreTablesClient { } } } - try (BackupSystemTable table = new BackupSystemTable(conn)) { - List sTableList = Arrays.asList(sTableArray); - for (String id : backupIdSet) { - LOG.debug("restoring bulk load for " + id); - Map>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList); - Map loaderResult; - conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); - LoadIncrementalHFiles loader = BackupUtils.createLoader(conf); - for (int i = 0; i < sTableList.size(); i++) { - if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { - loaderResult = loader.run(mapForSrc[i], tTableArray[i]); - LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); - if (loaderResult.isEmpty()) { - String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]; - LOG.error(msg); - throw new IOException(msg); - } - } - } - } - } LOG.debug("restoreStage finished"); } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java index 05fcec327f..8635451ec1 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -37,13 +37,12 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; - -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.yetus.audience.InterfaceAudience; /** * Base class for backup operation. 
Concrete implementation for @@ -69,6 +68,7 @@ public abstract class TableBackupClient { protected BackupManager backupManager; protected BackupInfo backupInfo; + protected FileSystem fs; public TableBackupClient() { } @@ -90,6 +90,7 @@ public abstract class TableBackupClient { this.tableList = request.getTableList(); this.conn = conn; this.conf = conn.getConfiguration(); + this.fs = FSUtils.getCurrentFileSystem(conf); backupInfo = backupManager.createBackupInfo(backupId, request.getBackupType(), tableList, request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth()); @@ -258,22 +259,21 @@ public abstract class TableBackupClient { } } - public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo, - Configuration conf) throws IOException - { + public static void cleanupAndRestoreBackupSystem(Connection conn, BackupInfo backupInfo, + Configuration conf) throws IOException { BackupType type = backupInfo.getType(); - // if full backup, then delete HBase snapshots if there already are snapshots taken - // and also clean up export snapshot log files if exist - if (type == BackupType.FULL) { - deleteSnapshots(conn, backupInfo, conf); - cleanupExportSnapshotLog(conf); - } - BackupSystemTable.restoreFromSnapshot(conn); - BackupSystemTable.deleteSnapshot(conn); - // clean up the uncompleted data at target directory if the ongoing backup has already entered - // the copy phase - // For incremental backup, DistCp logs will be cleaned with the targetDir. - cleanupTargetDir(backupInfo, conf); + // if full backup, then delete HBase snapshots if there already are snapshots taken + // and also clean up export snapshot log files if exist + if (type == BackupType.FULL) { + deleteSnapshots(conn, backupInfo, conf); + cleanupExportSnapshotLog(conf); + } + BackupSystemTable.restoreFromSnapshot(conn); + BackupSystemTable.deleteSnapshot(conn); + // clean up the uncompleted data at target directory if the ongoing backup has already entered + // the copy phase + // For incremental backup, DistCp logs will be cleaned with the targetDir. 
+ cleanupTargetDir(backupInfo, conf); } @@ -355,7 +355,6 @@ public abstract class TableBackupClient { */ protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException { Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent(); - FileSystem fs = FileSystem.get(rootPath.toUri(), conf); FileStatus[] files = FSUtils.listStatus(fs, rootPath); if (files == null) { return; diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java index bed61ed321..4db828e1d5 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java @@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.RestoreJob; import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.util.Tool; +import org.apache.yetus.audience.InterfaceAudience; /** diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 8d23c69746..7e722f371c 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.SecureTestUtil; @@ -296,9 +295,6 @@ public class TestBackupBase { // setup configuration SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration()); } - String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); - conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? 
"" : coproc + ",") + - BackupObserver.class.getName()); conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); BackupManager.decorateMasterConfiguration(conf1); BackupManager.decorateRegionServerConfiguration(conf1); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java index f63bf298b6..4697ad5610 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java @@ -113,18 +113,33 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase { request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); String backupIdIncMultiple = client.backupTables(request); assertTrue(checkSucceeded(backupIdIncMultiple)); - + // #4 bulk load again + LOG.debug("bulk loading into " + testName); + int actual1 = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName, + qualName, false, null, new byte[][][] { + new byte[][]{ Bytes.toBytes("ppp"), Bytes.toBytes("qqq") }, + new byte[][]{ Bytes.toBytes("rrr"), Bytes.toBytes("sss") }, + }, true, false, true, NB_ROWS_IN_BATCH * 2 + actual, NB_ROWS2); + + // #5 - incremental backup for table1 + tables = Lists.newArrayList(table1); + request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); + String backupIdIncMultiple1 = client.backupTables(request); + assertTrue(checkSucceeded(backupIdIncMultiple1)); + + // Delete all data in table1 + TEST_UTIL.deleteTableData(table1); // #5.1 - check tables for full restore */ HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin(); // #6 - restore incremental backup for table1 TableName[] tablesRestoreIncMultiple = new TableName[] { table1 }; - TableName[] tablesMapIncMultiple = new TableName[] { table1_restore }; - client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, - false, tablesRestoreIncMultiple, tablesMapIncMultiple, true)); + //TableName[] tablesMapIncMultiple = new TableName[] { table1_restore }; + client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple1, + false, tablesRestoreIncMultiple, tablesRestoreIncMultiple, true)); - HTable hTable = (HTable) conn.getTable(table1_restore); - Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2+actual); + HTable hTable = (HTable) conn.getTable(table1); + Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2 + actual + actual1); request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); backupIdFull = client.backupTables(request); -- 2.11.0 (Apple Git-81)