diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 3066282..55ed562 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -54,6 +55,10 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.BackupProtos; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; /** * This class provides 'hbase:backup' table API @@ -97,6 +102,7 @@ public final class BackupSystemTable implements Closeable { final static byte[] SESSIONS_FAMILY = "session".getBytes(); // Stores other meta final static byte[] META_FAMILY = "meta".getBytes(); + final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes(); // Connection to HBase cluster, shared // among all instances private final Connection connection; @@ -207,6 +213,106 @@ public final class BackupSystemTable implements Closeable { } } + /* + * Write bulk load descriptor to hbase:backup. + * @param tabName name of table bulk load is performed against + * @param ts timestamp + * @param list List of Pairs of BulkLoadDescriptor and corresponding timestamp + * @throws IOException exception + */ + public void writeBulkLoadDesc(TableName tabName, long ts, + List> list) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("write bulk load descriptor to hbase:backup " + tabName + " with " + list.size() + + " entries(" + list.iterator().next().getSecond() + ") tagged " + ts); + } + try (Table table = connection.getTable(tableName)) { + List puts = BackupSystemTableHelper.createPutForBulkloadDesc(tabName, ts, list); + table.put(puts); + } + } + + /** + * Get the BulkLoadDescriptor information between time lower and time upper from hbase:backup. + * @param tTable tablename which received the bulk load + * @param lower the earliest timestamp after which bulk load would be retrieved + * @param upper the latest timestamp before which bulk load would be retrieved + * @param parseDesc true if BulkLoadDescriptor should be parsed + * @return List of Pairs of row key String and Pairs of timestamp and the corresponding + * BulkLoadDescriptor + */ + public List>> readBulkloadDesc( + List tableList, long lower, long upper, boolean parseDesc) throws IOException { + int numTables = tableList.size(); + Collections.sort(tableList); + TableName tTable = tableList.get(0); + Scan scan = BackupSystemTableHelper.createScanForBulkload(tTable, tableList.get(numTables-1), + lower, upper); + List>> descriptors = new ArrayList<>(); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + Cell cell = res.current(); + byte[] row = CellUtil.cloneRow(cell); + String rowStr = Bytes.toString(row); + TableName tbl = BackupSystemTableHelper.getTableNameFromBulkLoadRow(rowStr); + if (!tableList.contains(tbl)) continue; + int idx = rowStr.lastIndexOf(BackupSystemTableHelper.BLK_LD_DELIM); + long ts = Long.parseLong(rowStr.substring(idx+1)); + byte[] qual = CellUtil.cloneQualifier(cell); + byte[] data = CellUtil.cloneValue(cell); + ts = Bytes.toLong(qual); + // ignore bulk load which was earlier than the lower timestamp or later than upper + if (ts < lower || upper < ts) { + LOG.debug("just ignoring " + rowStr + "(" + ts + ")" + lower + "-" + "upper"); + continue; + } + descriptors.add(new Pair<>(rowStr, new Pair<>(Bytes.toLong(qual), + parseDesc ? BulkLoadDescriptor.parseFrom(data) : null))); + } + } + return descriptors; + } + + /* + * Get the row keys from hbase:backup for the tables in tableList + */ + public List readBulkloadDescRows(List tableList) throws IOException { + int numTables = tableList.size(); + Collections.sort(tableList); + TableName tTable = tableList.get(0); + Scan scan = BackupSystemTableHelper.createScanForBulkload(tTable, tableList.get(numTables-1), + 0, Long.MAX_VALUE); + List lst = new ArrayList<>(); + try (Table table = connection.getTable(tableName); + ResultScanner scanner = table.getScanner(scan)) { + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + Cell cell = res.current(); + byte[] row = CellUtil.cloneRow(cell); + String rowStr = Bytes.toString(row); + TableName tbl = BackupSystemTableHelper.getTableNameFromBulkLoadRow(rowStr); + if (!tableList.contains(tbl)) continue; + lst.add(row); + } + } + return lst; + } + + /* + * Deletes rows for BulkLoadDescriptor's + */ + public void deleteForBulkLoad(List lst) + throws IOException { + try (Table table = connection.getTable(tableName)) { + List lstDels = BackupSystemTableHelper.createDeleteForBulkLoad(lst); + table.delete(lstDels); + } + } + /** * Get the Region Servers log information after the last log roll from hbase:backup. * @param backupRoot root directory path to backup @@ -860,6 +966,8 @@ public final class BackupSystemTable implements Closeable { HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY); // colDesc.setMaxVersions(1); tableDesc.addFamily(colMetaDesc); + HColumnDescriptor colBulkLoadDesc = new HColumnDescriptor(BULK_LOAD_FAMILY); + tableDesc.addFamily(colBulkLoadDesc); return tableDesc; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java index 37f29f8..ac9c95a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java @@ -36,7 +36,9 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** @@ -63,9 +65,13 @@ public final class BackupSystemTableHelper { private final static String INCR_BACKUP_SET = "incrbackupset:"; private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; private final static String RS_LOG_TS_PREFIX = "rslogts:"; + private final static String BULK_LOAD_DESC_PREFIX = "bulk:"; private final static String WALS_PREFIX = "wals:"; private final static String SET_KEY_PREFIX = "backupset:"; + protected final static byte[] BLK_LD_DESC_COL = "desc".getBytes(); + protected final static String BLK_LD_DELIM = "-"; + private final static byte[] EMPTY_VALUE = new byte[] {}; // Safe delimiter in a string @@ -265,6 +271,62 @@ public final class BackupSystemTableHelper { return put; } + /* + * Creates Put's to store BulkLoadDescriptor's for bulk load + * Column name is timestamp when BulkLoadDescriptor is loaded + */ + static List createPutForBulkloadDesc(TableName table, long ts, + List> list) { + List puts = new ArrayList<>(); + for (Pair pair : list) { + Put put = new Put(rowkey(BULK_LOAD_DESC_PREFIX, table.toString(), BLK_LD_DELIM, + Long.toString(ts))); + put.addColumn(BackupSystemTable.BULK_LOAD_FAMILY, Bytes.toBytes(pair.getSecond()), + pair.getFirst().toByteArray()); + puts.add(put); + } + return puts; + } + + public static TableName getTableNameFromBulkLoadRow(String rowStr) { + int start = BULK_LOAD_DESC_PREFIX.length(); + int end = rowStr.lastIndexOf(BLK_LD_DELIM); + return TableName.valueOf(rowStr.substring(start, end)); + } + + /* + * Since there may be more than one bulk load since the previous backup, we need Scan. + * @param table for which bulk load is performed + * @param lastTable the last table for which bulk load is performed. + * @param lower the earliest timestamp after which bulk load would be retrieved + * @param upper the latest timestamp before which bulk load would be retrieved + * @return Scan object which specifies all the conditions + */ + static Scan createScanForBulkload(TableName table, TableName lastTable, long lower, long upper) + throws IOException { + Scan scan = new Scan(); + String tbl = table.toString(); + byte[] startRow = rowkey(BULK_LOAD_DESC_PREFIX, tbl); + scan.setStartRow(startRow); + byte[] stopRow = rowkey(BULK_LOAD_DESC_PREFIX, lastTable == null ? tbl : lastTable.toString()); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStopRow(stopRow); + + scan.setTimeRange(lower, Long.MAX_VALUE); + scan.addFamily(BackupSystemTable.BULK_LOAD_FAMILY); + scan.setMaxVersions(1); + return scan; + } + + public static List createDeleteForBulkLoad(List lst) { + List lstDels = new ArrayList<>(); + for (byte[] row : lst) { + Delete del = new Delete(row); + del.addFamily(BackupSystemTable.BULK_LOAD_FAMILY); + lstDels.add(del); + } + return lstDels; + } /** * Creates Scan operation to load last RS log roll results * @return scan operation diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterProtos.java index f276b7a..494038d 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/MasterProtos.java @@ -103,6 +103,10 @@ public final class MasterProtos { * RESTORE_IMAGES = 2; */ RESTORE_IMAGES(1, 2), + /** + * HANDLE_BULK_LOAD = 3; + */ + HANDLE_BULK_LOAD(2, 3), ; /** @@ -113,6 +117,10 @@ public final class MasterProtos { * RESTORE_IMAGES = 2; */ public static final int RESTORE_IMAGES_VALUE = 2; + /** + * HANDLE_BULK_LOAD = 3; + */ + public static final int HANDLE_BULK_LOAD_VALUE = 3; public final int getNumber() { return value; } @@ -121,6 +129,7 @@ public final class MasterProtos { switch (value) { case 1: return VALIDATION; case 2: return RESTORE_IMAGES; + case 3: return HANDLE_BULK_LOAD; default: return null; } } @@ -61382,6 +61391,21 @@ public final class MasterProtos { * optional uint64 nonce = 8 [default = 0]; */ long getNonce(); + + // optional string full_backup_id = 9; + /** + * optional string full_backup_id = 9; + */ + boolean hasFullBackupId(); + /** + * optional string full_backup_id = 9; + */ + java.lang.String getFullBackupId(); + /** + * optional string full_backup_id = 9; + */ + com.google.protobuf.ByteString + getFullBackupIdBytes(); } /** * Protobuf type {@code hbase.pb.RestoreTablesRequest} @@ -61480,6 +61504,11 @@ public final class MasterProtos { nonce_ = input.readUInt64(); break; } + case 74: { + bitField0_ |= 0x00000040; + fullBackupId_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -61748,6 +61777,49 @@ public final class MasterProtos { return nonce_; } + // optional string full_backup_id = 9; + public static final int FULL_BACKUP_ID_FIELD_NUMBER = 9; + private java.lang.Object fullBackupId_; + /** + * optional string full_backup_id = 9; + */ + public boolean hasFullBackupId() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional string full_backup_id = 9; + */ + public java.lang.String getFullBackupId() { + java.lang.Object ref = fullBackupId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + fullBackupId_ = s; + } + return s; + } + } + /** + * optional string full_backup_id = 9; + */ + public com.google.protobuf.ByteString + getFullBackupIdBytes() { + java.lang.Object ref = fullBackupId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + fullBackupId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { backupId_ = ""; tables_ = java.util.Collections.emptyList(); @@ -61757,6 +61829,7 @@ public final class MasterProtos { overwrite_ = false; nonceGroup_ = 0L; nonce_ = 0L; + fullBackupId_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -61814,6 +61887,9 @@ public final class MasterProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeUInt64(8, nonce_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeBytes(9, getFullBackupIdBytes()); + } getUnknownFields().writeTo(output); } @@ -61855,6 +61931,10 @@ public final class MasterProtos { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(8, nonce_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(9, getFullBackupIdBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -61912,6 +61992,11 @@ public final class MasterProtos { result = result && (getNonce() == other.getNonce()); } + result = result && (hasFullBackupId() == other.hasFullBackupId()); + if (hasFullBackupId()) { + result = result && getFullBackupId() + .equals(other.getFullBackupId()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -61957,6 +62042,10 @@ public final class MasterProtos { hash = (37 * hash) + NONCE_FIELD_NUMBER; hash = (53 * hash) + hashLong(getNonce()); } + if (hasFullBackupId()) { + hash = (37 * hash) + FULL_BACKUP_ID_FIELD_NUMBER; + hash = (53 * hash) + getFullBackupId().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -62092,6 +62181,8 @@ public final class MasterProtos { bitField0_ = (bitField0_ & ~0x00000040); nonce_ = 0L; bitField0_ = (bitField0_ & ~0x00000080); + fullBackupId_ = ""; + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -62162,6 +62253,10 @@ public final class MasterProtos { to_bitField0_ |= 0x00000020; } result.nonce_ = nonce_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000040; + } + result.fullBackupId_ = fullBackupId_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -62252,6 +62347,11 @@ public final class MasterProtos { if (other.hasNonce()) { setNonce(other.getNonce()); } + if (other.hasFullBackupId()) { + bitField0_ |= 0x00000100; + fullBackupId_ = other.fullBackupId_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -63059,6 +63159,80 @@ public final class MasterProtos { return this; } + // optional string full_backup_id = 9; + private java.lang.Object fullBackupId_ = ""; + /** + * optional string full_backup_id = 9; + */ + public boolean hasFullBackupId() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional string full_backup_id = 9; + */ + public java.lang.String getFullBackupId() { + java.lang.Object ref = fullBackupId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + fullBackupId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string full_backup_id = 9; + */ + public com.google.protobuf.ByteString + getFullBackupIdBytes() { + java.lang.Object ref = fullBackupId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + fullBackupId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string full_backup_id = 9; + */ + public Builder setFullBackupId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000100; + fullBackupId_ = value; + onChanged(); + return this; + } + /** + * optional string full_backup_id = 9; + */ + public Builder clearFullBackupId() { + bitField0_ = (bitField0_ & ~0x00000100); + fullBackupId_ = getDefaultInstance().getFullBackupId(); + onChanged(); + return this; + } + /** + * optional string full_backup_id = 9; + */ + public Builder setFullBackupIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000100; + fullBackupId_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.RestoreTablesRequest) } @@ -69156,152 +69330,153 @@ public final class MasterProtos { "ers\030\004 \001(\003\022\021\n\tbandwidth\030\005 \001(\003\022\026\n\013nonce_gr" + "oup\030\006 \001(\004:\0010\022\020\n\005nonce\030\007 \001(\004:\0010\":\n\024Backup" + "TablesResponse\022\017\n\007proc_id\030\001 \001(\004\022\021\n\tbacku" + - "p_id\030\002 \001(\t\"\357\001\n\024RestoreTablesRequest\022\021\n\tb" + + "p_id\030\002 \001(\t\"\207\002\n\024RestoreTablesRequest\022\021\n\tb" + "ackup_id\030\001 \002(\t\022#\n\006tables\030\002 \003(\0132\023.hbase.p", "b.TableName\022*\n\rtarget_tables\030\003 \003(\0132\023.hba" + "se.pb.TableName\022\027\n\017backup_root_dir\030\004 \002(\t" + "\022\035\n\025dependency_check_only\030\005 \001(\010\022\021\n\toverw" + "rite\030\006 \001(\010\022\026\n\013nonce_group\030\007 \001(\004:\0010\022\020\n\005no" + - "nce\030\010 \001(\004:\0010\"(\n\025RestoreTablesResponse\022\017\n" + - "\007proc_id\030\001 \001(\004*(\n\020MasterSwitchType\022\t\n\005SP" + - "LIT\020\000\022\t\n\005MERGE\020\001*8\n\022RestoreTablesState\022\016" + - "\n\nVALIDATION\020\001\022\022\n\016RESTORE_IMAGES\020\0022\364)\n\rM" + - "asterService\022e\n\024GetSchemaAlterStatus\022%.h" + - "base.pb.GetSchemaAlterStatusRequest\032&.hb", - "ase.pb.GetSchemaAlterStatusResponse\022b\n\023G" + - "etTableDescriptors\022$.hbase.pb.GetTableDe" + - "scriptorsRequest\032%.hbase.pb.GetTableDesc" + - "riptorsResponse\022P\n\rGetTableNames\022\036.hbase" + - ".pb.GetTableNamesRequest\032\037.hbase.pb.GetT" + - "ableNamesResponse\022Y\n\020GetClusterStatus\022!." + - "hbase.pb.GetClusterStatusRequest\032\".hbase" + - ".pb.GetClusterStatusResponse\022V\n\017IsMaster" + - "Running\022 .hbase.pb.IsMasterRunningReques" + - "t\032!.hbase.pb.IsMasterRunningResponse\022D\n\t", - "AddColumn\022\032.hbase.pb.AddColumnRequest\032\033." + - "hbase.pb.AddColumnResponse\022M\n\014DeleteColu" + - "mn\022\035.hbase.pb.DeleteColumnRequest\032\036.hbas" + - "e.pb.DeleteColumnResponse\022M\n\014ModifyColum" + - "n\022\035.hbase.pb.ModifyColumnRequest\032\036.hbase" + - ".pb.ModifyColumnResponse\022G\n\nMoveRegion\022\033" + - ".hbase.pb.MoveRegionRequest\032\034.hbase.pb.M" + - "oveRegionResponse\022k\n\026DispatchMergingRegi" + - "ons\022\'.hbase.pb.DispatchMergingRegionsReq" + - "uest\032(.hbase.pb.DispatchMergingRegionsRe", - "sponse\022M\n\014AssignRegion\022\035.hbase.pb.Assign" + - "RegionRequest\032\036.hbase.pb.AssignRegionRes" + - "ponse\022S\n\016UnassignRegion\022\037.hbase.pb.Unass" + - "ignRegionRequest\032 .hbase.pb.UnassignRegi" + - "onResponse\022P\n\rOfflineRegion\022\036.hbase.pb.O" + - "fflineRegionRequest\032\037.hbase.pb.OfflineRe" + - "gionResponse\022J\n\013DeleteTable\022\034.hbase.pb.D" + - "eleteTableRequest\032\035.hbase.pb.DeleteTable" + - "Response\022P\n\rtruncateTable\022\036.hbase.pb.Tru" + - "ncateTableRequest\032\037.hbase.pb.TruncateTab", - "leResponse\022J\n\013EnableTable\022\034.hbase.pb.Ena" + - "bleTableRequest\032\035.hbase.pb.EnableTableRe" + - "sponse\022M\n\014DisableTable\022\035.hbase.pb.Disabl" + - "eTableRequest\032\036.hbase.pb.DisableTableRes" + - "ponse\022J\n\013ModifyTable\022\034.hbase.pb.ModifyTa" + - "bleRequest\032\035.hbase.pb.ModifyTableRespons" + - "e\022J\n\013CreateTable\022\034.hbase.pb.CreateTableR" + - "equest\032\035.hbase.pb.CreateTableResponse\022A\n" + - "\010Shutdown\022\031.hbase.pb.ShutdownRequest\032\032.h" + - "base.pb.ShutdownResponse\022G\n\nStopMaster\022\033", - ".hbase.pb.StopMasterRequest\032\034.hbase.pb.S" + - "topMasterResponse\022>\n\007Balance\022\030.hbase.pb." + - "BalanceRequest\032\031.hbase.pb.BalanceRespons" + - "e\022_\n\022SetBalancerRunning\022#.hbase.pb.SetBa" + - "lancerRunningRequest\032$.hbase.pb.SetBalan" + - "cerRunningResponse\022\\\n\021IsBalancerEnabled\022" + - "\".hbase.pb.IsBalancerEnabledRequest\032#.hb" + - "ase.pb.IsBalancerEnabledResponse\022k\n\026SetS" + - "plitOrMergeEnabled\022\'.hbase.pb.SetSplitOr" + - "MergeEnabledRequest\032(.hbase.pb.SetSplitO", - "rMergeEnabledResponse\022h\n\025IsSplitOrMergeE" + - "nabled\022&.hbase.pb.IsSplitOrMergeEnabledR" + - "equest\032\'.hbase.pb.IsSplitOrMergeEnabledR" + - "esponse\022D\n\tNormalize\022\032.hbase.pb.Normaliz" + - "eRequest\032\033.hbase.pb.NormalizeResponse\022e\n" + - "\024SetNormalizerRunning\022%.hbase.pb.SetNorm" + - "alizerRunningRequest\032&.hbase.pb.SetNorma" + - "lizerRunningResponse\022b\n\023IsNormalizerEnab" + - "led\022$.hbase.pb.IsNormalizerEnabledReques" + - "t\032%.hbase.pb.IsNormalizerEnabledResponse", - "\022S\n\016RunCatalogScan\022\037.hbase.pb.RunCatalog" + - "ScanRequest\032 .hbase.pb.RunCatalogScanRes" + - "ponse\022e\n\024EnableCatalogJanitor\022%.hbase.pb" + - ".EnableCatalogJanitorRequest\032&.hbase.pb." + - "EnableCatalogJanitorResponse\022n\n\027IsCatalo" + - "gJanitorEnabled\022(.hbase.pb.IsCatalogJani" + - "torEnabledRequest\032).hbase.pb.IsCatalogJa" + - "nitorEnabledResponse\022^\n\021ExecMasterServic" + - "e\022#.hbase.pb.CoprocessorServiceRequest\032$" + - ".hbase.pb.CoprocessorServiceResponse\022A\n\010", - "Snapshot\022\031.hbase.pb.SnapshotRequest\032\032.hb" + - "ase.pb.SnapshotResponse\022h\n\025GetCompletedS" + - "napshots\022&.hbase.pb.GetCompletedSnapshot" + - "sRequest\032\'.hbase.pb.GetCompletedSnapshot" + - "sResponse\022S\n\016DeleteSnapshot\022\037.hbase.pb.D" + - "eleteSnapshotRequest\032 .hbase.pb.DeleteSn" + - "apshotResponse\022S\n\016IsSnapshotDone\022\037.hbase" + - ".pb.IsSnapshotDoneRequest\032 .hbase.pb.IsS" + - "napshotDoneResponse\022V\n\017RestoreSnapshot\022 " + - ".hbase.pb.RestoreSnapshotRequest\032!.hbase", - ".pb.RestoreSnapshotResponse\022h\n\025IsRestore" + - "SnapshotDone\022&.hbase.pb.IsRestoreSnapsho" + - "tDoneRequest\032\'.hbase.pb.IsRestoreSnapsho" + - "tDoneResponse\022P\n\rExecProcedure\022\036.hbase.p" + - "b.ExecProcedureRequest\032\037.hbase.pb.ExecPr" + - "ocedureResponse\022W\n\024ExecProcedureWithRet\022" + - "\036.hbase.pb.ExecProcedureRequest\032\037.hbase." + - "pb.ExecProcedureResponse\022V\n\017IsProcedureD" + - "one\022 .hbase.pb.IsProcedureDoneRequest\032!." + - "hbase.pb.IsProcedureDoneResponse\022V\n\017Modi", - "fyNamespace\022 .hbase.pb.ModifyNamespaceRe" + - "quest\032!.hbase.pb.ModifyNamespaceResponse" + - "\022V\n\017CreateNamespace\022 .hbase.pb.CreateNam" + - "espaceRequest\032!.hbase.pb.CreateNamespace" + - "Response\022V\n\017DeleteNamespace\022 .hbase.pb.D" + - "eleteNamespaceRequest\032!.hbase.pb.DeleteN" + - "amespaceResponse\022k\n\026GetNamespaceDescript" + - "or\022\'.hbase.pb.GetNamespaceDescriptorRequ" + - "est\032(.hbase.pb.GetNamespaceDescriptorRes" + - "ponse\022q\n\030ListNamespaceDescriptors\022).hbas", - "e.pb.ListNamespaceDescriptorsRequest\032*.h" + - "base.pb.ListNamespaceDescriptorsResponse" + - "\022\206\001\n\037ListTableDescriptorsByNamespace\0220.h" + - "base.pb.ListTableDescriptorsByNamespaceR" + - "equest\0321.hbase.pb.ListTableDescriptorsBy" + - "NamespaceResponse\022t\n\031ListTableNamesByNam" + - "espace\022*.hbase.pb.ListTableNamesByNamesp" + - "aceRequest\032+.hbase.pb.ListTableNamesByNa" + - "mespaceResponse\022P\n\rGetTableState\022\036.hbase" + - ".pb.GetTableStateRequest\032\037.hbase.pb.GetT", - "ableStateResponse\022A\n\010SetQuota\022\031.hbase.pb" + - ".SetQuotaRequest\032\032.hbase.pb.SetQuotaResp" + - "onse\022x\n\037getLastMajorCompactionTimestamp\022" + - ").hbase.pb.MajorCompactionTimestampReque" + - "st\032*.hbase.pb.MajorCompactionTimestampRe" + - "sponse\022\212\001\n(getLastMajorCompactionTimesta" + - "mpForRegion\0222.hbase.pb.MajorCompactionTi" + - "mestampForRegionRequest\032*.hbase.pb.Major" + - "CompactionTimestampResponse\022_\n\022getProced" + - "ureResult\022#.hbase.pb.GetProcedureResultR", - "equest\032$.hbase.pb.GetProcedureResultResp" + - "onse\022h\n\027getSecurityCapabilities\022%.hbase." + - "pb.SecurityCapabilitiesRequest\032&.hbase.p" + - "b.SecurityCapabilitiesResponse\022S\n\016AbortP" + - "rocedure\022\037.hbase.pb.AbortProcedureReques" + - "t\032 .hbase.pb.AbortProcedureResponse\022S\n\016L" + - "istProcedures\022\037.hbase.pb.ListProceduresR" + - "equest\032 .hbase.pb.ListProceduresResponse" + - "\022M\n\014backupTables\022\035.hbase.pb.BackupTables" + - "Request\032\036.hbase.pb.BackupTablesResponse\022", - "P\n\rrestoreTables\022\036.hbase.pb.RestoreTable" + - "sRequest\032\037.hbase.pb.RestoreTablesRespons" + - "eBB\n*org.apache.hadoop.hbase.protobuf.ge" + - "neratedB\014MasterProtosH\001\210\001\001\240\001\001" + "nce\030\010 \001(\004:\0010\022\026\n\016full_backup_id\030\t \001(\t\"(\n\025" + + "RestoreTablesResponse\022\017\n\007proc_id\030\001 \001(\004*(" + + "\n\020MasterSwitchType\022\t\n\005SPLIT\020\000\022\t\n\005MERGE\020\001" + + "*N\n\022RestoreTablesState\022\016\n\nVALIDATION\020\001\022\022" + + "\n\016RESTORE_IMAGES\020\002\022\024\n\020HANDLE_BULK_LOAD\020\003" + + "2\364)\n\rMasterService\022e\n\024GetSchemaAlterStat", + "us\022%.hbase.pb.GetSchemaAlterStatusReques" + + "t\032&.hbase.pb.GetSchemaAlterStatusRespons" + + "e\022b\n\023GetTableDescriptors\022$.hbase.pb.GetT" + + "ableDescriptorsRequest\032%.hbase.pb.GetTab" + + "leDescriptorsResponse\022P\n\rGetTableNames\022\036" + + ".hbase.pb.GetTableNamesRequest\032\037.hbase.p" + + "b.GetTableNamesResponse\022Y\n\020GetClusterSta" + + "tus\022!.hbase.pb.GetClusterStatusRequest\032\"" + + ".hbase.pb.GetClusterStatusResponse\022V\n\017Is" + + "MasterRunning\022 .hbase.pb.IsMasterRunning", + "Request\032!.hbase.pb.IsMasterRunningRespon" + + "se\022D\n\tAddColumn\022\032.hbase.pb.AddColumnRequ" + + "est\032\033.hbase.pb.AddColumnResponse\022M\n\014Dele" + + "teColumn\022\035.hbase.pb.DeleteColumnRequest\032" + + "\036.hbase.pb.DeleteColumnResponse\022M\n\014Modif" + + "yColumn\022\035.hbase.pb.ModifyColumnRequest\032\036" + + ".hbase.pb.ModifyColumnResponse\022G\n\nMoveRe" + + "gion\022\033.hbase.pb.MoveRegionRequest\032\034.hbas" + + "e.pb.MoveRegionResponse\022k\n\026DispatchMergi" + + "ngRegions\022\'.hbase.pb.DispatchMergingRegi", + "onsRequest\032(.hbase.pb.DispatchMergingReg" + + "ionsResponse\022M\n\014AssignRegion\022\035.hbase.pb." + + "AssignRegionRequest\032\036.hbase.pb.AssignReg" + + "ionResponse\022S\n\016UnassignRegion\022\037.hbase.pb" + + ".UnassignRegionRequest\032 .hbase.pb.Unassi" + + "gnRegionResponse\022P\n\rOfflineRegion\022\036.hbas" + + "e.pb.OfflineRegionRequest\032\037.hbase.pb.Off" + + "lineRegionResponse\022J\n\013DeleteTable\022\034.hbas" + + "e.pb.DeleteTableRequest\032\035.hbase.pb.Delet" + + "eTableResponse\022P\n\rtruncateTable\022\036.hbase.", + "pb.TruncateTableRequest\032\037.hbase.pb.Trunc" + + "ateTableResponse\022J\n\013EnableTable\022\034.hbase." + + "pb.EnableTableRequest\032\035.hbase.pb.EnableT" + + "ableResponse\022M\n\014DisableTable\022\035.hbase.pb." + + "DisableTableRequest\032\036.hbase.pb.DisableTa" + + "bleResponse\022J\n\013ModifyTable\022\034.hbase.pb.Mo" + + "difyTableRequest\032\035.hbase.pb.ModifyTableR" + + "esponse\022J\n\013CreateTable\022\034.hbase.pb.Create" + + "TableRequest\032\035.hbase.pb.CreateTableRespo" + + "nse\022A\n\010Shutdown\022\031.hbase.pb.ShutdownReque", + "st\032\032.hbase.pb.ShutdownResponse\022G\n\nStopMa" + + "ster\022\033.hbase.pb.StopMasterRequest\032\034.hbas" + + "e.pb.StopMasterResponse\022>\n\007Balance\022\030.hba" + + "se.pb.BalanceRequest\032\031.hbase.pb.BalanceR" + + "esponse\022_\n\022SetBalancerRunning\022#.hbase.pb" + + ".SetBalancerRunningRequest\032$.hbase.pb.Se" + + "tBalancerRunningResponse\022\\\n\021IsBalancerEn" + + "abled\022\".hbase.pb.IsBalancerEnabledReques" + + "t\032#.hbase.pb.IsBalancerEnabledResponse\022k" + + "\n\026SetSplitOrMergeEnabled\022\'.hbase.pb.SetS", + "plitOrMergeEnabledRequest\032(.hbase.pb.Set" + + "SplitOrMergeEnabledResponse\022h\n\025IsSplitOr" + + "MergeEnabled\022&.hbase.pb.IsSplitOrMergeEn" + + "abledRequest\032\'.hbase.pb.IsSplitOrMergeEn" + + "abledResponse\022D\n\tNormalize\022\032.hbase.pb.No" + + "rmalizeRequest\032\033.hbase.pb.NormalizeRespo" + + "nse\022e\n\024SetNormalizerRunning\022%.hbase.pb.S" + + "etNormalizerRunningRequest\032&.hbase.pb.Se" + + "tNormalizerRunningResponse\022b\n\023IsNormaliz" + + "erEnabled\022$.hbase.pb.IsNormalizerEnabled", + "Request\032%.hbase.pb.IsNormalizerEnabledRe" + + "sponse\022S\n\016RunCatalogScan\022\037.hbase.pb.RunC" + + "atalogScanRequest\032 .hbase.pb.RunCatalogS" + + "canResponse\022e\n\024EnableCatalogJanitor\022%.hb" + + "ase.pb.EnableCatalogJanitorRequest\032&.hba" + + "se.pb.EnableCatalogJanitorResponse\022n\n\027Is" + + "CatalogJanitorEnabled\022(.hbase.pb.IsCatal" + + "ogJanitorEnabledRequest\032).hbase.pb.IsCat" + + "alogJanitorEnabledResponse\022^\n\021ExecMaster" + + "Service\022#.hbase.pb.CoprocessorServiceReq", + "uest\032$.hbase.pb.CoprocessorServiceRespon" + + "se\022A\n\010Snapshot\022\031.hbase.pb.SnapshotReques" + + "t\032\032.hbase.pb.SnapshotResponse\022h\n\025GetComp" + + "letedSnapshots\022&.hbase.pb.GetCompletedSn" + + "apshotsRequest\032\'.hbase.pb.GetCompletedSn" + + "apshotsResponse\022S\n\016DeleteSnapshot\022\037.hbas" + + "e.pb.DeleteSnapshotRequest\032 .hbase.pb.De" + + "leteSnapshotResponse\022S\n\016IsSnapshotDone\022\037" + + ".hbase.pb.IsSnapshotDoneRequest\032 .hbase." + + "pb.IsSnapshotDoneResponse\022V\n\017RestoreSnap", + "shot\022 .hbase.pb.RestoreSnapshotRequest\032!" + + ".hbase.pb.RestoreSnapshotResponse\022h\n\025IsR" + + "estoreSnapshotDone\022&.hbase.pb.IsRestoreS" + + "napshotDoneRequest\032\'.hbase.pb.IsRestoreS" + + "napshotDoneResponse\022P\n\rExecProcedure\022\036.h" + + "base.pb.ExecProcedureRequest\032\037.hbase.pb." + + "ExecProcedureResponse\022W\n\024ExecProcedureWi" + + "thRet\022\036.hbase.pb.ExecProcedureRequest\032\037." + + "hbase.pb.ExecProcedureResponse\022V\n\017IsProc" + + "edureDone\022 .hbase.pb.IsProcedureDoneRequ", + "est\032!.hbase.pb.IsProcedureDoneResponse\022V" + + "\n\017ModifyNamespace\022 .hbase.pb.ModifyNames" + + "paceRequest\032!.hbase.pb.ModifyNamespaceRe" + + "sponse\022V\n\017CreateNamespace\022 .hbase.pb.Cre" + + "ateNamespaceRequest\032!.hbase.pb.CreateNam" + + "espaceResponse\022V\n\017DeleteNamespace\022 .hbas" + + "e.pb.DeleteNamespaceRequest\032!.hbase.pb.D" + + "eleteNamespaceResponse\022k\n\026GetNamespaceDe" + + "scriptor\022\'.hbase.pb.GetNamespaceDescript" + + "orRequest\032(.hbase.pb.GetNamespaceDescrip", + "torResponse\022q\n\030ListNamespaceDescriptors\022" + + ").hbase.pb.ListNamespaceDescriptorsReque" + + "st\032*.hbase.pb.ListNamespaceDescriptorsRe" + + "sponse\022\206\001\n\037ListTableDescriptorsByNamespa" + + "ce\0220.hbase.pb.ListTableDescriptorsByName" + + "spaceRequest\0321.hbase.pb.ListTableDescrip" + + "torsByNamespaceResponse\022t\n\031ListTableName" + + "sByNamespace\022*.hbase.pb.ListTableNamesBy" + + "NamespaceRequest\032+.hbase.pb.ListTableNam" + + "esByNamespaceResponse\022P\n\rGetTableState\022\036", + ".hbase.pb.GetTableStateRequest\032\037.hbase.p" + + "b.GetTableStateResponse\022A\n\010SetQuota\022\031.hb" + + "ase.pb.SetQuotaRequest\032\032.hbase.pb.SetQuo" + + "taResponse\022x\n\037getLastMajorCompactionTime" + + "stamp\022).hbase.pb.MajorCompactionTimestam" + + "pRequest\032*.hbase.pb.MajorCompactionTimes" + + "tampResponse\022\212\001\n(getLastMajorCompactionT" + + "imestampForRegion\0222.hbase.pb.MajorCompac" + + "tionTimestampForRegionRequest\032*.hbase.pb" + + ".MajorCompactionTimestampResponse\022_\n\022get", + "ProcedureResult\022#.hbase.pb.GetProcedureR" + + "esultRequest\032$.hbase.pb.GetProcedureResu" + + "ltResponse\022h\n\027getSecurityCapabilities\022%." + + "hbase.pb.SecurityCapabilitiesRequest\032&.h" + + "base.pb.SecurityCapabilitiesResponse\022S\n\016" + + "AbortProcedure\022\037.hbase.pb.AbortProcedure" + + "Request\032 .hbase.pb.AbortProcedureRespons" + + "e\022S\n\016ListProcedures\022\037.hbase.pb.ListProce" + + "duresRequest\032 .hbase.pb.ListProceduresRe" + + "sponse\022M\n\014backupTables\022\035.hbase.pb.Backup", + "TablesRequest\032\036.hbase.pb.BackupTablesRes" + + "ponse\022P\n\rrestoreTables\022\036.hbase.pb.Restor" + + "eTablesRequest\032\037.hbase.pb.RestoreTablesR" + + "esponseBB\n*org.apache.hadoop.hbase.proto" + + "buf.generatedB\014MasterProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -69979,7 +70154,7 @@ public final class MasterProtos { internal_static_hbase_pb_RestoreTablesRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_RestoreTablesRequest_descriptor, - new java.lang.String[] { "BackupId", "Tables", "TargetTables", "BackupRootDir", "DependencyCheckOnly", "Overwrite", "NonceGroup", "Nonce", }); + new java.lang.String[] { "BackupId", "Tables", "TargetTables", "BackupRootDir", "DependencyCheckOnly", "Overwrite", "NonceGroup", "Nonce", "FullBackupId", }); internal_static_hbase_pb_RestoreTablesResponse_descriptor = getDescriptor().getMessageTypes().get(112); internal_static_hbase_pb_RestoreTablesResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Master.proto b/hbase-protocol/src/main/protobuf/Master.proto index b20d0bd..a475147 100644 --- a/hbase-protocol/src/main/protobuf/Master.proto +++ b/hbase-protocol/src/main/protobuf/Master.proto @@ -559,6 +559,7 @@ message BackupTablesResponse { enum RestoreTablesState { VALIDATION = 1; RESTORE_IMAGES = 2; + HANDLE_BULK_LOAD = 3; } message RestoreTablesRequest { @@ -570,6 +571,7 @@ message RestoreTablesRequest { optional bool overwrite = 6; optional uint64 nonce_group = 7 [default = 0]; optional uint64 nonce = 8 [default = 0]; + optional string full_backup_id = 9; } message RestoreTablesResponse { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java index 18c1f86..f66f5cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java @@ -88,7 +88,7 @@ public class MapReduceRestoreService implements RestoreService { result = player.run(playerArgs); if (succeeded(result)) { // do bulk load - LoadIncrementalHFiles loader = createLoader(); + LoadIncrementalHFiles loader = createLoader(getConf()); if (LOG.isDebugEnabled()) { LOG.debug("Restoring HFiles from directory " + bulkOutputPath); } @@ -124,13 +124,13 @@ public class MapReduceRestoreService implements RestoreService { return result == 0; } - private LoadIncrementalHFiles createLoader() throws IOException { + public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException { // set configuration for restore: // LoadIncrementalHFile needs more time // hbase.rpc.timeout 600000 // calculates Integer milliSecInHour = 3600000; - Configuration conf = new Configuration(getConf()); + Configuration conf = new Configuration(config); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); // By default, it is 32 and loader will fail if # of files in any region exceed this diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java index 94e991f..c5e74d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.util.BackupClientUtil; import org.apache.hadoop.hbase.backup.util.BackupServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -605,6 +606,15 @@ public class FullTableBackupProcedure BackupClientUtil.getMinValue(BackupServerUtil.getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); + // remove bulk load desc + try (final BackupSystemTable table = new BackupSystemTable( + env.getMasterServices().getConnection())) { + List lst = table.readBulkloadDescRows(tableList); + table.deleteForBulkLoad(lst); + LOG.debug("Deleted " + lst.size() + " rows of bulk load desc"); + } catch (IOException ioe) { + LOG.error("Couldn't clean up bulk load descriptors", ioe); + } // backup complete completeBackup(env, backupContext, backupManager, BackupType.FULL, conf); return Flow.NO_MORE_STATE; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java index 2678278..0bff2ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java @@ -23,7 +23,10 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,11 +41,14 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTableHelper; +import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreService; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -52,6 +58,11 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; @InterfaceAudience.Private public class RestoreTablesProcedure @@ -62,6 +73,7 @@ public class RestoreTablesProcedure private final AtomicBoolean aborted = new AtomicBoolean(false); private Configuration conf; private String backupId; + private String fullBackupId; private List sTableList; private List tTableList; private String targetRootDir; @@ -159,6 +171,7 @@ public class RestoreTablesProcedure // We need hFS only for full restore (see the code) BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); if (manifest.getType() == BackupType.FULL) { + fullBackupId = manifest.getBackupImage().getBackupId(); LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " + tableBackupPath.toString()); restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable, truncateIfExists, @@ -237,6 +250,29 @@ public class RestoreTablesProcedure LOG.debug("restoreStage finished"); } + static long getTsFromBackupId(String backupId) { + if (backupId == null) { + return 0; + } + return Long.valueOf(backupId.substring(backupId.lastIndexOf("_")+1)); + } + + static boolean withinRange(long a, long lower, long upper) { + if (a < lower || a > upper) { + return false; + } + return true; + } + + int getIndex(TableName tbl, List sTableList) { + for (int i = 0; i < sTableList.size(); i++) { + if (tbl.equals(sTableList.get(i))) { + return i; + } + } + return -1; + } + @Override protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state) throws InterruptedException { @@ -264,7 +300,72 @@ public class RestoreTablesProcedure Path rootPath = new Path(targetRootDir); HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, backupId); - restore(env.getMasterServices(), backupManifestMap, sTableArray, tTableArray, isOverwrite); + restore(env.getMasterServices(), backupManifestMap, sTableArray, tTableArray,isOverwrite); + setNextState(RestoreTablesState.HANDLE_BULK_LOAD); + break; + case HANDLE_BULK_LOAD: + long fullBackupTs = getTsFromBackupId(fullBackupId), + incrBackupTs = getTsFromBackupId(backupId); + try (final BackupSystemTable table = new BackupSystemTable( + env.getMasterServices().getConnection())) { + LOG.debug("Looking for bulk load between " + fullBackupTs + " and " + incrBackupTs); + List>> list = + table.readBulkloadDesc(sTableList, fullBackupTs, incrBackupTs, true); + LoadIncrementalHFiles loader = MapReduceRestoreService.createLoader(conf); + Path rootdir = FSUtils.getRootDir(conf); + Map>[] mapForSrc = new Map[sTableList.size()]; + for (Pair> pair : list) { + TableName src = BackupSystemTableHelper.getTableNameFromBulkLoadRow(pair.getFirst()); + long ts = pair.getSecond().getFirst(); + if (!withinRange(ts, fullBackupTs, incrBackupTs)) { + LOG.debug("Dropping entry " + ts + " range: " + fullBackupTs + "->" + incrBackupTs); + continue; + } + + BulkLoadDescriptor bulkDesc = pair.getSecond().getSecond(); + TableName srcTable = ProtobufUtil.toTableName(bulkDesc.getTableName()); + assert(srcTable.equals(src)); + int srcIdx = getIndex(srcTable, sTableList); + if (srcIdx < 0) { + LOG.warn("Couldn't find " + srcTable + " in source table List"); + continue; + } + if (mapForSrc[srcIdx] == null) { + mapForSrc[srcIdx] = new TreeMap>(Bytes.BYTES_COMPARATOR); + } + Path tblDir = FSUtils.getTableDir(rootdir, srcTable); + Path regionDir = new Path(tblDir, + Bytes.toString(bulkDesc.getEncodedRegionName().toByteArray())); + // map from family to List of hfiles + + for (WALProtos.StoreDescriptor desc : bulkDesc.getStoresList()) { + int cnt = desc.getStoreFileCount(); + byte[] fam = desc.getFamilyName().toByteArray(); + Path famDir = new Path(regionDir, Bytes.toString(fam)); + for (int i = 0; i < cnt; i++) { + List files; + if (!mapForSrc[srcIdx].containsKey(fam)) { + files = new ArrayList(); + mapForSrc[srcIdx].put(fam, files); + } else { + files = mapForSrc[srcIdx].get(fam); + } + LOG.debug("found bulk loaded hfile " + desc.getStoreFile(i) + " " + famDir); + files.add(new Path(famDir, desc.getStoreFile(i))); + } + } + } + int loaderResult = 0; + for (int i = 0; i < sTableList.size(); i++) { + if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) { + loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]); + LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]); + if (loaderResult != 0) { + LOG.warn("Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i]); + } + } + } + } return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); @@ -326,6 +427,9 @@ public class RestoreTablesProcedure MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder(); bldr.setOverwrite(isOverwrite).setBackupId(backupId); bldr.setBackupRootDir(targetRootDir); + if (fullBackupId != null) { + bldr.setFullBackupId(fullBackupId); + } for (TableName table : sTableList) { bldr.addTables(ProtobufUtil.toProtoTableName(table)); } @@ -353,6 +457,9 @@ public class RestoreTablesProcedure targetRootDir = proto.getBackupRootDir(); isOverwrite = proto.getOverwrite(); sTableList = new ArrayList<>(proto.getTablesList().size()); + if (proto.hasFullBackupId()) { + fullBackupId = proto.getFullBackupId(); + } for (HBaseProtos.TableName table : proto.getTablesList()) { sTableList.add(ProtobufUtil.toTableName(table)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index b1ed43c..909d8a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -138,7 +138,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { initialize(); } - private void initialize() throws Exception { + private void initialize() throws IOException { if (initalized) { return; } @@ -1098,7 +1098,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * If the table is created for the first time, then "completebulkload" reads the files twice. * More modifications necessary if we want to avoid doing it. */ - private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception { + private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { final Path hfofDir = new Path(dirPath); final FileSystem fs = hfofDir.getFileSystem(getConf()); @@ -1151,7 +1151,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.info("Table "+ tableName +" is available!!"); } - public int run(String dirPath, Map> map, TableName tableName) throws Exception{ + public int run(String dirPath, Map> map, TableName tableName) throws IOException{ initialize(); try (Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 36d2c53..98f507b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BulkLoadHandler; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; @@ -255,6 +256,7 @@ public class HRegionServer extends HasThread implements * Go here to get table descriptors. */ protected TableDescriptors tableDescriptors; + protected BulkLoadHandler loader; // Replication services. If no replication, this handler will be null. protected ReplicationSourceService replicationSourceHandler; @@ -1664,6 +1666,8 @@ public class HRegionServer extends HasThread implements // listeners the wal factory will add to wals it creates. final List listeners = new ArrayList(); listeners.add(new MetricsWAL()); + loader = new BulkLoadHandler(getConnection()); + listeners.add(loader); if (this.replicationSourceHandler != null && this.replicationSourceHandler.getWALActionsListener() != null) { // Replication handler is an implementation of WALActionsListener. @@ -2186,6 +2190,7 @@ public class HRegionServer extends HasThread implements * have already been called. */ protected void stopServiceThreads() { + loader.close(); // clean up the scheduled chores if (this.choreService != null) choreService.shutdown(); if (this.nonceManagerChore != null) nonceManagerChore.cancel(true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java index a7c0713..5262cc4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.client.BackupAdmin; import org.apache.hadoop.hbase.client.Connection; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.hamcrest.CoreMatchers; @@ -56,6 +58,7 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { //implement all test cases in 1 test since incremental backup/restore has dependencies @Test public void TestIncBackupDeleteTable() throws Exception { + String testName = "TestIncBackupDeleteTable"; // #1 - create full backup for all tables LOG.info("create full backup image for all tables"); @@ -85,7 +88,14 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { // Delete table table2 admin.disableTable(table2); admin.deleteTable(table2); - + + int NB_ROWS2 = 20; + int actual = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName, + qualName, false, null, new byte[][][] { + new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, + }, true, NB_ROWS_IN_BATCH*2, NB_ROWS2); + // #3 - incremental backup for table1 tables = Lists.newArrayList(table1); request = new BackupRequest(); @@ -94,33 +104,7 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { String backupIdIncMultiple = admin.getBackupAdmin().backupTables(request); assertTrue(checkSucceeded(backupIdIncMultiple)); - // #4 - restore full backup for all tables, without overwrite - TableName[] tablesRestoreFull = - new TableName[] { table1, table2}; - - TableName[] tablesMapFull = - new TableName[] { table1_restore, table2_restore }; - BackupAdmin client = getBackupAdmin(); - client.restore(RestoreServerUtil.createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false, - tablesRestoreFull, - tablesMapFull, false)); - - // #5.1 - check tables for full restore - HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin(); - assertTrue(hAdmin.tableExists(table1_restore)); - assertTrue(hAdmin.tableExists(table2_restore)); - - - // #5.2 - checking row count of tables for full restore - HTable hTable = (HTable) conn.getTable(table1_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH)); - hTable.close(); - - hTable = (HTable) conn.getTable(table2_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH)); - hTable.close(); - // #6 - restore incremental backup for table1 TableName[] tablesRestoreIncMultiple = @@ -130,8 +114,21 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase { client.restore(RestoreServerUtil.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false, tablesRestoreIncMultiple, tablesMapIncMultiple, true)); - hTable = (HTable) conn.getTable(table1_restore); - Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2)); + HTable hTable = (HTable) conn.getTable(table1_restore); + Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2 + + actual)); + + request.setBackupType(BackupType.FULL).setTableList(tables).setTargetRootDir(BACKUP_ROOT_DIR); + backupIdFull = admin.getBackupAdmin().backupTables(request); + try (final BackupSystemTable table = new BackupSystemTable(conn)) { + List lst = table.readBulkloadDescRows(tables); + for (byte[] row : lst) { + LOG.debug("Still have " + Bytes.toString(row)); + } + assertTrue(lst.isEmpty()); + } + assertTrue(checkSucceeded(backupIdFull)); + hTable.close(); admin.close(); conn.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 5678d0d..47315bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -293,13 +293,13 @@ public class TestLoadIncrementalHFiles { runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap); } - private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, - boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) - throws Exception { + public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util, + byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, + byte[][][] hfileRanges, boolean useMap, int initRowCount, int factor) throws Exception { Path dir = util.getDataTestDirOnTestFS(testName); FileSystem fs = util.getTestFileSystem(); dir = dir.makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(FAMILY)); + Path familyDir = new Path(dir, Bytes.toString(fam)); int hfileIdx = 0; Map> map = null; @@ -307,24 +307,25 @@ public class TestLoadIncrementalHFiles { if (useMap) { map = new TreeMap>(Bytes.BYTES_COMPARATOR); list = new ArrayList<>(); - map.put(FAMILY, list); + map.put(fam, list); } for (byte[][] range : hfileRanges) { byte[] from = range[0]; byte[] to = range[1]; Path path = new Path(familyDir, "hfile_" + hfileIdx++); - HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000); + HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, + factor); if (useMap) { list.add(path); } } - int expectedRows = hfileIdx * 1000; + int expectedRows = hfileIdx * factor; - if (preCreateTable || map != null) { + final TableName tableName = htd.getTableName(); + if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) { util.getHBaseAdmin().createTable(htd, tableSplitKeys); } - final TableName tableName = htd.getTableName(); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); String [] args= {dir.toString(), tableName.toString()}; if (useMap) { @@ -332,16 +333,26 @@ public class TestLoadIncrementalHFiles { } else { loader.run(args); } - Table table = util.getConnection().getTable(tableName); try { - assertEquals(expectedRows, util.countRows(table)); + assertEquals(initRowCount + expectedRows, util.countRows(table)); } finally { table.close(); } + return expectedRows; + } + + private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) + throws Exception { + loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, + hfileRanges, useMap, 0, 1000); + + final TableName tableName = htd.getTableName(); // verify staging folder has been cleaned up Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration()); + FileSystem fs = util.getTestFileSystem(); if(fs.exists(stagingBasePath)) { FileStatus[] files = fs.listStatus(stagingBasePath); for(FileStatus file : files) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java new file mode 100644 index 0000000..e1101fa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoadHandler.java @@ -0,0 +1,185 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALKey; + +/** + * Handles bulk load event. + */ +@InterfaceAudience.Private +public class BulkLoadHandler extends WALActionsListener.Base implements Closeable { + private static final Log LOG = LogFactory.getLog(BulkLoadHandler.class); + private BackupSystemTable sysTable; + private List> listDesc = new ArrayList<>(); + // thread which writes to hbase:backup table + private Thread thread; + private volatile boolean stopping = false; + private int sleepIntvl = 3000; + + /** + * Empty constructor + public BulkLoadHandler() { + } + */ + + public BulkLoadHandler(Connection conn) throws IOException { + sysTable = new BackupSystemTable(conn); + thread = new Thread(){ + @Override + public void run() { + handleBulkLoadDesc(); + } + }; + thread.setDaemon(true); + thread.start(); + } + + @Override + public void close() { + stopping = true; + sleepIntvl = 0; + LOG.debug("Draining " + listDesc.size() + " entries"); + long start = EnvironmentEdgeManager.currentTime(); + while (!listDesc.isEmpty()) { + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.debug("Interrupted", ie); + break; + } + } + LOG.debug("Took " + (EnvironmentEdgeManager.currentTime()-start) + "ms to drain"); + } + + void handleBulkLoadDesc() { + List> currList = new ArrayList<>(); + TableName tabName = null; + while (true) { + currList.clear(); + Pair pair; + synchronized (listDesc) { + if (!listDesc.isEmpty()) { + pair = listDesc.remove(0); + currList.add(pair); + + BulkLoadDescriptor bld = pair.getFirst(); + tabName = ProtobufUtil.toTableName(bld.getTableName()); + // retrieve consecutive Pairs for the same table + // we can group Pairs where gap between timestamps is within certain limit + while (!listDesc.isEmpty()) { + pair = listDesc.get(0); + if (!tabName.equals(ProtobufUtil.toTableName(pair.getFirst().getTableName()))) { + break; + } + pair = listDesc.remove(0); + currList.add(pair); + } + } + } + if (!currList.isEmpty()) { + try { + sysTable.writeBulkLoadDesc(tabName, EnvironmentEdgeManager.currentTime(), currList); + } catch (IOException ioe) { + LOG.warn("Unable to write descriptor for " + tabName, ioe); + synchronized (listDesc) { + // listDesc.add(0, pair); + } + } + } else if (stopping) break; + try { + if (sleepIntvl != 0) { + /* + synchronized (listDesc) { + listDesc.wait(100); + } */ + Thread.sleep(sleepIntvl); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.debug("Interrupted", ie); + break; + } + } + } + + /* + * Returns an object to listen to new wal changes + **/ + public WALActionsListener getWALActionsListener() { + return this; + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException { + } + + @Override + public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey, + final WALEdit edit) throws IOException { + if (stopping) return; + TableName tableName = logKey.getTablename(); + if (tableName.equals(TableName.BACKUP_TABLE_NAME)) return; + for (Cell c : edit.getCells()) { + // Only check for bulk load events + if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) { + BulkLoadDescriptor bld = null; + try { + bld = WALEdit.getBulkLoadDescriptor(c); + } catch (IOException e) { + LOG.error("Failed to get bulk load event information from the wal file", e); + throw e; + } + + synchronized (listDesc) { + listDesc.add(new Pair<>(bld, EnvironmentEdgeManager.currentTime())); + // listDesc.notify(); + } + } + } + } + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + } +}