diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index fedcac0..a2cb7c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.backup.impl;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -55,6 +58,10 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * This class provides 'hbase:backup' table API
@@ -98,6 +105,7 @@ public final class BackupSystemTable implements Closeable {
   final static byte[] SESSIONS_FAMILY = "session".getBytes();
   // Stores other meta
   final static byte[] META_FAMILY = "meta".getBytes();
+  final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
   // Connection to HBase cluster, shared
   // among all instances
   private final Connection connection;
@@ -208,6 +216,110 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /*
+   * Write bulk load descriptors to hbase:backup.
+   * @param tabName name of the table the bulk load was performed against
+   * @param ts timestamp
+   * @param list list of Pairs of BulkLoadDescriptor and the corresponding timestamp
+   * @throws IOException exception
+   */
+  public void writeBulkLoadDesc(TableName tabName, long ts,
+      List<Pair<BulkLoadDescriptor, Long>> list) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write bulk load descriptor to hbase:backup " + tabName + " with " + list.size()
+          + " entries(" + list.iterator().next().getSecond() + ") tagged " + ts);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      List<Put> puts = BackupSystemTableHelper.createPutForBulkloadDesc(tabName, ts, list);
+      table.put(puts);
+    }
+  }
+
+  /**
+   * Get the BulkLoadDescriptor information between time lower and time upper from hbase:backup.
+   * @param tableList list of tables which received the bulk loads
+   * @param lower the earliest timestamp after which bulk loads would be retrieved
+   * @param upper the latest timestamp before which bulk loads would be retrieved
+   * @param parseDesc true if the BulkLoadDescriptor should be parsed
+   * @return List of Pairs of row key String and Pairs of timestamp and the corresponding
+   *     BulkLoadDescriptor
+   */
+  public List<Pair<String, Pair<Long, BulkLoadDescriptor>>> readBulkloadDesc(
+      List<TableName> tableList, long lower, long upper, boolean parseDesc) throws IOException {
+    int numTables = 0;
+    TableName tTable = null;
+    if (tableList != null) {
+      numTables = tableList.size();
+      Collections.sort(tableList);
+      tTable = tableList.get(0);
+    }
+    Scan scan = BackupSystemTableHelper.createScanForBulkload(tTable, tableList.get(numTables-1),
+        lower, upper);
+    List<Pair<String, Pair<Long, BulkLoadDescriptor>>> descriptors = new ArrayList<>();
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        Cell cell = res.current();
+        byte[] row = CellUtil.cloneRow(cell);
+        String rowStr = Bytes.toString(row);
+        TableName tbl = BackupSystemTableHelper.getTableNameFromBulkLoadRow(rowStr);
+        if (tableList != null && !tableList.contains(tbl)) continue;
+        int idx = rowStr.lastIndexOf(BackupSystemTableHelper.BLK_LD_DELIM);
+        long ts = Long.parseLong(rowStr.substring(idx+1));
+        byte[] qual = CellUtil.cloneQualifier(cell);
+        byte[] data = CellUtil.cloneValue(cell);
+        ts = Bytes.toLong(qual);
+        // ignore bulk loads which were earlier than the lower timestamp or later than upper
+        if (ts < lower || upper < ts) {
+          LOG.debug("just ignoring " + rowStr + "(" + ts + ") " + lower + "-" + upper);
+          continue;
+        }
+        descriptors.add(new Pair<>(rowStr, new Pair<>(Bytes.toLong(qual),
+            parseDesc ? BulkLoadDescriptor.parseFrom(data) : null)));
+      }
+    }
+    return descriptors;
+  }
+
+  /*
+   * Get the row keys from hbase:backup for the tables in tableList
+   */
+  public List<byte[]> readBulkloadDescRows(List<TableName> tableList) throws IOException {
+    int numTables = tableList.size();
+    Collections.sort(tableList);
+    TableName tTable = tableList.get(0);
+    Scan scan = BackupSystemTableHelper.createScanForBulkload(tTable, tableList.get(numTables-1),
+        0, Long.MAX_VALUE);
+    List<byte[]> lst = new ArrayList<>();
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        Cell cell = res.current();
+        byte[] row = CellUtil.cloneRow(cell);
+        String rowStr = Bytes.toString(row);
+        TableName tbl = BackupSystemTableHelper.getTableNameFromBulkLoadRow(rowStr);
+        if (!tableList.contains(tbl)) continue;
+        lst.add(row);
+      }
+    }
+    return lst;
+  }
+
+  /*
+   * Deletes rows for BulkLoadDescriptor's
+   */
+  public void deleteForBulkLoad(List<byte[]> lst) throws IOException {
+    try (Table table = connection.getTable(tableName)) {
+      List<Delete> lstDels = BackupSystemTableHelper.createDeleteForBulkLoad(lst);
+      table.delete(lstDels);
+    }
+  }
+
   /**
    * Get the Region Servers log information after the last log roll from hbase:backup.
* @param backupRoot root directory path to backup @@ -331,6 +443,19 @@ public final class BackupSystemTable implements Closeable { } + /* + * Retrieve TableName's for completed backup of given type + */ + public Set getTablesForBackupType(BackupType type) throws IOException { + Set names = new HashSet<>(); + List infos = getBackupHistory(true); + for (BackupInfo info : infos) { + if (info.getType() != type) continue; + names.addAll(info.getTableNames()); + } + return names; + } + /** * Get history for backup destination * @param backupRoot - backup destination @@ -915,6 +1040,8 @@ public final class BackupSystemTable implements Closeable { HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY); // colDesc.setMaxVersions(1); tableDesc.addFamily(colMetaDesc); + HColumnDescriptor colBulkLoadDesc = new HColumnDescriptor(BULK_LOAD_FAMILY); + tableDesc.addFamily(colBulkLoadDesc); return tableDesc; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java index 37f29f8..fe6abf2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java @@ -36,7 +36,9 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** @@ -63,9 +65,13 @@ public final class BackupSystemTableHelper { private final static String INCR_BACKUP_SET = "incrbackupset:"; private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; private final static String RS_LOG_TS_PREFIX = "rslogts:"; + private final static String BULK_LOAD_DESC_PREFIX = "bulk:"; private final static String WALS_PREFIX = "wals:"; private final static String SET_KEY_PREFIX = "backupset:"; + protected final static byte[] BLK_LD_DESC_COL = "desc".getBytes(); + protected final static String BLK_LD_DELIM = "-"; + private final static byte[] EMPTY_VALUE = new byte[] {}; // Safe delimiter in a string @@ -265,6 +271,67 @@ public final class BackupSystemTableHelper { return put; } + /* + * Creates Put's to store BulkLoadDescriptor's for bulk load + * Column name is timestamp when BulkLoadDescriptor is loaded + */ + static List createPutForBulkloadDesc(TableName table, long ts, + List> list) { + List puts = new ArrayList<>(); + for (Pair pair : list) { + Put put = new Put(rowkey(BULK_LOAD_DESC_PREFIX, table.toString(), BLK_LD_DELIM, + Long.toString(ts))); + put.addColumn(BackupSystemTable.BULK_LOAD_FAMILY, Bytes.toBytes(pair.getSecond()), + pair.getFirst().toByteArray()); + puts.add(put); + } + return puts; + } + + public static TableName getTableNameFromBulkLoadRow(String rowStr) { + int start = BULK_LOAD_DESC_PREFIX.length(); + int end = rowStr.lastIndexOf(BLK_LD_DELIM); + return TableName.valueOf(rowStr.substring(start, end)); + } + + /* + * Since there may be more than one bulk load since the previous backup, we need Scan. + * @param table for which bulk load is performed + * @param lastTable the last table for which bulk load is performed. 
+ * @param lower the earliest timestamp after which bulk load would be retrieved + * @param upper the latest timestamp before which bulk load would be retrieved + * @return Scan object which specifies all the conditions + */ + static Scan createScanForBulkload(TableName table, TableName lastTable, long lower, long upper) + throws IOException { + Scan scan = new Scan(); + String tbl = null; + if (table != null) { + tbl = table.toString(); + byte[] startRow = rowkey(BULK_LOAD_DESC_PREFIX, tbl); + scan.setStartRow(startRow); + } + if (lastTable != null) { + byte[] stopRow = rowkey(BULK_LOAD_DESC_PREFIX, lastTable.toString()); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStopRow(stopRow); + } + + scan.setTimeRange(lower, Long.MAX_VALUE); + scan.addFamily(BackupSystemTable.BULK_LOAD_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + public static List createDeleteForBulkLoad(List lst) { + List lstDels = new ArrayList<>(); + for (byte[] row : lst) { + Delete del = new Delete(row); + del.addFamily(BackupSystemTable.BULK_LOAD_FAMILY); + lstDels.add(del); + } + return lstDels; + } /** * Creates Scan operation to load last RS log roll results * @return scan operation diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index f1f09cc..e047422 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -527,6 +527,15 @@ public class FullTableBackupClient { .getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); + // remove bulk load desc + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + List lst = table.readBulkloadDescRows(tableList); + table.deleteForBulkLoad(lst); + LOG.debug("Deleted " + lst.size() + " rows of bulk load desc"); + } catch (IOException ioe) { + LOG.error("Couldn't clean up bulk load descriptors", ioe); + } + // backup complete completeBackup(conn, backupContext, backupManager, BackupType.FULL, conf); } catch (Exception e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 8acace0..e3902b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -172,6 +172,9 @@ public class IncrementalTableBackupClient { return list; } + void handleBulkLoad(List sTableList) throws IOException { + } + public void execute() throws IOException { // case PREPARE_INCREMENTAL: @@ -222,6 +225,8 @@ public class IncrementalTableBackupClient { BackupClientUtil.getMinValue(BackupServerUtil .getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); + + handleBulkLoad(backupManager.getBackupContext().getTableNames()); // backup complete FullTableBackupClient.completeBackup(conn, backupContext, backupManager, BackupType.INCREMENTAL, conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 91f2d68..42ff29f 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -20,25 +20,37 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.TreeSet; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; -import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreTask; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.util.Pair; @InterfaceAudience.Private public class RestoreTablesClient { @@ -47,6 +59,7 @@ public class RestoreTablesClient { private Configuration conf; private Connection conn; private String backupId; + private String fullBackupId; private TableName[] sTableArray; private TableName[] tTableArray; private String targetRootDir; @@ -142,6 +155,7 @@ public class RestoreTablesClient { // We need hFS only for full restore (see the code) BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); if (manifest.getType() == BackupType.FULL) { + fullBackupId = manifest.getBackupImage().getBackupId(); LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " + tableBackupPath.toString()); restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, @@ -171,7 +185,78 @@ public class RestoreTablesClient { restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId); LOG.info(sTable + " has been successfully restored to " + tTable); + } + + Map>[] handleBulkLoad(List sTableList) throws IOException { + long fullBackupTs = getTsFromBackupId(fullBackupId), + incrBackupTs = getTsFromBackupId(backupId); + Map>[] mapForSrc = new Map[sTableList.size()]; + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + LOG.debug("Looking for bulk load between " + fullBackupTs + " and " + incrBackupTs); + List>> list = + table.readBulkloadDesc(sTableList, fullBackupTs, incrBackupTs, true); + LoadIncrementalHFiles loader = MapReduceRestoreTask.createLoader(conf); + Path rootdir = FSUtils.getRootDir(conf); + for (Pair> pair : list) { + TableName src = 
BackupSystemTableHelper.getTableNameFromBulkLoadRow(pair.getFirst()); + long ts = pair.getSecond().getFirst(); + if (!withinRange(ts, fullBackupTs, incrBackupTs)) { + LOG.debug("Dropping entry " + ts + " range: " + fullBackupTs + "->" + incrBackupTs); + continue; + } + + BulkLoadDescriptor bulkDesc = pair.getSecond().getSecond(); + TableName srcTable = ProtobufUtil.toTableName(bulkDesc.getTableName()); + assert(srcTable.equals(src)); + int srcIdx = getIndex(srcTable, sTableList); + if (srcIdx < 0) { + LOG.warn("Couldn't find " + srcTable + " in source table List"); + continue; + } + if (mapForSrc[srcIdx] == null) { + mapForSrc[srcIdx] = new TreeMap>(Bytes.BYTES_COMPARATOR); + } + Path tblDir = FSUtils.getTableDir(rootdir, srcTable); + String regionName = Bytes.toString(bulkDesc.getEncodedRegionName().toByteArray()); + Path regionDir = new Path(tblDir, regionName); + // map from family to List of hfiles + FileSystem fs = FileSystem.get(conf); + for (WALProtos.StoreDescriptor desc : bulkDesc.getStoresList()) { + int cnt = desc.getStoreFileCount(); + byte[] fam = desc.getFamilyName().toByteArray(); + String famName = Bytes.toString(fam); + Path famDir = new Path(regionDir, famName); + for (int i = 0; i < cnt; i++) { + List files; + if (!mapForSrc[srcIdx].containsKey(fam)) { + files = new ArrayList(); + mapForSrc[srcIdx].put(fam, files); + } else { + files = mapForSrc[srcIdx].get(fam); + } + Path p = new Path(famDir, desc.getStoreFile(i)); + if (fs.exists(p)) { + LOG.debug("found bulk loaded hfile " + desc.getStoreFile(i) + " " + famDir); + files.add(p); + } else { + files.add(HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, famName)); + } + } + } + } + int loaderResult = 0; + for (int i = 0; i < sTableList.size(); i++) { + if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { + loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); + LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); + if (loaderResult != 0) { + LOG.warn("Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]); + } + } + } + } + return mapForSrc; } /** @@ -212,6 +297,7 @@ public class RestoreTablesClient { } } } + handleBulkLoad(Arrays.asList(sTableArray)); } catch (Exception e) { LOG.error("Failed", e); throw new IOException(e); @@ -219,6 +305,29 @@ public class RestoreTablesClient { LOG.debug("restoreStage finished"); } + static long getTsFromBackupId(String backupId) { + if (backupId == null) { + return 0; + } + return Long.valueOf(backupId.substring(backupId.lastIndexOf("_")+1)); + } + + static boolean withinRange(long a, long lower, long upper) { + if (a < lower || a > upper) { + return false; + } + return true; + } + + int getIndex(TableName tbl, List sTableList) { + for (int i = 0; i < sTableList.size(); i++) { + if (tbl.equals(sTableList.get(i))) { + return i; + } + } + return -1; + } + public void execute() throws IOException { // case VALIDATION: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java index 8d9f5b4..317822f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreTask.java @@ -88,7 +88,7 @@ public class MapReduceRestoreTask implements RestoreTask { result = player.run(playerArgs); if (succeeded(result)) { // do bulk load - LoadIncrementalHFiles loader = 
createLoader(); + LoadIncrementalHFiles loader = createLoader(getConf()); if (LOG.isDebugEnabled()) { LOG.debug("Restoring HFiles from directory " + bulkOutputPath); } @@ -124,13 +124,13 @@ public class MapReduceRestoreTask implements RestoreTask { return result == 0; } - private LoadIncrementalHFiles createLoader() throws IOException { + public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { // set configuration for restore: // LoadIncrementalHFile needs more time // hbase.rpc.timeout 600000 // calculates Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(getConf()); + Configuration conf = new Configuration(config); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); // By default, it is 32 and loader will fail if # of files in any region exceed this diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index b1ed43c..909d8a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -138,7 +138,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { initialize(); } - private void initialize() throws Exception { + private void initialize() throws IOException { if (initalized) { return; } @@ -1098,7 +1098,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * If the table is created for the first time, then "completebulkload" reads the files twice. * More modifications necessary if we want to avoid doing it. */ - private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception { + private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { final Path hfofDir = new Path(dirPath); final FileSystem fs = hfofDir.getFileSystem(getConf()); @@ -1151,7 +1151,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.info("Table "+ tableName +" is available!!"); } - public int run(String dirPath, Map> map, TableName tableName) throws Exception{ + public int run(String dirPath, Map> map, TableName tableName) throws IOException{ initialize(); try (Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 36d2c53..d2f8b29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BulkLoadHandler; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; @@ -255,6 +256,7 @@ public class HRegionServer extends HasThread implements * Go here to get table descriptors. */ protected TableDescriptors tableDescriptors; + protected BulkLoadHandler loader; // Replication services. 
If no replication, this handler will be null. protected ReplicationSourceService replicationSourceHandler; @@ -958,6 +960,7 @@ public class HRegionServer extends HasThread implements // We registered with the Master. Go into run mode. long lastMsg = System.currentTimeMillis(); long oldRequestCount = -1; + loader.init(); // The main run loop. while (!isStopped() && isHealthy()) { if (!isClusterUp()) { @@ -1664,6 +1667,8 @@ public class HRegionServer extends HasThread implements // listeners the wal factory will add to wals it creates. final List listeners = new ArrayList(); listeners.add(new MetricsWAL()); + loader = new BulkLoadHandler(getConnection()); + listeners.add(loader); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. @@ -2186,6 +2191,7 @@ public class HRegionServer extends HasThread implements * have already been called. */ protected void stopServiceThreads() { + loader.close(); // clean up the scheduled chores if (this.choreService != null) choreService.shutdown(); if (this.nonceManagerChore != null) nonceManagerChore.cancel(true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java index a235696..4a09fe9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java @@ -153,4 +153,20 @@ public class HFileArchiveUtil { private static Path getArchivePath(final Path rootdir) { return new Path(rootdir, HConstants.HFILE_ARCHIVE_DIRECTORY); } + + /* + * @return table name given archive file path + */ + public static TableName getTableName(Path archivePath) { + Path p = archivePath; + String tbl = null; + // namespace is the 4th parent of file + for (int i = 0; i < 5; i++) { + if (p == null) return null; + if (i == 3) tbl = p.getName(); + p = p.getParent(); + } + if (p == null) return null; + return TableName.valueOf(p.getName(), tbl); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 2da7871..7fd6eb7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -49,6 +49,9 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALFactory; @@ -89,6 +92,7 @@ public class TestBackupBase { protected static String BACKUP_ROOT_DIR = "/backupUT"; protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT"; protected static String provider = "defaultProvider"; + protected static boolean secure = false; /** * @throws java.lang.Exception @@ -97,6 +101,13 @@ public class TestBackupBase { public static void setUpBeforeClass() throws Exception { TEST_UTIL = new HBaseTestingUtility(); conf1 = TEST_UTIL.getConfiguration(); + if 
(secure) { + // set the always on security provider + UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); + // setup configuration + SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration()); + } conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // Set MultiWAL (with 2 default WAL files per RS) conf1.set(WALFactory.WAL_PROVIDER, provider); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java index 0a73888..06624a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.backup; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.impl.HBaseBackupAdmin; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.client.Connection; @@ -32,12 +35,15 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import com.google.common.collect.Lists; @@ -51,11 +57,25 @@ import com.google.common.collect.Lists; * 6 Incremental backup t1 */ @Category(LargeTests.class) +@RunWith(Parameterized.class) public class TestIncrementalBackupDeleteTable extends TestBackupBase { private static final Log LOG = LogFactory.getLog(TestIncrementalBackupDeleteTable.class); + + @Parameterized.Parameters + public static Collection data() { + secure = true; + List params = new ArrayList(); + params.add(new Object[] {Boolean.TRUE}); + return params; + } + + public TestIncrementalBackupDeleteTable(Boolean b) { + } + //implement all test cases in 1 test since incremental backup/restore has dependencies @Test public void TestIncBackupDeleteTable() throws Exception { + String testName = "TestIncBackupDeleteTable"; // #1 - create full backup for all tables LOG.info("create full backup image for all tables"); @@ -86,7 +106,14 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { // Delete table table2 admin.disableTable(table2); admin.deleteTable(table2); - + + int NB_ROWS2 = 20; + int actual = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName, + qualName, false, null, new byte[][][] { + new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, + }, true, NB_ROWS_IN_BATCH*2, NB_ROWS2); + // #3 - incremental backup for table1 tables = Lists.newArrayList(table1); request = new BackupRequest(); @@ -95,32 +122,7 @@ public class 
TestIncrementalBackupDeleteTable extends TestBackupBase { String backupIdIncMultiple = client.backupTables(request); assertTrue(checkSucceeded(backupIdIncMultiple)); - // #4 - restore full backup for all tables, without overwrite - TableName[] tablesRestoreFull = - new TableName[] { table1, table2}; - - TableName[] tablesMapFull = - new TableName[] { table1_restore, table2_restore }; - - client.restore(RestoreServerUtil.createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false, - tablesRestoreFull, - tablesMapFull, false)); - - // #5.1 - check tables for full restore HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin(); - assertTrue(hAdmin.tableExists(table1_restore)); - assertTrue(hAdmin.tableExists(table2_restore)); - - - // #5.2 - checking row count of tables for full restore - HTable hTable = (HTable) conn.getTable(table1_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH)); - hTable.close(); - - hTable = (HTable) conn.getTable(table2_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH)); - hTable.close(); - // #6 - restore incremental backup for table1 TableName[] tablesRestoreIncMultiple = @@ -130,8 +132,22 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { client.restore(RestoreServerUtil.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false, tablesRestoreIncMultiple, tablesMapIncMultiple, true)); - hTable = (HTable) conn.getTable(table1_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2)); + HTable hTable = (HTable) conn.getTable(table1_restore); + Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2 + + actual)); + + /* + request.setBackupType(BackupType.FULL).setTableList(tables).setTargetRootDir(BACKUP_ROOT_DIR); + backupIdFull = client.backupTables(request); + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + List lst = table.readBulkloadDescRows(tables); + for (byte[] row : lst) { + LOG.debug("Still have " + Bytes.toString(row)); + } + assertTrue(lst.isEmpty()); + } + assertTrue(checkSucceeded(backupIdFull)); */ + hTable.close(); admin.close(); conn.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 5678d0d..47315bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -293,13 +293,13 @@ public class TestLoadIncrementalHFiles { runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap); } - private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, - boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) - throws Exception { + public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util, + byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, + byte[][][] hfileRanges, boolean useMap, int initRowCount, int factor) throws Exception { Path dir = util.getDataTestDirOnTestFS(testName); FileSystem fs = util.getTestFileSystem(); dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(FAMILY)); + Path familyDir = new Path(dir, Bytes.toString(fam)); int hfileIdx = 0; Map> map = null; @@ -307,24 +307,25 
@@ public class TestLoadIncrementalHFiles { if (useMap) { map = new TreeMap>(Bytes.BYTES_COMPARATOR); list = new ArrayList<>(); - map.put(FAMILY, list); + map.put(fam, list); } for (byte[][] range : hfileRanges) { byte[] from = range[0]; byte[] to = range[1]; Path path = new Path(familyDir, "hfile_" + hfileIdx++); - HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000); + HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, + factor); if (useMap) { list.add(path); } } - int expectedRows = hfileIdx * 1000; + int expectedRows = hfileIdx * factor; - if (preCreateTable || map != null) { + final TableName tableName = htd.getTableName(); + if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) { util.getHBaseAdmin().createTable(htd, tableSplitKeys); } - final TableName tableName = htd.getTableName(); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); String [] args= {dir.toString(), tableName.toString()}; if (useMap) { @@ -332,16 +333,26 @@ public class TestLoadIncrementalHFiles { } else { loader.run(args); } - Table table = util.getConnection().getTable(tableName); try { - assertEquals(expectedRows, util.countRows(table)); + assertEquals(initRowCount + expectedRows, util.countRows(table)); } finally { table.close(); } + return expectedRows; + } + + private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) + throws Exception { + loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, + hfileRanges, useMap, 0, 1000); + + final TableName tableName = htd.getTableName(); // verify staging folder has been cleaned up Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration()); + FileSystem fs = util.getTestFileSystem(); if(fs.exists(stagingBasePath)) { FileStatus[] files = fs.listStatus(stagingBasePath); for(FileStatus file : files) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHFileCleaner.java new file mode 100644 index 0000000..673add5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHFileCleaner.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +/** + * Implementation of a file cleaner that checks if an hfile is still referenced by backup before + * deleting it from hfile archive directory. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable { + private static final Log LOG = LogFactory.getLog(BackupHFileCleaner.class); + private boolean stopped = false; + private boolean aborted; + private Configuration conf; + private Connection connection; + + Set getFilenameFromBulkLoadDescriptor( + List>> list) { + Set filenames = new HashSet(); + for (Pair> pair : list) { + TableName src = BackupSystemTableHelper.getTableNameFromBulkLoadRow(pair.getFirst()); + BulkLoadDescriptor bulkDesc = pair.getSecond().getSecond(); + for (WALProtos.StoreDescriptor desc : bulkDesc.getStoresList()) { + int cnt = desc.getStoreFileCount(); + for (int i = 0; i < cnt; i++) { + filenames.add(desc.getStoreFile(i)); + } + } + } + return filenames; + } + + private Set loadHFileRefs(List tableList) throws IOException { + final Set hfileRefs; + if (connection == null) { + connection = ConnectionFactory.createConnection(conf); + } + try (BackupSystemTable tbl = new BackupSystemTable(connection)) { + List>> res = + tbl.readBulkloadDesc(tableList, 0, Long.MAX_VALUE, true); + hfileRefs = getFilenameFromBulkLoadDescriptor(res); + } + return hfileRefs; + } + + @Override + public Iterable getDeletableFiles(Iterable files) { + if (this.getConf() == null) { + return files; + } + // obtain the Set of TableName's for the files + // so that we filter BulkLoadDescriptor's to be returned from server + Set tbls = new HashSet<>(); + for (FileStatus file : files) { + Path p = file.getPath(); + TableName tbl = HFileArchiveUtil.getTableName(p); + if (tbl != null) tbls.add(tbl); + } + List list = new ArrayList<>(tbls); + Collections.sort(list); + final Set hfileRefs; + try { + hfileRefs = loadHFileRefs(list); + } catch (IOException ioe) { + LOG.warn("Failed to read hfile references, skipping checking deletable files"); + return Collections.emptyList(); + } + return Iterables.filter(files, new Predicate() { + @Override + public boolean apply(FileStatus file) { + String hfile = file.getPath().getName(); + boolean foundHFileRef = hfileRefs.contains(hfile); + return !foundHFileRef; + } + }); + } + + @Override + public void setConf(Configuration config) { + 
this.conf = config; + this.connection = null; + try { + this.connection = ConnectionFactory.createConnection(conf); + } catch (IOException ioe) { + LOG.error("Couldn't establish connection", ioe); + } + } + + @Override + public void stop(String why) { + if (this.stopped) { + return; + } + this.stopped = true; + } + + @Override + public boolean isStopped() { + return this.stopped; + } + + @Override + public void abort(String why, Throwable e) { + LOG.warn("Aborting ReplicationHFileCleaner because " + why, e); + this.aborted = true; + stop(why); + } + + @Override + public boolean isAborted() { + return this.aborted; + } + + @Override + public boolean isFileDeletable(FileStatus fStat) { + // all members of this class are null if replication is disabled, + // so do not stop from deleting the file + if (getConf() == null) { + return true; + } + + final Set hfileRefs; + try { + hfileRefs = loadHFileRefs(null); + } catch (Exception e) { + LOG.warn("Failed to read hfile references, skipping checking deletable " + + "file for " + fStat.getPath()); + return false; + } + return !hfileRefs.contains(fStat.getPath().getName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java new file mode 100644 index 0000000..6de430f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALKey; + +/** + * Handles bulk load event. 
+ */ +@InterfaceAudience.Private +public class BulkLoadHandler extends WALActionsListener.Base implements Closeable { + private static final Log LOG = LogFactory.getLog(BulkLoadHandler.class); + private BackupSystemTable sysTable; + private List> listDesc = new ArrayList<>(); + // thread which writes to hbase:backup table + private Thread thread; + private volatile boolean stopping = false; + private int sleepIntvl = 3000; + // tables which have gone through full backup + private Set trackedTables = null; + + /** + * Empty constructor + public BulkLoadHandler() { + } + */ + + public BulkLoadHandler(Connection conn) throws IOException { + sysTable = new BackupSystemTable(conn); + thread = new Thread(){ + @Override + public void run() { + handleBulkLoadDesc(); + } + }; + thread.setDaemon(true); + thread.start(); + } + + public void init() throws IOException { + try { + trackedTables = sysTable.getTablesForBackupType(BackupType.FULL); + LOG.debug("Retrieved " + trackedTables.size() + " tables"); + } catch (IOException ioe) { + LOG.debug("Got " + ioe.getMessage()); + } + } + + @Override + public void close() { + stopping = true; + sleepIntvl = 0; + LOG.debug("Draining " + listDesc.size() + " entries"); + long start = EnvironmentEdgeManager.currentTime(); + while (!listDesc.isEmpty()) { + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.debug("Interrupted", ie); + break; + } + } + LOG.debug("Took " + (EnvironmentEdgeManager.currentTime()-start) + "ms to drain"); + } + + void handleBulkLoadDesc() { + List> currList = new ArrayList<>(); + TableName tabName = null; + while (true) { + currList.clear(); + Pair pair; + synchronized (listDesc) { + if (!listDesc.isEmpty()) { + pair = listDesc.remove(0); + currList.add(pair); + + BulkLoadDescriptor bld = pair.getFirst(); + tabName = ProtobufUtil.toTableName(bld.getTableName()); + // if (!trackedTables.contains(tabName)) continue; + + // retrieve consecutive Pairs for the same table + // we can group Pairs where gap between timestamps is within certain limit + while (!listDesc.isEmpty()) { + pair = listDesc.get(0); + if (!tabName.equals(ProtobufUtil.toTableName(pair.getFirst().getTableName()))) { + break; + } + pair = listDesc.remove(0); + currList.add(pair); + } + } + } + if (!currList.isEmpty()) { + try { + sysTable.writeBulkLoadDesc(tabName, EnvironmentEdgeManager.currentTime(), currList); + } catch (IOException ioe) { + LOG.warn("Unable to write descriptor for " + tabName, ioe); + synchronized (listDesc) { + // listDesc.add(0, pair); + } + } + } else if (stopping) break; + try { + if (sleepIntvl != 0) { + /* + synchronized (listDesc) { + listDesc.wait(100); + } */ + Thread.sleep(sleepIntvl); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.debug("Interrupted", ie); + break; + } + } + } + + /* + * Returns an object to listen to new wal changes + **/ + public WALActionsListener getWALActionsListener() { + return this; + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + } + + @Override + public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey, + final WALEdit edit) throws IOException { + if (stopping) return; + TableName tableName = logKey.getTablename(); + if (tableName.equals(TableName.BACKUP_TABLE_NAME)) return; + for (Cell c : edit.getCells()) { + // Only check for bulk load events + if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) { + BulkLoadDescriptor bld = null; 
+ try { + bld = WALEdit.getBulkLoadDescriptor(c); + } catch (IOException e) { + LOG.error("Failed to get bulk load event information from the wal file", e); + throw e; + } + + synchronized (listDesc) { + listDesc.add(new Pair<>(bld, EnvironmentEdgeManager.currentTime())); + // listDesc.notify(); + } + } + } + } + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + } +}
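
A minimal usage sketch of the bulk-load metadata API that this patch adds to BackupSystemTable, not part of the patch itself. The generic type parameters were lost in extraction and are inferred from the surrounding code; the connection setup and the table name "table1" are illustrative assumptions only.

// Sketch, assuming the inferred signatures of readBulkloadDesc/readBulkloadDescRows/deleteForBulkLoad.
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.util.Pair;

public class BulkLoadMetaSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    List<TableName> tables = new ArrayList<>();
    tables.add(TableName.valueOf("table1")); // hypothetical table
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupSystemTable meta = new BackupSystemTable(conn)) {
      // Read descriptors recorded in a time window, the way
      // RestoreTablesClient.handleBulkLoad() consumes them between full and incremental backups.
      List<Pair<String, Pair<Long, BulkLoadDescriptor>>> descs =
          meta.readBulkloadDesc(tables, 0L, Long.MAX_VALUE, true);
      for (Pair<String, Pair<Long, BulkLoadDescriptor>> p : descs) {
        System.out.println(p.getFirst() + " bulk loaded at " + p.getSecond().getFirst());
      }
      // After a successful full backup the rows can be dropped, as FullTableBackupClient does.
      List<byte[]> rows = meta.readBulkloadDescRows(tables);
      meta.deleteForBulkLoad(rows);
    }
  }
}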