diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index c1d5258..de2e316 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -189,6 +189,32 @@ public class BackupAdminImpl implements BackupAdmin {
       }
     }
     LOG.debug("Delete backup info " + backupInfo.getBackupId());
+    Map<byte[], String> map = sysTable.readBulkLoadedFiles(backupId);
+    FileSystem fs = FileSystem.get(conn.getConfiguration());
+    boolean succ = true;
+    int numDeleted = 0;
+    for (String f : map.values()) {
+      Path p = new Path(f);
+      try {
+        if (!fs.delete(p)) {
+          if (fs.exists(p)) {
+            LOG.warn(f + " was not deleted");
+            succ = false;
+          }
+        } else {
+          numDeleted++;
+        }
+      } catch (IOException ioe) {
+        LOG.warn(f + " was not deleted", ioe);
+        succ = false;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted");
+    }
+    if (succ) {
+      sysTable.deleteBulkLoadedFiles(map);
+    }
     sysTable.deleteBackupInfo(backupInfo.getBackupId());
     LOG.info("Delete backup " + backupInfo.getBackupId() + " completed.");
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 1d27e79..c09ce48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
+import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -393,6 +395,20 @@ public class BackupManager implements Closeable {
     return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
   }
 
+  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
+      readBulkloadRows(List<TableName> tableList) throws IOException {
+    return systemTable.readBulkloadRows(tableList);
+  }
+
+  public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
+    systemTable.removeBulkLoadedRows(lst, rows);
+  }
+
+  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
+      throws IOException {
+    systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
+  }
+
   /**
    * Get all completed backup information (in desc order by time)
    * @return history info of BackupCompleteData
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 6362f8e..27a268a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.commons.lang.StringUtils;
@@ -35,6 +36,7 @@
import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; @@ -59,6 +62,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; /** * This class provides API to access backup system table
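For orientation, the bulk-load bookkeeping exposed through BackupManager.readBulkloadRows() above is keyed three levels deep: table, then encoded region name, then column family, with each leaf holding (hfile path, written-by-preCommitStoreFile?) pairs. A minimal sketch of walking that structure, assuming the signatures above; the helper class, method name and printout are illustrative only and not part of the patch:

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;

// Illustrative helper (not part of the patch): walks the nested structure returned by
// BackupManager.readBulkloadRows(): table -> encoded region name -> column family ->
// list of (hfile path, raw?) pairs.
class BulkloadRowsDump {
  static void dump(
      Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> rows) {
    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> t
        : rows.getFirst().entrySet()) {
      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> region
          : t.getValue().entrySet()) {
        for (Map.Entry<String, List<Pair<String, Boolean>>> fam : region.getValue().entrySet()) {
          for (Pair<String, Boolean> hfile : fam.getValue()) {
            // getSecond() == true means the row was written by the preCommitStoreFile hook
            // (state BL_RAW); false means postBulkLoadHFile (state BL_DONE).
            System.out.println(t.getKey() + " " + region.getKey() + " " + fam.getKey() + " "
                + hfile.getFirst() + " raw=" + hfile.getSecond());
          }
        }
      }
    }
    // rows.getSecond() carries the raw backup-table row keys; the incremental backup client
    // hands them to removeBulkLoadedRows() once the hfiles have been copied.
  }
}
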
@@ -77,6 +81,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; */ @InterfaceAudience.Private public final class BackupSystemTable implements Closeable { + private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); static class WALItem { String backupId; @@ -108,8 +113,6 @@ public final class BackupSystemTable implements Closeable { } - private static final Log LOG = LogFactory.getLog(BackupSystemTable.class); - private TableName tableName; /** * Stores backup sessions (contexts) @@ -119,6 +122,7 @@ public final class BackupSystemTable implements Closeable { * Stores other meta */ final static byte[] META_FAMILY = "meta".getBytes(); + final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes(); /** * Connection to HBase cluster, shared among all instances */ @@ -130,9 +134,22 @@ public final class BackupSystemTable implements Closeable { private final static String INCR_BACKUP_SET = "incrbackupset:"; private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; private final static String RS_LOG_TS_PREFIX = "rslogts:"; + + private final static String BULK_LOAD_PREFIX = "bulk:"; + private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes(); + final static byte[] TBL_COL = Bytes.toBytes("tbl"); + final static byte[] FAM_COL = Bytes.toBytes("fam"); + final static byte[] PATH_COL = Bytes.toBytes("path"); + final static byte[] STATE_COL = Bytes.toBytes("state"); + // the two states a bulk loaded file can be + final static byte[] BL_RAW = Bytes.toBytes("R"); + final static byte[] BL_DONE = Bytes.toBytes("D"); + private final static String WALS_PREFIX = "wals:"; private final static String SET_KEY_PREFIX = "backupset:"; + // separator between BULK_LOAD_PREFIX and ordinals + protected final static String BLK_LD_DELIM = ":"; private final static byte[] EMPTY_VALUE = new byte[] {}; // Safe delimiter in a string @@ -196,6 +213,97 @@ public final class BackupSystemTable implements Closeable { } } + /* + * @param backupId the backup Id + * @return Map of rows to path of bulk loaded hfile + */ + Map readBulkLoadedFiles(String backupId) throws IOException { + Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + Map map = new TreeMap<>(Bytes.BYTES_COMPARATOR); + while ((res = scanner.next()) != null) { + res.advance(); + byte[] row = CellUtil.cloneRow(res.listCells().get(0)); + for (Cell cell : res.listCells()) { + if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, + BackupSystemTable.PATH_COL.length) == 0) { + map.put(row, Bytes.toString(CellUtil.cloneValue(cell))); + } + } + } + return map; + } + } + + /* + * Used during restore + * @param backupId the backup Id + * @param sTableList List of tables + * @return array of Map of family to List of Paths + */ + public Map>[] readBulkLoadedFiles(String backupId, List sTableList) + throws IOException { + Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId); + Map>[] mapForSrc = new Map[sTableList == null ? 
1 : sTableList.size()]; + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + TableName tbl = null; + byte[] fam = null; + String path = null; + for (Cell cell : res.listCells()) { + if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0, + BackupSystemTable.TBL_COL.length) == 0) { + tbl = TableName.valueOf(CellUtil.cloneValue(cell)); + } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, + BackupSystemTable.FAM_COL.length) == 0) { + fam = CellUtil.cloneValue(cell); + } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, + BackupSystemTable.PATH_COL.length) == 0) { + path = Bytes.toString(CellUtil.cloneValue(cell)); + } + } + int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList); + if (srcIdx == -1) { + // the table is not among the query + continue; + } + if (mapForSrc[srcIdx] == null) { + mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR); + } + List files; + if (!mapForSrc[srcIdx].containsKey(fam)) { + files = new ArrayList(); + mapForSrc[srcIdx].put(fam, files); + } else { + files = mapForSrc[srcIdx].get(fam); + } + files.add(new Path(path)); + if (LOG.isDebugEnabled()) { + LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); + } + }; + return mapForSrc; + } + } + + /* + * @param map Map of row keys to path of bulk loaded hfile + */ + void deleteBulkLoadedFiles(Map map) throws IOException { + try (Table table = connection.getTable(tableName)) { + List dels = new ArrayList<>(); + for (byte[] row : map.keySet()) { + dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY)); + } + table.delete(dels); + } + } + /** * Deletes backup status from backup system table table * @param backupId backup id @@ -213,6 +321,155 @@ public final class BackupSystemTable implements Closeable { } } + /* + * For postBulkLoadHFile() hook. 
+ * @param tabName table name + * @param region the region receiving hfile + * @param finalPaths family and associated hfiles + */ + public void writePathsPostBulkLoad(TableName tabName, byte[] region, + Map> finalPaths) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + + finalPaths.size() + " entries"); + } + try (Table table = connection.getTable(tableName)) { + List puts = BackupSystemTable.createPutForOrigBulkload(tabName, region, + finalPaths); + table.put(puts); + LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); + } + } + /* + * For preCommitStoreFile() hook + * @param tabName table name + * @param region the region receiving hfile + * @param family column family + * @param pairs list of paths for hfiles + */ + public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, + final byte[] family, final List> pairs) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("write bulk load descriptor to backup " + tabName + " with " + + pairs.size() + " entries"); + } + try (Table table = connection.getTable(tableName)) { + List puts = BackupSystemTable.createPutForOrigBulkload(tabName, region, + family, pairs); + table.put(puts); + LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName); + } + } + + /* + * Removes rows recording bulk loaded hfiles from backup table + * @param lst list of table names + * @param rows the rows to be deleted + */ + public void removeBulkLoadedRows(List lst, List rows) throws IOException { + try (Table table = connection.getTable(tableName)) { + List lstDels = new ArrayList<>(); + for (byte[] row : rows) { + Delete del = new Delete(row); + lstDels.add(del); + LOG.debug("orig deleting the row: " + Bytes.toString(row)); + } + table.delete(lstDels); + LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables"); + } + } + + /* + * @param tableList list of table names + * @return The keys of the Map are table, region and column family. 
+ * Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true) + */ + public Pair>>>>, List> + readBulkloadRows(List tableList) throws IOException { + Map>>>> map = new HashMap<>(); + List rows = new ArrayList<>(); + for (TableName tTable : tableList) { + Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable); + Map>>> tblMap = map.get(tTable); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + String fam = null; + String path = null; + boolean raw = false; + byte[] row = null; + String region = null; + for (Cell cell : res.listCells()) { + row = CellUtil.cloneRow(cell); + rows.add(row); + String rowStr = Bytes.toString(row); + region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr); + if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0, + BackupSystemTable.FAM_COL.length) == 0) { + fam = Bytes.toString(CellUtil.cloneValue(cell)); + } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0, + BackupSystemTable.PATH_COL.length) == 0) { + path = Bytes.toString(CellUtil.cloneValue(cell)); + } else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0, + BackupSystemTable.STATE_COL.length) == 0) { + byte[] state = CellUtil.cloneValue(cell); + if (Bytes.equals(BackupSystemTable.BL_RAW, state)) { + raw = true; + } else raw = false; + } + } + if (map.get(tTable) == null) { + map.put(tTable, new HashMap>>>()); + tblMap = map.get(tTable); + } + if (tblMap.get(region) == null) { + tblMap.put(region, new HashMap>>()); + } + Map>> famMap = tblMap.get(region); + if (famMap.get(fam) == null) { + famMap.put(fam, new ArrayList>()); + } + famMap.get(fam).add(new Pair<>(path, raw)); + LOG.debug("found orig " + path + " for " + fam + " of table " + region); + } + } + } + return new Pair<>(map, rows); + } + + /* + * @param sTableList List of tables + * @param maps array of Map of family to List of Paths + * @param backupId the backup Id + */ + public void writeBulkLoadedFiles(List sTableList, Map>[] maps, + String backupId) throws IOException { + try (Table table = connection.getTable(tableName)) { + long ts = EnvironmentEdgeManager.currentTime(); + int cnt = 0; + List puts = new ArrayList<>(); + for (int idx = 0; idx < maps.length; idx++) { + Map> map = maps[idx]; + TableName tn = sTableList.get(idx); + if (map == null) continue; + for (Map.Entry> entry: map.entrySet()) { + byte[] fam = entry.getKey(); + List paths = entry.getValue(); + for (Path p : paths) { + Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), + backupId, ts, cnt++); + puts.add(put); + } + } + } + if (!puts.isEmpty()) { + table.put(puts); + } + } + } + /** * Reads backup status object (instance of backup info) from backup system table table * @param backupId backup id @@ -399,6 +656,21 @@ public final class BackupSystemTable implements Closeable { } + /* + * Retrieve TableName's for completed backup of given type + * @param type backup type + * @return List of table names + */ + public List getTablesForBackupType(BackupType type) throws IOException { + List names = new ArrayList<>(); + List infos = getBackupHistory(true); + for (BackupInfo info : infos) { + if (info.getType() != type) continue; + names.addAll(info.getTableNames()); + } + return names; + } + /** * Get history for backup destination * @param backupRoot backup destination path @@ -1233,6 
+1505,119 @@ public final class BackupSystemTable implements Closeable { return s.substring(index + 1); } + /* + * Creates Put's for bulk load resulting from running LoadIncrementalHFiles + */ + static List createPutForOrigBulkload(TableName table, byte[] region, + Map> finalPaths) { + List puts = new ArrayList<>(); + for (Map.Entry> entry : finalPaths.entrySet()) { + for (Path path : entry.getValue()) { + String file = path.toString(); + int lastSlash = file.lastIndexOf("/"); + String filename = file.substring(lastSlash+1); + Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, + Bytes.toString(region), BLK_LD_DELIM, filename)); + put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); + put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, + file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_DONE); + puts.add(put); + LOG.debug("writing done bulk path " + file + " for " + table + " " + + Bytes.toString(region)); + } + } + return puts; + } + + /* + * Creates Put's for bulk load resulting from running LoadIncrementalHFiles + */ + static List createPutForOrigBulkload(TableName table, byte[] region, + final byte[] family, final List> pairs) { + List puts = new ArrayList<>(); + for (Pair pair : pairs) { + Path path = pair.getSecond(); + String file = path.toString(); + int lastSlash = file.lastIndexOf("/"); + String filename = file.substring(lastSlash+1); + Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, + Bytes.toString(region), BLK_LD_DELIM, filename)); + put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); + put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, + file.getBytes()); + put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_RAW); + puts.add(put); + LOG.debug("writing raw bulk path " + file + " for " + table + " " + + Bytes.toString(region)); + } + return puts; + } + public static List createDeleteForOrigBulkLoad(List lst) { + List lstDels = new ArrayList<>(); + for (TableName table : lst) { + Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM)); + del.addFamily(BackupSystemTable.META_FAMILY); + lstDels.add(del); + } + return lstDels; + } + + static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException { + Scan scan = new Scan(); + byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.withStartRow(startRow); + scan.withStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + scan.setMaxVersions(1); + return scan; + } + static String getTableNameFromOrigBulkLoadRow(String rowStr) { + String[] parts = rowStr.split(BLK_LD_DELIM); + return parts[1]; + } + static String getRegionNameFromOrigBulkLoadRow(String rowStr) { + // format is bulk : namespace : table : region : file + String[] parts = rowStr.split(BLK_LD_DELIM); + int idx = 3; + if (parts.length == 4) { + // the table is in default namespace + idx = 2; + } + LOG.debug("bulk row string " + rowStr + " region " + parts[idx]); + return parts[idx]; + } + /* + * Used to query bulk loaded hfiles which have been copied by incremental backup + * @param backupId the backup Id. 
It can be null when querying for all tables + * @return the Scan object + */ + static Scan createScanForBulkLoadedFiles(String backupId) throws IOException { + Scan scan = new Scan(); + byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES : + rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + //scan.setTimeRange(lower, Long.MAX_VALUE); + scan.addFamily(BackupSystemTable.META_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, + long ts, int idx) { + Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx)); + put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); + put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes()); + return put; + } /** * Creates put list for list of WAL files * @param files list of WAL file paths @@ -1364,7 +1749,7 @@ public final class BackupSystemTable implements Closeable { return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); } - private byte[] rowkey(String s, String... other) { + private static byte[] rowkey(String s, String... other) { StringBuilder sb = new StringBuilder(s); for (String ss : other) { sb.append(ss); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 395ed6d..a696541 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -18,15 +18,21 @@ package org.apache.hadoop.hbase.backup.impl; +import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -40,6 +46,10 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.util.Pair; /** * Incremental backup implementation. 
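The scan factories above all bracket a row-key prefix the same way: copy the start row and bump its last byte to get an exclusive stop row. A small self-contained sketch of that bracketing and of the two bulk-load key shapes; the table name and example values are invented for illustration:

import java.util.Arrays;

import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadRowKeySketch {
  public static void main(String[] args) {
    // Two key shapes stored in the meta family (example values only):
    //   written by the region observer hooks:     bulk:ns1:t1:<encoded region>:<hfile name>
    //   written after an incremental backup copy: bulk:<backup id>:<timestamp>:<counter>
    byte[] startRow = Bytes.toBytes("bulk:" + "ns1:t1" + ":");
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    // Same bracketing used by createScanForOrigBulkLoadedFiles(): every key carrying the
    // "bulk:ns1:t1:" prefix sorts at or after startRow and strictly before stopRow.
    System.out.println(Bytes.toString(startRow) + " <= key < " + Bytes.toString(stopRow));
  }
}
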
@@ -154,6 +164,112 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     return list;
   }
 
+  static int getIndex(TableName tbl, List<TableName> sTableList) {
+    if (sTableList == null) return 0;
+    for (int i = 0; i < sTableList.size(); i++) {
+      if (tbl.equals(sTableList.get(i))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
+    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+    Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
+        backupManager.readBulkloadRows(sTableList);
+    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem tgtFs;
+    try {
+      tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
+    } catch (URISyntaxException use) {
+      throw new IOException("Unable to get FileSystem", use);
+    }
+    Path rootdir = FSUtils.getRootDir(conf);
+    Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
+    for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
+        map.entrySet()) {
+      TableName srcTable = tblEntry.getKey();
+      int srcIdx = getIndex(srcTable, sTableList);
+      if (srcIdx < 0) {
+        LOG.warn("Couldn't find " + srcTable + " in source table List");
+        continue;
+      }
+      if (mapForSrc[srcIdx] == null) {
+        mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
+      }
+      Path tblDir = FSUtils.getTableDir(rootdir, srcTable);
+      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
+          srcTable.getQualifierAsString());
+      for (Map.Entry<String, Map<String, List<Pair<String, Boolean>>>> regionEntry :
+          tblEntry.getValue().entrySet()) {
+        String regionName = regionEntry.getKey();
+        Path regionDir = new Path(tblDir, regionName);
+        // map from family to List of hfiles
+        for (Map.Entry<String, List<Pair<String, Boolean>>> famEntry :
+            regionEntry.getValue().entrySet()) {
+          String fam = famEntry.getKey();
+          Path famDir = new Path(regionDir, fam);
+          List<Path> files;
+          if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) {
+            files = new ArrayList<Path>();
+            mapForSrc[srcIdx].put(fam.getBytes(), files);
+          } else {
+            files = mapForSrc[srcIdx].get(fam.getBytes());
+          }
+          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+          String tblName = srcTable.getQualifierAsString();
+          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
+          if (!tgtFs.mkdirs(tgtFam)) {
+            throw new IOException("couldn't create " + tgtFam);
+          }
+          for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
+            String file = fileWithState.getFirst();
+            boolean raw = fileWithState.getSecond();
+            int idx = file.lastIndexOf("/");
+            String filename = file;
+            if (idx > 0) {
+              filename = file.substring(idx + 1);
+            }
+            Path p = new Path(famDir, filename);
+            Path tgt = new Path(tgtFam, filename);
+            Path archive = new Path(archiveDir, filename);
+            if (fs.exists(p)) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
+              }
+              try {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("copying " + p + " to " + tgt);
+                }
+                FileUtil.copy(fs, p, tgtFs, tgt, false, conf);
+              } catch (FileNotFoundException e) {
+                LOG.debug("copying archive " + archive + " to " + tgt);
+                try {
+                  FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
+                } catch (FileNotFoundException fnfe) {
+                  if (!raw) throw fnfe;
+                }
+              }
+            } else {
+              LOG.debug("copying archive " + archive + " to " + tgt);
+              try {
+                FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
+              } catch (FileNotFoundException fnfe) {
+                if (!raw) throw fnfe;
+              }
+            }
+            files.add(tgt);
+          }
+        }
+      }
+    }
+    backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
+    backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
+    return mapForSrc;
+  }
+
   @Override
public void execute() throws IOException { @@ -204,6 +320,8 @@ public class IncrementalTableBackupClient extends TableBackupClient { BackupUtils.getMinValue(BackupUtils .getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); + + handleBulkLoad(backupInfo.getTableNames()); // backup complete completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index f418305..7f4643b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -19,9 +19,14 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.TreeSet; import org.apache.commons.lang.StringUtils; @@ -34,10 +39,13 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob; import org.apache.hadoop.hbase.backup.util.RestoreTool; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem; /** * Restore table implementation @@ -50,6 +58,7 @@ public class RestoreTablesClient { private Configuration conf; private Connection conn; private String backupId; + private String fullBackupId; private TableName[] sTableArray; private TableName[] tTableArray; private String targetRootDir; @@ -141,6 +150,7 @@ public class RestoreTablesClient { // We need hFS only for full restore (see the code) BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); if (manifest.getType() == BackupType.FULL) { + fullBackupId = manifest.getBackupImage().getBackupId(); LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " + tableBackupPath.toString()); restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, @@ -170,7 +180,6 @@ public class RestoreTablesClient { restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId); LOG.info(sTable + " has been successfully restored to " + tTable); - } /** @@ -185,39 +194,83 @@ public class RestoreTablesClient { TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException { TreeSet restoreImageSet = new TreeSet(); boolean truncateIfExists = isOverwrite; - try { - for (int i = 0; i < sTableArray.length; i++) { - TableName table = sTableArray[i]; - BackupManifest manifest = backupManifestMap.get(table); - // Get the image list of this backup for restore in time order from old - // to new. 
- List list = new ArrayList(); - list.add(manifest.getBackupImage()); - TreeSet set = new TreeSet(list); - List depList = manifest.getDependentListByTable(table); - set.addAll(depList); - BackupImage[] arr = new BackupImage[set.size()]; - set.toArray(arr); - restoreImages(arr, table, tTableArray[i], truncateIfExists); - restoreImageSet.addAll(list); - if (restoreImageSet != null && !restoreImageSet.isEmpty()) { - LOG.info("Restore includes the following image(s):"); - for (BackupImage image : restoreImageSet) { - LOG.info("Backup: " - + image.getBackupId() - + " " - + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), + Set backupIdSet = new HashSet<>(); + for (int i = 0; i < sTableArray.length; i++) { + TableName table = sTableArray[i]; + BackupManifest manifest = backupManifestMap.get(table); + // Get the image list of this backup for restore in time order from old + // to new. + List list = new ArrayList(); + list.add(manifest.getBackupImage()); + TreeSet set = new TreeSet(list); + List depList = manifest.getDependentListByTable(table); + set.addAll(depList); + BackupImage[] arr = new BackupImage[set.size()]; + set.toArray(arr); + restoreImages(arr, table, tTableArray[i], truncateIfExists); + restoreImageSet.addAll(list); + if (restoreImageSet != null && !restoreImageSet.isEmpty()) { + LOG.info("Restore includes the following image(s):"); + for (BackupImage image : restoreImageSet) { + LOG.info("Backup: " + + image.getBackupId() + + " " + + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table)); + if (image.getType() == BackupType.INCREMENTAL) { + backupIdSet.add(image.getBackupId()); + LOG.debug("adding " + image.getBackupId() + " for bulk load"); + } + } + } + } + try (BackupSystemTable table = new BackupSystemTable(conn)) { + List sTableList = Arrays.asList(sTableArray); + for (String id : backupIdSet) { + LOG.debug("restoring bulk load for " + id); + Map>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList); + Map loaderResult; + conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); + LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf); + for (int i = 0; i < sTableList.size(); i++) { + if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { + loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); + LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); + if (loaderResult.isEmpty()) { + String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i]; + LOG.error(msg); + throw new IOException(msg); + } } } } - } catch (Exception e) { - LOG.error("Failed", e); - throw new IOException(e); } LOG.debug("restoreStage finished"); } + static long getTsFromBackupId(String backupId) { + if (backupId == null) { + return 0; + } + return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1)); + } + + static boolean withinRange(long a, long lower, long upper) { + if (a < lower || a > upper) { + return false; + } + return true; + } + + int getIndex(TableName tbl, List sTableList) { + for (int i = 0; i < sTableList.size(); i++) { + if (tbl.equals(sTableList.get(i))) { + return i; + } + } + return -1; + } + public void execute() throws IOException { // case VALIDATION: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java index ffb61ec..9bafe12 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java @@ -98,7 +98,7 @@ public class MapReduceRestoreJob implements RestoreJob { result = player.run(playerArgs); if (succeeded(result)) { // do bulk load - LoadIncrementalHFiles loader = createLoader(); + LoadIncrementalHFiles loader = createLoader(getConf()); if (LOG.isDebugEnabled()) { LOG.debug("Restoring HFiles from directory " + bulkOutputPath); } @@ -134,13 +134,13 @@ public class MapReduceRestoreJob implements RestoreJob { return result == 0; } - private LoadIncrementalHFiles createLoader() throws IOException { + public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { // set configuration for restore: // LoadIncrementalHFile needs more time // hbase.rpc.timeout 600000 // calculates Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(getConf()); + Configuration conf = new Configuration(config); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); // By default, it is 32 and loader will fail if # of files in any region exceed this diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index f59e24c..80dfd66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; @@ -62,6 +63,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; @@ -144,7 +148,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { initialize(); } - private void initialize() throws Exception { + private void initialize() throws IOException { if (initalized) { return; } @@ -282,6 +286,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public String toString() { return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString(); } + + public byte[] getFamily() { + return family; + } + + public Path getFilePath() { + return hfilePath; + } } /* @@ -1184,7 +1196,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * If the table is created for the first time, then "completebulkload" reads the files twice. * More modifications necessary if we want to avoid doing it. 
*/ - private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception { + private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { final Path hfofDir = new Path(dirPath); final FileSystem fs = hfofDir.getFileSystem(getConf()); @@ -1238,7 +1250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } public Map run(String dirPath, Map> map, - TableName tableName) throws Exception{ + TableName tableName) throws IOException { initialize(); try (Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin()) { @@ -1261,7 +1273,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try (Table table = connection.getTable(tableName); RegionLocator locator = connection.getRegionLocator(tableName)) { boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); - boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, "")); + boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false); if (dirPath != null) { doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java index faae4ef..8e3e105 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java @@ -167,4 +167,20 @@ public class HFileArchiveUtil { private static Path getArchivePath(final Path rootdir) { return new Path(rootdir, HConstants.HFILE_ARCHIVE_DIRECTORY); } + + /* + * @return table name given archive file path + */ + public static TableName getTableName(Path archivePath) { + Path p = archivePath; + String tbl = null; + // namespace is the 4th parent of file + for (int i = 0; i < 5; i++) { + if (p == null) return null; + if (i == 3) tbl = p.getName(); + p = p.getParent(); + } + if (p == null) return null; + return TableName.valueOf(p.getName(), tbl); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index ec88549..e6bd73e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -49,6 +49,10 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALFactory; @@ -88,6 +92,7 @@ public class TestBackupBase { protected static String BACKUP_ROOT_DIR = "/backupUT"; protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT"; protected static String provider = "defaultProvider"; + protected static boolean secure = false; /** * @throws java.lang.Exception @@ -96,6 +101,16 @@ public class TestBackupBase { public static void setUpBeforeClass() throws Exception { TEST_UTIL = new 
HBaseTestingUtility(); conf1 = TEST_UTIL.getConfiguration(); + if (secure) { + // set the always on security provider + UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); + // setup configuration + SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration()); + } + String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); + conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") + + BackupObserver.class.getName()); conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); BackupManager.decorateMasterConfiguration(conf1); BackupManager.decorateRegionServerConfiguration(conf1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java index 59d0908..12ba2da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java @@ -20,23 +20,31 @@ package org.apache.hadoop.hbase.backup; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import com.google.common.collect.Lists; @@ -49,12 +57,24 @@ import com.google.common.collect.Lists; * 6 Incremental backup t1 */ @Category(LargeTests.class) +@RunWith(Parameterized.class) public class TestIncrementalBackupDeleteTable extends TestBackupBase { private static final Log LOG = LogFactory.getLog(TestIncrementalBackupDeleteTable.class); + @Parameterized.Parameters + public static Collection data() { + secure = true; + List params = new ArrayList(); + params.add(new Object[] {Boolean.TRUE}); + return params; + } + + public TestIncrementalBackupDeleteTable(Boolean b) { + } // implement all test cases in 1 test since incremental backup/restore has dependencies @Test public void TestIncBackupDeleteTable() throws Exception { + String testName = "TestIncBackupDeleteTable"; // #1 - create full backup for all tables LOG.info("create full backup image for all tables"); @@ -85,33 +105,22 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { admin.disableTable(table2); admin.deleteTable(table2); + int NB_ROWS2 = 20; + LOG.debug("bulk loading into " + testName); + int actual = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName, + qualName, false, null, new byte[][][] { + new byte[][]{ 
Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, + }, true, false, true, NB_ROWS_IN_BATCH*2, NB_ROWS2); + // #3 - incremental backup for table1 tables = Lists.newArrayList(table1); request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR); String backupIdIncMultiple = client.backupTables(request); assertTrue(checkSucceeded(backupIdIncMultiple)); - // #4 - restore full backup for all tables, without overwrite - TableName[] tablesRestoreFull = new TableName[] { table1, table2 }; - - TableName[] tablesMapFull = new TableName[] { table1_restore, table2_restore }; - - client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false, - tablesRestoreFull, tablesMapFull, false)); - - // #5.1 - check tables for full restore + // #5.1 - check tables for full restore */ HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin(); - assertTrue(hAdmin.tableExists(table1_restore)); - assertTrue(hAdmin.tableExists(table2_restore)); - - // #5.2 - checking row count of tables for full restore - HTable hTable = (HTable) conn.getTable(table1_restore); - Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH); - hTable.close(); - - hTable = (HTable) conn.getTable(table2_restore); - Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH); - hTable.close(); // #6 - restore incremental backup for table1 TableName[] tablesRestoreIncMultiple = new TableName[] { table1 }; @@ -119,8 +128,19 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false, tablesRestoreIncMultiple, tablesMapIncMultiple, true)); - hTable = (HTable) conn.getTable(table1_restore); - Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2); + HTable hTable = (HTable) conn.getTable(table1_restore); + Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2+actual); + request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR); + + backupIdFull = client.backupTables(request); + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + Pair>>>>, List> pair + = table.readBulkloadRows(tables); + assertTrue("map still has " + pair.getSecond().size() + " entries", + pair.getSecond().isEmpty()); + } + assertTrue(checkSucceeded(backupIdFull)); + hTable.close(); admin.close(); conn.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index a6dacf7..7ae5afc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -308,13 +308,14 @@ public class TestLoadIncrementalHFiles { runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false); } - private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, - boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, - boolean copyFiles) throws Exception { + public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util, + byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, + byte[][][] hfileRanges, boolean useMap, boolean deleteFile, + boolean copyFiles, int initRowCount, int factor) throws Exception { Path dir = 
util.getDataTestDirOnTestFS(testName); FileSystem fs = util.getTestFileSystem(); dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(FAMILY)); + Path familyDir = new Path(dir, Bytes.toString(fam)); int hfileIdx = 0; Map> map = null; @@ -324,26 +325,26 @@ public class TestLoadIncrementalHFiles { } if (useMap) { map = new TreeMap<>(Bytes.BYTES_COMPARATOR); - map.put(FAMILY, list); + map.put(fam, list); } Path last = null; for (byte[][] range : hfileRanges) { byte[] from = range[0]; byte[] to = range[1]; Path path = new Path(familyDir, "hfile_" + hfileIdx++); - HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000); + HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor); if (useMap) { last = path; list.add(path); } } - int expectedRows = hfileIdx * 1000; + int expectedRows = hfileIdx * factor; - if (preCreateTable || map != null) { + final TableName tableName = htd.getTableName(); + if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) { util.getAdmin().createTable(htd, tableSplitKeys); } - final TableName tableName = htd.getTableName(); Configuration conf = util.getConfiguration(); if (copyFiles) { conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); @@ -351,12 +352,14 @@ public class TestLoadIncrementalHFiles { LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); String [] args= {dir.toString(), tableName.toString()}; if (useMap) { - fs.delete(last); + if (deleteFile) fs.delete(last); Map loaded = loader.run(null, map, tableName); - expectedRows -= 1000; - for (LoadQueueItem item : loaded.keySet()) { - if (item.hfilePath.getName().equals(last.getName())) { - fail(last + " should be missing"); + if (deleteFile) { + expectedRows -= 1000; + for (LoadQueueItem item : loaded.keySet()) { + if (item.hfilePath.getName().equals(last.getName())) { + fail(last + " should be missing"); + } } } } else { @@ -365,19 +368,30 @@ public class TestLoadIncrementalHFiles { if (copyFiles) { for (Path p : list) { - assertTrue(fs.exists(p)); + assertTrue(p + " should exist", fs.exists(p)); } } Table table = util.getConnection().getTable(tableName); try { - assertEquals(expectedRows, util.countRows(table)); + assertEquals(initRowCount + expectedRows, util.countRows(table)); } finally { table.close(); } + return expectedRows; + } + + private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, + boolean copyFiles) throws Exception { + loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, + hfileRanges, useMap, true, copyFiles, 0, 1000); + + final TableName tableName = htd.getTableName(); // verify staging folder has been cleaned up Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME); + FileSystem fs = util.getTestFileSystem(); if(fs.exists(stagingBasePath)) { FileStatus[] files = fs.listStatus(stagingBasePath); for(FileStatus file : files) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java new file mode 100644 index 0000000..2d9ccf0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +/** + * Implementation of a file cleaner that checks if an hfile is still referenced by backup before + * deleting it from hfile archive directory. 
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable { + private static final Log LOG = LogFactory.getLog(BackupHFileCleaner.class); + private boolean stopped = false; + private boolean aborted; + private Configuration conf; + private Connection connection; + private long prevReadFromBackupTbl = 0, // timestamp of most recent read from hbase:backup table + secondPrevReadFromBackupTbl = 0; // timestamp of 2nd most recent read from hbase:backup table + //used by uunit test to skip reading hbase:backup + private boolean checkForFullyBackedUpTables = true; + private List fullyBackedUpTables = null; + + private Set getFilenameFromBulkLoad(Map>[] maps) { + Set filenames = new HashSet(); + for (Map> map : maps) { + if (map == null) continue; + for (List paths : map.values()) { + for (Path p : paths) { + filenames.add(p.getName()); + } + } + } + return filenames; + } + + private Set loadHFileRefs(List tableList) throws IOException { + if (connection == null) { + connection = ConnectionFactory.createConnection(conf); + } + try (BackupSystemTable tbl = new BackupSystemTable(connection)) { + Map>[] res = + tbl.readBulkLoadedFiles(null, tableList); + secondPrevReadFromBackupTbl = prevReadFromBackupTbl; + prevReadFromBackupTbl = EnvironmentEdgeManager.currentTime(); + return getFilenameFromBulkLoad(res); + } + } + + @VisibleForTesting + void setCheckForFullyBackedUpTables(boolean b) { + checkForFullyBackedUpTables = b; + } + @Override + public Iterable getDeletableFiles(Iterable files) { + if (conf == null) { + return files; + } + // obtain the Set of TableName's which have been fully backed up + // so that we filter BulkLoad to be returned from server + if (checkForFullyBackedUpTables) { + if (connection == null) return files; + try (BackupSystemTable tbl = new BackupSystemTable(connection)) { + fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL); + } catch (IOException ioe) { + LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe); + return Collections.emptyList(); + } + Collections.sort(fullyBackedUpTables); + } + final Set hfileRefs; + try { + hfileRefs = loadHFileRefs(fullyBackedUpTables); + } catch (IOException ioe) { + LOG.error("Failed to read hfile references, skipping checking deletable files", ioe); + return Collections.emptyList(); + } + Iterable deletables = Iterables.filter(files, new Predicate() { + @Override + public boolean apply(FileStatus file) { + // If the file is recent, be conservative and wait for one more scan of hbase:backup table + if (file.getModificationTime() > secondPrevReadFromBackupTbl) { + return false; + } + String hfile = file.getPath().getName(); + boolean foundHFileRef = hfileRefs.contains(hfile); + return !foundHFileRef; + } + }); + return deletables; + } + + @Override + public boolean isFileDeletable(FileStatus fStat) { + // work is done in getDeletableFiles() + return true; + } + + @Override + public void setConf(Configuration config) { + this.conf = config; + this.connection = null; + try { + this.connection = ConnectionFactory.createConnection(conf); + } catch (IOException ioe) { + LOG.error("Couldn't establish connection", ioe); + } + } + + @Override + public void stop(String why) { + if (this.stopped) { + return; + } + if (this.connection != null) { + try { + this.connection.close(); + } catch (IOException ioe) { + LOG.debug("Got " + ioe + " when closing connection"); + } + } + this.stopped = true; + } + + 
@Override + public boolean isStopped() { + return this.stopped; + } + + @Override + public void abort(String why, Throwable e) { + LOG.warn("Aborting ReplicationHFileCleaner because " + why, e); + this.aborted = true; + stop(why); + } + + @Override + public boolean isAborted() { + return this.aborted; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java new file mode 100644 index 0000000..dfbe106 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestBackupHFileCleaner { + private static final Log LOG = LogFactory.getLog(TestBackupHFileCleaner.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Configuration conf = TEST_UTIL.getConfiguration(); + private static TableName tableName = TableName.valueOf("backup.hfile.cleaner"); + private static String famName = "fam"; + static FileSystem fs = null; + Path root; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); + TEST_UTIL.startMiniZKCluster(); + TEST_UTIL.startMiniCluster(1); + fs = FileSystem.get(conf); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (fs != null) { + fs.close(); + } + TEST_UTIL.shutdownMiniCluster(); 
+  }
+
+  @Before
+  public void setup() throws IOException {
+    root = TEST_UTIL.getDataTestDirOnTestFS();
+  }
+
+  @After
+  public void cleanup() {
+    try {
+      fs.delete(root, true);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete files recursively from path " + root);
+    }
+  }
+
+  @Test
+  public void testGetDeletableFiles() throws IOException {
+    // 1. Create a file
+    Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+    fs.createNewFile(file);
+    // 2. Assert file is successfully created
+    assertTrue("Test file not created!", fs.exists(file));
+    BackupHFileCleaner cleaner = new BackupHFileCleaner();
+    cleaner.setConf(conf);
+    cleaner.setCheckForFullyBackedUpTables(false);
+    // 3. Assert that the file as-is should be deletable
+    List<FileStatus> stats = new ArrayList<>();
+    FileStatus stat = fs.getFileStatus(file);
+    stats.add(stat);
+    Iterable<FileStatus> deletable = cleaner.getDeletableFiles(stats);
+    deletable = cleaner.getDeletableFiles(stats);
+    boolean found = false;
+    for (FileStatus stat1 : deletable) {
+      if (stat.equals(stat1)) found = true;
+    }
+    assertTrue("Cleaner should allow this file to be deleted as there is no HFile reference "
+        + "for it.", found);
+
+    // 4. Register the file as bulk loaded
+    List<Path> list = new ArrayList<>(1);
+    list.add(file);
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        BackupSystemTable sysTbl = new BackupSystemTable(conn)) {
+      List<TableName> sTableList = new ArrayList<>();
+      sTableList.add(tableName);
+      Map<byte[], List<Path>>[] maps = new Map[1];
+      maps[0] = new HashMap<>();
+      maps[0].put(famName.getBytes(), list);
+      sysTbl.writeBulkLoadedFiles(sTableList, maps, "1");
+    }
+
+    // 5. Assert the file should no longer be deletable
+    deletable = cleaner.getDeletableFiles(stats);
+    deletable = cleaner.getDeletableFiles(stats);
+    found = false;
+    for (FileStatus stat1 : deletable) {
+      if (stat.equals(stat1)) found = true;
+    }
+    assertFalse("Cleaner should not allow this file to be deleted as there is an HFile reference "
+        + "for it.", found);
+  }
+}
\ No newline at end of file
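The test above constructs BackupHFileCleaner directly; in a deployment the cleaner is consulted as part of the master's HFile cleaner chain before archived HFiles are removed. The following is a minimal sketch, not part of this patch, of that wiring. It assumes the standard hbase.master.hfilecleaner.plugins property and that BackupRestoreConstants lives in org.apache.hadoop.hbase.backup as it does elsewhere in this change set; the plugin ordering and the exact list a real site would use are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;

// Sketch only: shows the kind of configuration under which BackupHFileCleaner would
// participate in the master's HFile cleaner chain. Values here are assumptions.
public class BackupCleanerConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Same switch the test flips in setUpBeforeClass().
    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    // Append the backup-aware cleaner after the stock TTL cleaner so archived HFiles
    // that are still referenced as bulk loaded (awaiting incremental backup) are kept.
    conf.set("hbase.master.hfilecleaner.plugins",
        "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner,"
            + "org.apache.hadoop.hbase.backup.BackupHFileCleaner");
    System.out.println(conf.get("hbase.master.hfilecleaner.plugins"));
  }
}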
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
new file mode 100644
index 0000000..6c21bd1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An Observer to facilitate backup operations
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class BackupObserver implements RegionObserver {
+  private static final Log LOG = LogFactory.getLog(BackupObserver.class);
+  @Override
+  public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
+      boolean hasLoaded) throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    if (!hasLoaded) {
+      // there is no need to record state
+      return hasLoaded;
+    }
+    if (finalPaths == null || !BackupManager.isBackupEnabled(cfg)) {
+      LOG.debug("skipping recording bulk load in postBulkLoadHFile");
+      return hasLoaded;
+    }
+    try (Connection connection = ConnectionFactory.createConnection(cfg);
+        BackupSystemTable tbl = new BackupSystemTable(connection)) {
+      List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+      HRegionInfo info = ctx.getEnvironment().getRegionInfo();
+      TableName tableName = info.getTable();
+      if (!fullyBackedUpTables.contains(tableName)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(tableName + " has not gone through a full backup");
+        }
+        return hasLoaded;
+      }
+      tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
+      return hasLoaded;
+    } catch (IOException ioe) {
+      LOG.error("Failed to get tables which have been fully backed up", ioe);
+      return false;
+    }
+  }
+  @Override
+  public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    if (pairs == null || pairs.isEmpty() || !BackupManager.isBackupEnabled(cfg)) {
+      LOG.debug("skipping recording bulk load in preCommitStoreFile");
+      return;
+    }
+    try (Connection connection = ConnectionFactory.createConnection(cfg);
+        BackupSystemTable tbl = new BackupSystemTable(connection)) {
+      List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+      HRegionInfo info = ctx.getEnvironment().getRegionInfo();
+      TableName tableName = info.getTable();
+      if (!fullyBackedUpTables.contains(tableName)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(tableName + " has not gone through a full backup");
+        }
+        return;
+      }
+      tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
+      return;
+    }
+  }
+}
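BackupObserver is a RegionObserver, so its postBulkLoadHFile and preCommitStoreFile hooks only fire if the class is loaded on the region servers. Whether this change set registers it automatically is not shown here; the sketch below, under that caveat, shows one conventional way to append it to the standard hbase.coprocessor.region.classes property, with all wiring details treated as illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Sketch only: appends BackupObserver to the region coprocessor list so bulk loads
// are recorded in the backup system table by the hooks defined above.
public class BackupObserverRegistrationSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    String key = "hbase.coprocessor.region.classes";
    String observer = "org.apache.hadoop.hbase.backup.BackupObserver";
    String existing = conf.get(key, "");
    // Preserve any coprocessors already configured for the cluster.
    conf.set(key, existing.isEmpty() ? observer : existing + "," + observer);
    System.out.println(conf.get(key));
  }
}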