diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
index 5c869f6..38acd66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
@@ -75,6 +75,16 @@ public interface BackupRestoreConstants {
 
   public static final String BACKUPID_PREFIX = "backup_";
 
+  public static final String ZOOKEEPER_ZNODE_BACKUP_KEY = "zookeeper.znode.backup";
+  public static final String ZOOKEEPER_ZNODE_BACKUP_DEFAULT = "backup";
+  public static final String ZOOKEEPER_ZNODE_BACKUP_HFILE_REFS_KEY =
+      "zookeeper.znode.backup.hfile.refs";
+  public static final String ZOOKEEPER_ZNODE_BACKUP_HFILE_REFS_DEFAULT = "hfile-refs";
+  // znode for full backup
+  public static final String ZOOKEEPER_ZNODE_BACKUP_FULL_DEFAULT = "full";
+  public static final String ZOOKEEPER_ZNODE_BACKUP_RESTORE_DEFAULT = "restore";
+  public static final String ZOOKEEPER_ZNODE_BACKUP_RESTORE_SEP = ";";
+
   /**
    * Backup/Restore constants
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 0aad830..1b8a69e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -418,6 +419,11 @@ public class BackupManager implements Closeable {
     systemTable.writeBackupStartCode(startCode, backupContext.getTargetRootDir());
   }
 
+  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
+      throws IOException {
+    systemTable.writeBulkLoadedFiles(sTableList, maps, backupContext.getBackupId());
+  }
+
   /**
    * Get the RS log information after the last log roll from hbase:backup.
   * @return RS log info
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index e32d7d4..035ce14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -21,18 +21,22 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,6 +46,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -57,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * This class provides 'hbase:backup' table API
@@ -195,6 +201,49 @@ public final class BackupSystemTable implements Closeable {
       return new String(val);
     }
   }
+  public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
+      throws IOException {
+    Scan scan = BackupSystemTableHelper.createScanForBulkLoadedFiles(backupId);
+    Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        TableName tbl = null;
+        byte[] fam = null;
+        String path = null;
+        byte[] qualifier = null;
+        for (Cell cell : res.listCells()) {
+          qualifier = CellUtil.cloneQualifier(cell);
+          if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.TBL_COL, 0,
+              BackupSystemTableHelper.TBL_COL.length) == 0) {
+            tbl = TableName.valueOf(CellUtil.cloneValue(cell));
+          } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.FAM_COL, 0,
+              BackupSystemTableHelper.FAM_COL.length) == 0) {
+            fam = CellUtil.cloneValue(cell);
+          } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.PATH_COL, 0,
+              BackupSystemTableHelper.PATH_COL.length) == 0) {
+            path = Bytes.toString(CellUtil.cloneValue(cell));
+          }
+        }
+        int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
+        if (mapForSrc[srcIdx] == null) {
+          mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+        }
+        List<Path> files;
+        if (!mapForSrc[srcIdx].containsKey(fam)) {
+          files = new ArrayList<Path>();
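+          // first bulk-loaded file seen for this family; register the list before adding the path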
mapForSrc[srcIdx].put(fam, files); + } else { + files = mapForSrc[srcIdx].get(fam); + } + files.add(new Path(path)); + LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path); + }; + return mapForSrc; + } + } /** * Write the start code (timestamp) to hbase:backup. If passed in null, then write 0 byte. @@ -212,6 +261,31 @@ public final class BackupSystemTable implements Closeable { } } + public void writeBulkLoadedFiles(List sTableList, Map>[] maps, + String backupId) throws IOException { + try (Table table = connection.getTable(tableName)) { + long ts = EnvironmentEdgeManager.currentTime(); + int cnt = 0; + List puts = new ArrayList<>(); + for (int idx = 0; idx < maps.length; idx++) { + Map> map = maps[idx]; + TableName tn = sTableList.get(idx); + if (map == null) continue; + for (Map.Entry> entry: map.entrySet()) { + byte[] fam = entry.getKey(); + List paths = entry.getValue(); + for (Path p : paths) { + Put put = BackupSystemTableHelper.createPutForBulkLoadedFile(tn, fam, p.toString(), + backupId, ts, cnt++); + puts.add(put); + } + } + } + if (!puts.isEmpty()) { + table.put(puts); + } + } + } /** * Get the Region Servers log information after the last log roll from hbase:backup. * @param backupRoot root directory path to backup @@ -335,6 +409,19 @@ public final class BackupSystemTable implements Closeable { } + /* + * Retrieve TableName's for completed backup of given type + */ + public Set getTablesForBackupType(BackupType type) throws IOException { + Set names = new HashSet<>(); + List infos = getBackupHistory(true); + for (BackupInfo info : infos) { + if (info.getType() != type) continue; + names.addAll(info.getTableNames()); + } + return names; + } + /** * Get history for backup destination * @param backupRoot - backup destination diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java index f5911b4..fb4f911 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java @@ -63,6 +63,13 @@ public final class BackupSystemTableHelper { private final static String INCR_BACKUP_SET = "incrbackupset:"; private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; private final static String RS_LOG_TS_PREFIX = "rslogts:"; + private final static String BULK_LOAD_PREFIX = "bulk:"; + // separator between BULK_LOAD_PREFIX and ordinals + private final static String BULK_LOAD_SEP = "-"; + final static byte[] TBL_COL = "tbl".getBytes(); + final static byte[] FAM_COL = "fam".getBytes(); + final static byte[] PATH_COL = "path".getBytes(); + private final static String WALS_PREFIX = "wals:"; private final static String SET_KEY_PREFIX = "backupset:"; @@ -149,6 +156,27 @@ public final class BackupSystemTableHelper { return put; } + static Scan createScanForBulkLoadedFiles(String backupId) throws IOException { + Scan scan = new Scan(); + byte[] startRow = rowkey(BULK_LOAD_PREFIX, backupId+BULK_LOAD_SEP); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.META_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId, + long ts, int 
idx) { + Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BULK_LOAD_SEP+ts+BULK_LOAD_SEP+idx)); + put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName()); + put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam); + put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes()); + return put; + } + /** * Creates Get to retrieve incremental backup table set from hbase:backup * @return get operation diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index f1f09cc..ad3477b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -18,11 +18,15 @@ package org.apache.hadoop.hbase.backup.impl; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; +import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.backup.BackupCopyTask; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; @@ -40,8 +45,6 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.impl.BackupException; -import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupClientUtil; @@ -51,8 +54,13 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; @InterfaceAudience.Private public class FullTableBackupClient { @@ -450,6 +458,43 @@ public class FullTableBackupClient { return backupSnapshot; } + void recordInZk(Set tTableArray) throws IOException { + ZooKeeperWatcher zkw; + try { + zkw = new ZooKeeperWatcher(conf, "backup-tables", null, false); + } catch (ZooKeeperConnectionException zkce) { + throw new IOException("could not create watcher", zkce); + } catch (IOException ioe) { + throw new IOException("could not create watcher", ioe); + } + String backupZNodeName = conf.get(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_KEY, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_DEFAULT); + String backupZNode 
= ZKUtil.joinZNode( + conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT), backupZNodeName); + String fullZNode = ZKUtil.joinZNode(backupZNode, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_FULL_DEFAULT); + StringBuilder tables = new StringBuilder(); + boolean isFirst = true; + for (TableName tn : tTableArray) { + if (!isFirst) { + tables.append(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_RESTORE_SEP); + } + tables.append(tn.toString()); + isFirst = false; + } + try { + String s = tables.toString(); + LOG.debug("writing " + s + " to full " + fullZNode); + ZKUtil.createEphemeralNodeAndWatch(zkw, fullZNode, s.getBytes()); + Stat stat = new Stat(); + byte[] data = ZKUtil.getDataNoWatch(zkw, fullZNode, stat); + LOG.debug("written " + Bytes.toString(data) + " to full " + fullZNode + " by " + zkw); + } catch (KeeperException ke) { + throw new IOException(ke); + } finally { + zkw.close(); + } + } /** * Backup request execution * @throws IOException @@ -469,6 +514,7 @@ public class FullTableBackupClient { if (firstBackup) { // This is our first backup. Let's put some marker on ZK so that we can hold the logs // while we do the backup. + recordInZk(backupContext.getTables()); backupManager.writeBackupStartCode(0L); } // We roll log here before we do the snapshot. It is possible there is duplicate data diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 8acace0..fc94257 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -18,16 +18,22 @@ package org.apache.hadoop.hbase.backup.impl; +import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -42,6 +48,9 @@ import org.apache.hadoop.hbase.backup.util.BackupClientUtil; import org.apache.hadoop.hbase.backup.util.BackupServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; @InterfaceAudience.Private public class IncrementalTableBackupClient { @@ -172,6 +181,87 @@ public class IncrementalTableBackupClient { return list; } + static int getIndex(TableName tbl, List sTableList) { + for (int i = 0; i < sTableList.size(); i++) { + if (tbl.equals(sTableList.get(i))) { + return i; + } + } + return -1; + } + + Map>[] handleBulkLoad(List sTableList) throws IOException { + Map>[] mapForSrc = new Map[sTableList.size()]; + Map>>> map = + BulkLoadHandler.getHFileRefs(null, sTableList, conf); + FileSystem fs = FileSystem.get(conf); + FileSystem tgtFs; + try { + tgtFs = FileSystem.get(new URI(backupContext.getTargetRootDir()), conf); + } catch (URISyntaxException 
use) { + throw new IOException("Unable to get FileSystem", use); + } + Path rootdir = FSUtils.getRootDir(conf); + Path tgtRoot = new Path(new Path(backupContext.getTargetRootDir()), backupId); + for (Map.Entry>>> tblEntry : map.entrySet()) { + TableName srcTable = tblEntry.getKey(); + int srcIdx = getIndex(srcTable, sTableList); + if (srcIdx < 0) { + LOG.warn("Couldn't find " + srcTable + " in source table List"); + continue; + } + if (mapForSrc[srcIdx] == null) { + mapForSrc[srcIdx] = new TreeMap>(Bytes.BYTES_COMPARATOR); + } + Path tblDir = FSUtils.getTableDir(rootdir, srcTable); + Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()), + srcTable.getQualifierAsString()); + for (Map.Entry>> regionEntry : tblEntry.getValue().entrySet()){ + String regionName = regionEntry.getKey(); + Path regionDir = new Path(tblDir, regionName); + // map from family to List of hfiles + for (Map.Entry> famEntry : regionEntry.getValue().entrySet()) { + String fam = famEntry.getKey(); + Path famDir = new Path(regionDir, fam); + List files; + if (!mapForSrc[srcIdx].containsKey(fam)) { + files = new ArrayList(); + mapForSrc[srcIdx].put(fam.getBytes(), files); + } else { + files = mapForSrc[srcIdx].get(fam); + } + Path archive = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam); + String tblName = srcTable.getQualifierAsString(); + Path tgtFam = new Path(new Path(tgtTable, regionName), fam); + if (!tgtFs.mkdirs(tgtFam)) { + throw new IOException("couldn't create " + tgtFam); + } + for (String file : famEntry.getValue()) { + Path p = new Path(famDir, file); + Path tgt = new Path(tgtFam, file); + if (fs.exists(p)) { + LOG.debug("found bulk hfile " + file + " in " + famDir + " for " + tblName); + try { + LOG.debug("copying " + p + " to " + tgt); + FileUtil.copy(fs, p, tgtFs, tgt, false,conf); + } catch (FileNotFoundException e) { + LOG.debug("copying archive " + archive + " to " + tgt); + FileUtil.copy(fs, archive, tgtFs, tgt, false, conf); + } + } else { + LOG.debug("copying archive " + archive + " to " + tgt); + FileUtil.copy(fs, archive, tgtFs, tgt, false, conf); + } + files.add(tgt); + } + } + } + } + BulkLoadHandler.removeHFileRefs(null, map, conf); + backupManager.writeBulkLoadedFiles(sTableList, mapForSrc); + return mapForSrc; + } + public void execute() throws IOException { // case PREPARE_INCREMENTAL: @@ -222,6 +312,8 @@ public class IncrementalTableBackupClient { BackupClientUtil.getMinValue(BackupServerUtil .getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); + + handleBulkLoad(backupManager.getBackupContext().getTableNames()); // backup complete FullTableBackupClient.completeBackup(conn, backupContext, backupManager, BackupType.INCREMENTAL, conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 91f2d68..9f9490e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -18,10 +18,15 @@ package org.apache.hadoop.hbase.backup.impl; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; +import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; + import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; +import 
java.util.Map; import java.util.TreeSet; import org.apache.commons.lang.StringUtils; @@ -30,15 +35,23 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; -import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreTask; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; @InterfaceAudience.Private public class RestoreTablesClient { @@ -47,6 +60,7 @@ public class RestoreTablesClient { private Configuration conf; private Connection conn; private String backupId; + private String fullBackupId; private TableName[] sTableArray; private TableName[] tTableArray; private String targetRootDir; @@ -142,6 +156,7 @@ public class RestoreTablesClient { // We need hFS only for full restore (see the code) BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); if (manifest.getType() == BackupType.FULL) { + fullBackupId = manifest.getBackupImage().getBackupId(); LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " + tableBackupPath.toString()); restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, @@ -171,7 +186,6 @@ public class RestoreTablesClient { restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId); LOG.info(sTable + " has been successfully restored to " + tTable); - } /** @@ -212,15 +226,86 @@ public class RestoreTablesClient { } } } + try (BackupSystemTable table = new BackupSystemTable(conn)) { + List sTableList = Arrays.asList(sTableArray); + Map>[] mapForSrc = table.readBulkLoadedFiles(backupId, sTableList); + int loaderResult = 0; + LoadIncrementalHFiles loader = MapReduceRestoreTask.createLoader(conf); + for (int i = 0; i < sTableList.size(); i++) { + if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { + loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); + LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); + if (loaderResult != 0) { + String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]; + LOG.error(msg); + throw new IOException(msg); + } + } + } + } } catch (Exception e) { LOG.error("Failed", e); + if (e instanceof IOException) { + throw e; + } throw new IOException(e); } LOG.debug("restoreStage finished"); } - public void execute() throws IOException { + static long getTsFromBackupId(String backupId) { + if (backupId == null) { + return 0; + } + return Long.valueOf(backupId.substring(backupId.lastIndexOf("_")+1)); + } + + static boolean 
withinRange(long a, long lower, long upper) { + if (a < lower || a > upper) { + return false; + } + return true; + } + void recordInZk(TableName[] tTableArray) throws IOException { + ZooKeeperWatcher zkw; + try { + zkw = new ZooKeeperWatcher(conf, "restore-tables", null, false); + } catch (ZooKeeperConnectionException zkce) { + throw new IOException("could not create watcher", zkce); + } catch (IOException ioe) { + throw new IOException("could not create watcher", ioe); + } + String backupZNodeName = conf.get(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_KEY, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_DEFAULT); + String backupZNode = ZKUtil.joinZNode( + conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT), backupZNodeName); + String restoreZNode = ZKUtil.joinZNode(backupZNode, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_RESTORE_DEFAULT); + StringBuilder tables = new StringBuilder(); + boolean isFirst = true; + for (TableName tn : tTableArray) { + if (!isFirst) { + tables.append(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_RESTORE_SEP); + } + tables.append(tn.toString()); + isFirst = false; + } + try { + String s = tables.toString(); + LOG.debug("writing " + s + " to " + restoreZNode); + ZKUtil.createEphemeralNodeAndWatch(zkw, restoreZNode, s.getBytes()); + Stat stat = new Stat(); + byte[] data = ZKUtil.getDataNoWatch(zkw, restoreZNode, stat); + LOG.debug("written " + Bytes.toString(data) + " to " + restoreZNode + " by " + zkw); + } catch (KeeperException ke) { + throw new IOException(ke); + } finally { + zkw.close(); + } + } + + public void execute() throws IOException { // case VALIDATION: // check the target tables checkTargetTables(tTableArray, isOverwrite); @@ -230,7 +315,7 @@ public class RestoreTablesClient { Path rootPath = new Path(targetRootDir); HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, backupId); + recordInZk(tTableArray); restore(backupManifestMap, sTableArray, tTableArray, isOverwrite); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java index 8d9f5b4..317822f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java @@ -88,7 +88,7 @@ public class MapReduceRestoreTask implements RestoreTask { result = player.run(playerArgs); if (succeeded(result)) { // do bulk load - LoadIncrementalHFiles loader = createLoader(); + LoadIncrementalHFiles loader = createLoader(getConf()); if (LOG.isDebugEnabled()) { LOG.debug("Restoring HFiles from directory " + bulkOutputPath); } @@ -124,13 +124,13 @@ public class MapReduceRestoreTask implements RestoreTask { return result == 0; } - private LoadIncrementalHFiles createLoader() throws IOException { + public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { // set configuration for restore: // LoadIncrementalHFile needs more time // hbase.rpc.timeout 600000 // calculates Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(getConf()); + Configuration conf = new Configuration(config); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); // By default, it is 32 and loader will fail if # of files in any region exceed this diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index b1ed43c..909d8a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -138,7 +138,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { initialize(); } - private void initialize() throws Exception { + private void initialize() throws IOException { if (initalized) { return; } @@ -1098,7 +1098,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * If the table is created for the first time, then "completebulkload" reads the files twice. * More modifications necessary if we want to avoid doing it. */ - private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception { + private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { final Path hfofDir = new Path(dirPath); final FileSystem fs = hfofDir.getFileSystem(getConf()); @@ -1151,7 +1151,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.info("Table "+ tableName +" is available!!"); } - public int run(String dirPath, Map> map, TableName tableName) throws Exception{ + public int run(String dirPath, Map> map, TableName tableName) throws IOException{ initialize(); try (Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index eb0c9c0..3453aa5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BulkLoadHandler; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; @@ -255,6 +256,7 @@ public class HRegionServer extends HasThread implements * Go here to get table descriptors. */ protected TableDescriptors tableDescriptors; + protected BulkLoadHandler loader; // Replication services. If no replication, this handler will be null. protected ReplicationSourceService replicationSourceHandler; @@ -958,6 +960,7 @@ public class HRegionServer extends HasThread implements // We registered with the Master. Go into run mode. long lastMsg = System.currentTimeMillis(); long oldRequestCount = -1; + loader.init(); // The main run loop. while (!isStopped() && isHealthy()) { if (!isClusterUp()) { @@ -1664,6 +1667,8 @@ public class HRegionServer extends HasThread implements // listeners the wal factory will add to wals it creates. final List listeners = new ArrayList(); listeners.add(new MetricsWAL()); + loader = new BulkLoadHandler(zooKeeper, conf, getConnection()); + listeners.add(loader); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. 
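The loader registered above is a WALActionsListener; the visitLogEntryBeforeWrite() shown for BulkLoadHandler later in this patch is an empty stub, so the following is only a sketch of how such a listener could turn bulk-load WAL markers into hfile-reference znodes of the form .../hfile-refs/<table>/<region>/<family>/<hfile>. It assumes the same WALEdit.BULK_LOAD marker used by HFile replication, that ProtobufUtil is imported, and that recordHFileRef() is a hypothetical helper, not part of the patch:

  // Sketch only: walk bulk-load descriptors in the WAL edit and record each store file in ZK.
  @Override
  public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
    for (Cell cell : logEdit.getCells()) {
      if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
        continue;
      }
      BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
      TableName table = ProtobufUtil.toTableName(bld.getTableName());
      String region = Bytes.toString(bld.getEncodedRegionName().toByteArray());
      for (StoreDescriptor store : bld.getStoresList()) {
        String family = store.getFamilyName().toStringUtf8();
        for (String hfile : store.getStoreFileList()) {
          // hypothetical helper: creates hfile-refs/<table>/<region>/<family>/<hfile> in ZooKeeper
          recordHFileRef(table, region, family, hfile);
        }
      }
    }
  }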
@@ -2186,6 +2191,7 @@ public class HRegionServer extends HasThread implements * have already been called. */ protected void stopServiceThreads() { + loader.close(); // clean up the scheduled chores if (this.choreService != null) choreService.shutdown(); if (this.nonceManagerChore != null) nonceManagerChore.cancel(true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java index a235696..4a09fe9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java @@ -153,4 +153,20 @@ public class HFileArchiveUtil { private static Path getArchivePath(final Path rootdir) { return new Path(rootdir, HConstants.HFILE_ARCHIVE_DIRECTORY); } + + /* + * @return table name given archive file path + */ + public static TableName getTableName(Path archivePath) { + Path p = archivePath; + String tbl = null; + // namespace is the 4th parent of file + for (int i = 0; i < 5; i++) { + if (p == null) return null; + if (i == 3) tbl = p.getName(); + p = p.getParent(); + } + if (p == null) return null; + return TableName.valueOf(p.getName(), tbl); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 2da7871..7fd6eb7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -49,6 +49,9 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALFactory; @@ -89,6 +92,7 @@ public class TestBackupBase { protected static String BACKUP_ROOT_DIR = "/backupUT"; protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT"; protected static String provider = "defaultProvider"; + protected static boolean secure = false; /** * @throws java.lang.Exception @@ -97,6 +101,13 @@ public class TestBackupBase { public static void setUpBeforeClass() throws Exception { TEST_UTIL = new HBaseTestingUtility(); conf1 = TEST_UTIL.getConfiguration(); + if (secure) { + // set the always on security provider + UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); + // setup configuration + SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration()); + } conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // Set MultiWAL (with 2 default WAL files per RS) conf1.set(WALFactory.WAL_PROVIDER, provider); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestFullRestore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestFullRestore.java index bd2bfff..f6e01a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestFullRestore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestFullRestore.java @@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BulkLoadHandler; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -33,6 +34,9 @@ import com.google.common.collect.Lists; public class TestFullRestore extends TestBackupBase { private static final Log LOG = LogFactory.getLog(TestFullRestore.class); + static { + //BulkLoadHandler.disable(); + } /** * Verify that a single table is restored to a new table diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java index 0a73888..7b26b04 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.backup; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.impl.HBaseBackupAdmin; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.client.Connection; @@ -32,12 +35,15 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import com.google.common.collect.Lists; @@ -51,11 +57,25 @@ import com.google.common.collect.Lists; * 6 Incremental backup t1 */ @Category(LargeTests.class) +@RunWith(Parameterized.class) public class TestIncrementalBackupDeleteTable extends TestBackupBase { private static final Log LOG = LogFactory.getLog(TestIncrementalBackupDeleteTable.class); + + @Parameterized.Parameters + public static Collection data() { + secure = true; + List params = new ArrayList(); + params.add(new Object[] {Boolean.TRUE}); + return params; + } + + public TestIncrementalBackupDeleteTable(Boolean b) { + } + //implement all test cases in 1 test since incremental backup/restore has dependencies @Test public void TestIncBackupDeleteTable() throws Exception { + String testName = "TestIncBackupDeleteTable"; // #1 - create full backup for all tables LOG.info("create full backup image for all tables"); @@ -86,7 +106,14 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { // Delete table table2 admin.disableTable(table2); admin.deleteTable(table2); - + + int NB_ROWS2 = 20; + int actual = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName, + qualName, false, null, new byte[][][] { + new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][]{ Bytes.toBytes("ddd"), 
Bytes.toBytes("ooo") }, + }, true, NB_ROWS_IN_BATCH*2, NB_ROWS2); + // #3 - incremental backup for table1 tables = Lists.newArrayList(table1); request = new BackupRequest(); @@ -95,32 +122,7 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { String backupIdIncMultiple = client.backupTables(request); assertTrue(checkSucceeded(backupIdIncMultiple)); - // #4 - restore full backup for all tables, without overwrite - TableName[] tablesRestoreFull = - new TableName[] { table1, table2}; - - TableName[] tablesMapFull = - new TableName[] { table1_restore, table2_restore }; - - client.restore(RestoreServerUtil.createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false, - tablesRestoreFull, - tablesMapFull, false)); - - // #5.1 - check tables for full restore HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin(); - assertTrue(hAdmin.tableExists(table1_restore)); - assertTrue(hAdmin.tableExists(table2_restore)); - - - // #5.2 - checking row count of tables for full restore - HTable hTable = (HTable) conn.getTable(table1_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH)); - hTable.close(); - - hTable = (HTable) conn.getTable(table2_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH)); - hTable.close(); - // #6 - restore incremental backup for table1 TableName[] tablesRestoreIncMultiple = @@ -130,8 +132,10 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { client.restore(RestoreServerUtil.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false, tablesRestoreIncMultiple, tablesMapIncMultiple, true)); - hTable = (HTable) conn.getTable(table1_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2)); + HTable hTable = (HTable) conn.getTable(table1_restore); + Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2 + + actual)); + hTable.close(); admin.close(); conn.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 5678d0d..47315bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -293,13 +293,13 @@ public class TestLoadIncrementalHFiles { runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap); } - private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, - boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) - throws Exception { + public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util, + byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, + byte[][][] hfileRanges, boolean useMap, int initRowCount, int factor) throws Exception { Path dir = util.getDataTestDirOnTestFS(testName); FileSystem fs = util.getTestFileSystem(); dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(FAMILY)); + Path familyDir = new Path(dir, Bytes.toString(fam)); int hfileIdx = 0; Map> map = null; @@ -307,24 +307,25 @@ public class TestLoadIncrementalHFiles { if (useMap) { map = new TreeMap>(Bytes.BYTES_COMPARATOR); list = new ArrayList<>(); - map.put(FAMILY, list); + map.put(fam, list); } for (byte[][] range : hfileRanges) { byte[] 
from = range[0]; byte[] to = range[1]; Path path = new Path(familyDir, "hfile_" + hfileIdx++); - HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000); + HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, + factor); if (useMap) { list.add(path); } } - int expectedRows = hfileIdx * 1000; + int expectedRows = hfileIdx * factor; - if (preCreateTable || map != null) { + final TableName tableName = htd.getTableName(); + if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) { util.getHBaseAdmin().createTable(htd, tableSplitKeys); } - final TableName tableName = htd.getTableName(); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); String [] args= {dir.toString(), tableName.toString()}; if (useMap) { @@ -332,16 +333,26 @@ public class TestLoadIncrementalHFiles { } else { loader.run(args); } - Table table = util.getConnection().getTable(tableName); try { - assertEquals(expectedRows, util.countRows(table)); + assertEquals(initRowCount + expectedRows, util.countRows(table)); } finally { table.close(); } + return expectedRows; + } + + private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) + throws Exception { + loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, + hfileRanges, useMap, 0, 1000); + + final TableName tableName = htd.getTableName(); // verify staging folder has been cleaned up Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration()); + FileSystem fs = util.getTestFileSystem(); if(fs.exists(stagingBasePath)) { FileStatus[] files = fs.listStatus(stagingBasePath); for(FileStatus file : files) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHFileCleaner.java new file mode 100644 index 0000000..531eab4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHFileCleaner.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.impl; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; +import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +/** + * Implementation of a file cleaner that checks if an hfile is still referenced by backup before + * deleting it from hfile archive directory. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable { + private static final Log LOG = LogFactory.getLog(BackupHFileCleaner.class); + private boolean stopped = false; + private boolean aborted; + private ZooKeeperWatcher zkw; + private static final String ZK_NAME = "hfile-ref-cleaner"; + + public BackupHFileCleaner(Configuration config) { + setConf(config); + try { + zkw = new ZooKeeperWatcher(getConf(), "hfile-ref-cleaner", null, false); + } catch (ZooKeeperConnectionException zkce) { + LOG.debug("could not create watcher", zkce); + } catch (IOException ioe) { + LOG.debug("could not create watcher", ioe); + } + } + + Set getHFileRefs(List sTableList) throws IOException { + if (zkw == null) { + try { + zkw = new ZooKeeperWatcher(getConf(), ZK_NAME, null, false); + } catch (ZooKeeperConnectionException zkce) { + LOG.debug("could not create watcher", zkce); + throw new IOException(zkce); + } catch (IOException ioe) { + LOG.debug("could not create watcher", ioe); + throw ioe; + } + } + String backupZNodeName = getConf().get(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_KEY, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_DEFAULT); + String hfileRefsZNodeName = getConf().get( + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_HFILE_REFS_KEY, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_HFILE_REFS_DEFAULT); + String backupZNode = ZKUtil.joinZNode( + getConf().get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT), backupZNodeName); + String hfileRefsZNode = ZKUtil.joinZNode(backupZNode, hfileRefsZNodeName); + + Set filenames = new HashSet(); + try { + if (sTableList == null || sTableList.isEmpty()) { + List tables = ZKUtil.listChildrenNoWatch(zkw, hfileRefsZNode); + if (tables == null) return filenames; + sTableList = new ArrayList<>(); + for (String tbl : tables) { + sTableList.add(TableName.valueOf(tbl)); + } + } + + for (TableName tableName : sTableList) { + String tableZnode = ZKUtil.joinZNode(hfileRefsZNode, tableName.getNameAsString()); + List regionZnodes = 
ZKUtil.listChildrenNoWatch(zkw, tableZnode); + for (String region : regionZnodes) { + String regionZnode = ZKUtil.joinZNode(tableZnode, region); + List famZnodes = ZKUtil.listChildrenNoWatch(zkw, regionZnode); + for (String family : famZnodes) { + String famZnode = ZKUtil.joinZNode(regionZnode, family); + List hFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, famZnode); + filenames.addAll(hFileRefsZnodes); + } + } + } + } catch (KeeperException ke) { + LOG.debug("encountered ", ke); + return filenames; + } + return filenames; + } + + @Override + public Iterable getDeletableFiles(Iterable files) { + if (this.getConf() == null) { + return files; + } + // obtain the Set of TableName's for the files + // so that we filter BulkLoadDescriptor's to be returned from server + Set tbls = new HashSet<>(); + for (FileStatus file : files) { + Path p = file.getPath(); + try { + TableName tbl = HFileArchiveUtil.getTableName(p); + if (tbl != null) tbls.add(tbl); + } catch (IllegalArgumentException iae) { + } + } + List list = new ArrayList<>(tbls); + Collections.sort(list); + final Set hfileRefs; + try { + hfileRefs = getHFileRefs(list); + } catch (IOException ioe) { + LOG.warn("Failed to read hfile references, skipping checking deletable files"); + return Collections.emptyList(); + } + return Iterables.filter(files, new Predicate() { + @Override + public boolean apply(FileStatus file) { + String hfile = file.getPath().getName(); + boolean foundHFileRef = hfileRefs.contains(hfile); + return !foundHFileRef; + } + }); + } + + @Override + public void setConf(Configuration config) { + super.setConf(config); + } + + @Override + public void stop(String why) { + if (this.stopped) { + return; + } + this.stopped = true; + } + + @Override + public boolean isStopped() { + return this.stopped; + } + + @Override + public void abort(String why, Throwable e) { + LOG.warn("Aborting ReplicationHFileCleaner because " + why, e); + this.aborted = true; + stop(why); + } + + @Override + public boolean isAborted() { + return this.aborted; + } + + @Override + public boolean isFileDeletable(FileStatus fStat) { + // all members of this class are null if replication is disabled, + // so do not stop from deleting the file + if (getConf() == null) { + return true; + } + + final Set hfileRefs; + try { + hfileRefs = getHFileRefs(null); + } catch (Exception e) { + LOG.warn("Failed to read hfile references, skipping checking deletable " + + "file for " + fStat.getPath()); + return false; + } + return !hfileRefs.contains(fStat.getPath().getName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java new file mode 100644 index 0000000..5482542 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; +import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Handles bulk load event. 
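+ * Bulk-loaded hfile references are kept under the znode tree
+ * <backup znode>/hfile-refs/<table>/<region>/<family>/<hfile>; getHFileRefs() below and
+ * BackupHFileCleaner walk this layout, and removeHFileRefs() drops a table's subtree once its
+ * files have been copied by an incremental backup or the table has no full backup.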
+ */ +@InterfaceAudience.Private +public class BulkLoadHandler extends WALActionsListener.Base implements Closeable, Abortable { + private static final Log LOG = LogFactory.getLog(BulkLoadHandler.class); + private Configuration conf; + protected final ZooKeeperWatcher zookeeper; + private BackupSystemTable sysTable; + private Thread thread; + private volatile boolean stopping = false; + // whether we have successfully read previous full backups + private static volatile boolean retrievedFullBackups = false; + private FullBackupTracker fullBackupTracker; + private RestoreTracker restoreTracker; + // tables which are fully backed up + private Set fullBackupTables = new HashSet(); + // tables which are the restore destinations + private Set restoreDest = new HashSet(); + // all the tables receiving bulk load - this is only used when the handler hasn't retrieved Set of + // tables which have gone through full backup + private static Set allTables = new HashSet(); + protected final String backupZNode; + /** The name of the znode that contains hfile references */ + protected final String hfileRefsZNode; + + private static final String ZK_NAME = "backup-bulkload-handler"; + private static final int SLEEP_INTERVAL = 5000; + + /* + * Tracks the full backup znode under backupZNode + */ + public class FullBackupTracker extends ZooKeeperNodeTracker { + + public FullBackupTracker(String fullZNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, fullZNode, abortable); + } + + @Override + public synchronized void nodeCreated(String path) { + if (path.startsWith(node)) { + super.nodeCreated(path); + try { + byte[] data = ZKUtil.getData(watcher, node); + String s = Bytes.toString(data); + LOG.debug("node created: " + path + " with " + s + " from " + watcher); + if (s == null) return; + String[] tbls = s.split(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_RESTORE_SEP); + for (String tbl : tbls) { + fullBackupTables.add(TableName.valueOf(tbl)); + } + } catch (KeeperException ke) { + LOG.error("unable to obtain data from " + node, ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } + } + + /* + * Tracks the restore znode under backupZNode + */ + public class RestoreTracker extends ZooKeeperNodeTracker { + + public RestoreTracker(String restoreZNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, restoreZNode, abortable); + } + + @Override + public synchronized void nodeCreated(String path) { + if (path.startsWith(node)) { + super.nodeCreated(path); + try { + byte[] data = ZKUtil.getData(watcher, node); + String s = Bytes.toString(data); + LOG.debug("node created: " + path + " with " + s + " from " + watcher); + if (s == null) return; + String[] tbls = s.split(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_RESTORE_SEP); + restoreDest.clear(); + for (String tbl : tbls) { + restoreDest.add(TableName.valueOf(tbl)); + } + } catch (KeeperException ke) { + LOG.error("unable to obtain data from " + node, ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } + @Override + public synchronized void nodeDeleted(String path) { + if (path.equals(node)) { + super.nodeDeleted(path); + restoreDest.clear(); + } + } + } + + public BulkLoadHandler(ZooKeeperWatcher zookeeper, Configuration conf, Connection conn) + throws IOException { + this.zookeeper = zookeeper; + this.conf = zookeeper.getConfiguration(); + sysTable = new BackupSystemTable(conn); + String backupZNodeName = 
conf.get(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_KEY, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_DEFAULT); + String hfileRefsZNodeName=conf.get(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_HFILE_REFS_KEY, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_HFILE_REFS_DEFAULT); + this.backupZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, backupZNodeName); + this.hfileRefsZNode = ZKUtil.joinZNode(backupZNode, hfileRefsZNodeName); + try { + ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); + } catch (KeeperException e) { + throw new IOException("Could not initialize hfile references znode.", e); + } + String restoreZNode = ZKUtil.joinZNode(backupZNode, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_RESTORE_DEFAULT); + this.restoreTracker = new RestoreTracker(restoreZNode, this.zookeeper, this); + this.restoreTracker.start(); + String fullBackupZNode = ZKUtil.joinZNode(backupZNode, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_FULL_DEFAULT); + this.fullBackupTracker = new FullBackupTracker(fullBackupZNode, this.zookeeper, this); + this.fullBackupTracker.start(); + thread = new Thread(){ + @Override + public void run() { + while (!stopping) { + handleUnneededTables(); + try { + Thread.sleep(SLEEP_INTERVAL); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + }; + thread.setDaemon(true); + thread.start(); + } + + public void abort(String why, Throwable e) { + LOG.warn(why, e); + } + public boolean isAborted() { + return false; + } + + public void init() throws IOException { + } + + // Removes znodes whose corresponding table is not involved with backup + void handleUnneededTables() { + if (retrievedFullBackups) return; + // tables which have gone through full backup + Set trackedTables = new HashSet(); + try { + trackedTables = sysTable.getTablesForBackupType(BackupType.FULL); + LOG.debug("Retrieved " + trackedTables.size() + " tables"); + } catch (IOException ioe) { + LOG.debug("Got " + ioe.getMessage()); + return; + } + Set clone = null; + synchronized (allTables) { + boolean modified = allTables.removeAll(trackedTables); + if (modified && !allTables.isEmpty()) { + clone = new HashSet(allTables); + allTables.clear(); + } + } + if (clone != null) { + removeHFileRefs(clone); + } + retrievedFullBackups = true; + } + + public void removeHFileRefs(Collection tables) { + int cnt = 0; + for (TableName tableName : tables) { + String tableZnode = ZKUtil.joinZNode(hfileRefsZNode, tableName.getNameAsString()); + try { + org.apache.zookeeper.ZKUtil.deleteRecursive( + zookeeper.getRecoverableZooKeeper().getZooKeeper(), tableZnode); + cnt++; + } catch (KeeperException e) { + LOG.warn("Failed to remove files from hfile reference znode=" + e.getPath(), e); + } catch (InterruptedException ie) { + LOG.warn("Interrupted", ie); + Thread.currentThread().interrupt(); + } + } + LOG.debug("The number of subtrees to be removed for hfile references in zk is " + cnt); + } + + @Override + public void close() { + stopping = true; + } + + /* + * Returns an object to listen to new wal changes + **/ + public WALActionsListener getWALActionsListener() { + return this; + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + } + + static String gethfileRefsZNode(Configuration conf) { + String backupZNodeName = conf.get(BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_KEY, + BackupRestoreConstants.ZOOKEEPER_ZNODE_BACKUP_DEFAULT); + String 
+  /*
+   * @param zkw the watcher
+   * @param sTableList List of table names for which hfile references are to be retrieved
+   * @param conf the Configuration
+   * @return map of hfile references per table
+   */
+  static Map<TableName, Map<String, Map<String, List<String>>>> getHFileRefs(
+      ZooKeeperWatcher zkw, List<TableName> sTableList, Configuration conf) {
+    boolean closeZk = false;
+    if (zkw == null) {
+      try {
+        zkw = new ZooKeeperWatcher(conf, ZK_NAME, null, false);
+        closeZk = true;
+      } catch (ZooKeeperConnectionException zkce) {
+        LOG.debug("could not create watcher", zkce);
+        return null;
+      } catch (IOException ioe) {
+        LOG.debug("could not create watcher", ioe);
+        return null;
+      }
+    }
+    String hfileRefsZNode = gethfileRefsZNode(conf);
+
+    Map<TableName, Map<String, Map<String, List<String>>>> map = new HashMap<>();
+    try {
+      if (sTableList == null) {
+        List<String> tables = ZKUtil.listChildrenNoWatch(zkw, hfileRefsZNode);
+        sTableList = new ArrayList<>();
+        for (String tbl : tables) {
+          sTableList.add(TableName.valueOf(tbl));
+        }
+      }
+
+      for (TableName tableName : sTableList) {
+        if (map.get(tableName) == null) {
+          map.put(tableName, new HashMap<String, Map<String, List<String>>>());
+        }
+        Map<String, Map<String, List<String>>> tblMap = map.get(tableName);
+        String tableZnode = ZKUtil.joinZNode(hfileRefsZNode, tableName.getNameAsString());
+        List<String> regionZnodes = ZKUtil.listChildrenNoWatch(zkw, tableZnode);
+        if (regionZnodes == null) return map;
+        for (String region : regionZnodes) {
+          if (tblMap.get(region) == null) {
+            tblMap.put(region, new HashMap<String, List<String>>());
+          }
+          Map<String, List<String>> regionMap = tblMap.get(region);
+          String regionZnode = ZKUtil.joinZNode(tableZnode, region);
+          List<String> famZnodes = ZKUtil.listChildrenNoWatch(zkw, regionZnode);
+          for (String family : famZnodes) {
+            String famZnode = ZKUtil.joinZNode(regionZnode, family);
+            List<String> hFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, famZnode);
+            regionMap.put(family, hFileRefsZnodes);
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      LOG.warn(ke);
+      return map;
+    } finally {
+      if (closeZk) zkw.close();
+    }
+    return map;
+  }
+
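A minimal usage sketch of the static helpers above and below, assuming a caller in the same package (for instance an incremental backup client) that first copies the referenced store files and then drops the references:

  // Illustrative sketch, not part of this patch.
  static void backupAndClearBulkLoadRefs(Configuration conf, List<TableName> tables)
      throws IOException {
    // passing a null watcher makes the helpers create (and close) a temporary one
    Map<TableName, Map<String, Map<String, List<String>>>> refs =
        BulkLoadHandler.getHFileRefs(null, tables, conf);
    if (refs == null || refs.isEmpty()) {
      return;
    }
    // ... copy the referenced bulk-loaded hfiles to the backup destination here ...
    BulkLoadHandler.removeHFileRefs(null, refs, conf);
  }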
+  /*
+   * @param zkw the watcher
+   * @param map hfile references per table to be removed
+   * @param conf the Configuration
+   */
+  static void removeHFileRefs(ZooKeeperWatcher zkw,
+      Map<TableName, Map<String, Map<String, List<String>>>> map, Configuration conf)
+      throws IOException {
+    boolean closeZk = false;
+    if (zkw == null) {
+      try {
+        zkw = new ZooKeeperWatcher(conf, ZK_NAME, null, false);
+        closeZk = true;
+      } catch (ZooKeeperConnectionException zkce) {
+        throw new IOException("could not create watcher", zkce);
+      } catch (IOException ioe) {
+        throw new IOException("could not create watcher", ioe);
+      }
+    }
+    String hfileRefsZNode = gethfileRefsZNode(conf);
+
+    boolean debugEnabled = LOG.isDebugEnabled();
+    List<ZKUtilOp> listOfOps = new ArrayList<>();
+    for (Map.Entry<TableName, Map<String, Map<String, List<String>>>> tblEntry : map.entrySet()) {
+      TableName tableName = tblEntry.getKey();
+      String tableZnode = ZKUtil.joinZNode(hfileRefsZNode, tableName.getNameAsString());
+      if (debugEnabled) {
+        LOG.debug("Removing hfile references " + tblEntry.getValue() + " from " + tableZnode);
+      }
+      for (Map.Entry<String, Map<String, List<String>>> regionEntry :
+          tblEntry.getValue().entrySet()) {
+        String regionName = regionEntry.getKey();
+        String regionZnode = ZKUtil.joinZNode(tableZnode, regionName);
+        for (Map.Entry<String, List<String>> famEntry : regionEntry.getValue().entrySet()) {
+          String fam = famEntry.getKey();
+          String famZnode = ZKUtil.joinZNode(regionZnode, fam);
+
+          List<String> files = famEntry.getValue();
+          for (String file : files) {
+            listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(famZnode, file)));
+          }
+        }
+      }
+    }
+    if (debugEnabled) {
+      LOG.debug("The multi list size for removing hfile references in zk is " + listOfOps.size());
+    }
+    try {
+      ZKUtil.multiOrSequential(zkw, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new IOException("Failed to remove files from hfile reference znode=" + e.getPath(), e);
+    } finally {
+      if (closeZk) {
+        zkw.close();
+      }
+    }
+  }
+
+  /*
+   * @param zkw the watcher
+   * @param tableName table name
+   * @param regionName region name
+   * @param famName family name
+   * @param files List of files for the (table, region, family)
+   * @param conf the Configuration
+   */
+  public static void addHFileRefs(ZooKeeperWatcher zkw, TableName tableName, String regionName,
+      String famName, List<String> files, Configuration conf) throws IOException {
+    String hfileRefsZNode = gethfileRefsZNode(conf);
+
+    String tableZnode = ZKUtil.joinZNode(hfileRefsZNode, tableName.getNameAsString());
+    String regionZnode = ZKUtil.joinZNode(tableZnode, regionName);
+    String famZnode = ZKUtil.joinZNode(regionZnode, famName);
+    try {
+      ZKUtil.createWithParents(zkw, famZnode);
+    } catch (KeeperException ke) {
+      LOG.debug("cannot create family znode", ke);
+    }
+    boolean debugEnabled = LOG.isDebugEnabled();
+    if (debugEnabled) {
+      LOG.debug("Adding hfile references " + files + " to " + famZnode);
+    }
+    List<ZKUtilOp> listOfOps = new ArrayList<>();
+    int size = files.size();
+    for (int i = 0; i < size; i++) {
+      listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(famZnode, files.get(i)),
+        HConstants.EMPTY_BYTE_ARRAY));
+    }
+    if (debugEnabled) {
+      LOG.debug("The multi list size for adding hfile references in zk for node " + famZnode
+          + " is " + listOfOps.size());
+    }
+    try {
+      ZKUtil.multiOrSequential(zkw, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new IOException("Failed to create hfile reference znode=" + e.getPath(), e);
+    }
+    if (!retrievedFullBackups) {
+      synchronized (allTables) {
+        allTables.add(tableName);
+      }
+    }
+  }
+
+  @Override
+  public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
+      final WALEdit edit) throws IOException {
+    if (stopping) return;
+    TableName tableName = logKey.getTablename();
+    if (tableName.isSystemTable()) return;
+    if (restoreDest.contains(tableName)) return;
+    // we only care about tables which have gone through full backups
+    if (retrievedFullBackups && !fullBackupTables.contains(tableName)) return;
+    for (Cell c : edit.getCells()) {
+      // Only check for bulk load events
+      if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
+        BulkLoadDescriptor bld = null;
+        try {
+          bld = WALEdit.getBulkLoadDescriptor(c);
+        } catch (IOException e) {
+          LOG.error("Failed to get bulk load event information from the wal file", e);
+          throw e;
+        }
+
+        String regionName = Bytes.toString(bld.getEncodedRegionName().toByteArray());
+        for (StoreDescriptor s : bld.getStoresList()) {
+          byte[] fam = s.getFamilyName().toByteArray();
+          String famName = Bytes.toString(fam);
+          addHFileRefs(this.zookeeper, tableName, regionName, famName, s.getStoreFileList(), conf);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+  }
+
+  @Override
+  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+  }
+}
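The listener above only sees bulk-load markers once it is registered with a region server's WAL, and that wiring is not part of this diff. A rough sketch, assuming registration happens during region server startup and that the WAL instance exposes registerWALActionsListener:

  // Illustrative sketch, not part of this patch: hook the handler up on a region server.
  static void registerBulkLoadHandler(HRegionServer regionServer, WAL wal) throws IOException {
    BulkLoadHandler handler = new BulkLoadHandler(regionServer.getZooKeeper(),
        regionServer.getConfiguration(), regionServer.getConnection());
    // 'wal' stands for each WAL the region server writes to (an assumption of this sketch)
    wal.registerWALActionsListener(handler.getWALActionsListener());
  }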
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java
new file mode 100644
index 0000000..4aae41c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupHFileCleaner;
+import org.apache.hadoop.hbase.backup.impl.BulkLoadHandler;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestBackupHFileCleaner {
+  private static final Log LOG = LogFactory.getLog(TestBackupHFileCleaner.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Server server;
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+  private static TableName tableName = TableName.valueOf("backup.hfile.cleaner");
+  private static String regionName = "region";
+  private static String famName = "fam";
+  static FileSystem fs = null;
+  Path root;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+    server = new DummyServer();
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    root = TEST_UTIL.getDataTestDirOnTestFS();
+  }
+
+  @After
+  public void cleanup() {
+    try {
+      fs.delete(root, true);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete files recursively from path " + root);
+    }
+  }
+
+  @Test
+  public void testIsFileDeletable() throws IOException {
+    // 1. Create a file
+    Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+    fs.createNewFile(file);
+    // 2. Assert file is successfully created
+    assertTrue("Test file not created!", fs.exists(file));
+    BackupHFileCleaner cleaner = new BackupHFileCleaner(conf);
+    cleaner.setConf(conf);
+    // 3. Assert that file as is should be deletable
+    assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
+        + "for it.", cleaner.isFileDeletable(fs.getFileStatus(file)));
+
+    List<String> files = new ArrayList<>(1);
+    files.add(file.getName());
+    // 4. Add the file to hfile-refs
+    BulkLoadHandler.addHFileRefs(server.getZooKeeper(), tableName, regionName, famName, files,
+      conf);
+    // 5. Assert file should not be deletable
+    assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
+        + "for it.", cleaner.isFileDeletable(fs.getFileStatus(file)));
+  }
+
+  @Test
+  public void testGetDeletableFiles() throws Exception {
+    // 1. Create two files and assert that they exist
+    Path dir = new Path(new Path(new Path(new Path(root, "default"), tableName.getNameAsString()),
+        regionName), famName);
+    Path notDeletablefile = new Path(dir, "testGetDeletableFiles_1");
+    fs.createNewFile(notDeletablefile);
+    assertTrue("Test file not created!", fs.exists(notDeletablefile));
+    Path deletablefile = new Path(dir, "testGetDeletableFiles_2");
+    fs.createNewFile(deletablefile);
+    assertTrue("Test file not created!", fs.exists(deletablefile));
+
+    List<FileStatus> files = new ArrayList<>(2);
+    FileStatus f = new FileStatus();
+    f.setPath(deletablefile);
+    files.add(f);
+    f = new FileStatus();
+    f.setPath(notDeletablefile);
+    files.add(f);
+
+    List<String> hfiles = new ArrayList<>(1);
+    hfiles.add(notDeletablefile.getName());
+    // 2. Add one file to hfile-refs
+    BulkLoadHandler.addHFileRefs(server.getZooKeeper(), tableName, regionName, famName, hfiles,
+      conf);
+
+    BackupHFileCleaner cleaner = new BackupHFileCleaner(conf);
+    Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
+    // 5. Assert one file should not be deletable and only the other one is present in the
+    // list returned
+    int i = 0;
+    while (deletableFilesIterator.hasNext()) {
+      FileStatus deletable = deletableFilesIterator.next();
+      assertTrue("File " + notDeletablefile
+          + " should not be deletable as its hfile reference node is added.",
+        deletable.getPath().equals(deletablefile));
+      i++;
+    }
+    assertTrue("Exactly one file should be deletable but got " + i, i == 1);
+  }
+
+  static class DummyServer implements Server {
+
+    @Override
+    public Configuration getConfiguration() {
+      return TEST_UTIL.getConfiguration();
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      try {
+        return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      return null;
+    }
+
+    @Override
+    public CoordinatedStateManager getCoordinatedStateManager() {
+      return null;
+    }
+
+    @Override
+    public ClusterConnection getConnection() {
+      return null;
+    }
+
+    @Override
+    public MetaTableLocator getMetaTableLocator() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return ServerName.valueOf("regionserver,60020,000000");
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+
+    @Override
+    public ChoreService getChoreService() {
+      return null;
+    }
+
+    @Override
+    public ClusterConnection getClusterConnection() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+  }
+}
\ No newline at end of file
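For context, the BackupHFileCleaner exercised above is defined elsewhere and is not part of this diff; it only takes effect once it is listed among the master's HFile cleaner plugins. A sketch of that wiring, assuming the standard plugin key is used:

  // Illustrative sketch, not part of this patch: append the cleaner to the master's plugin list.
  Configuration conf = HBaseConfiguration.create();
  String plugins = conf.get("hbase.master.hfilecleaner.plugins", "");
  conf.set("hbase.master.hfilecleaner.plugins",
      plugins.isEmpty() ? BackupHFileCleaner.class.getName()
          : plugins + "," + BackupHFileCleaner.class.getName());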