diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/BackupProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/BackupProtos.java index 413661e..b96f5b3 100644 --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/BackupProtos.java +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/BackupProtos.java @@ -6874,6 +6874,36 @@ public final class BackupProtos { * optional uint32 progress = 12; */ int getProgress(); + + // optional bytes job_id = 13; + /** + * optional bytes job_id = 13; + */ + boolean hasJobId(); + /** + * optional bytes job_id = 13; + */ + com.google.protobuf.ByteString getJobId(); + + // required uint32 workers_number = 14; + /** + * required uint32 workers_number = 14; + */ + boolean hasWorkersNumber(); + /** + * required uint32 workers_number = 14; + */ + int getWorkersNumber(); + + // required uint32 bandwidth = 15; + /** + * required uint32 bandwidth = 15; + */ + boolean hasBandwidth(); + /** + * required uint32 bandwidth = 15; + */ + int getBandwidth(); } /** * Protobuf type {@code hbase.pb.BackupContext} @@ -7007,6 +7037,21 @@ public final class BackupProtos { progress_ = input.readUInt32(); break; } + case 106: { + bitField0_ |= 0x00000800; + jobId_ = input.readBytes(); + break; + } + case 112: { + bitField0_ |= 0x00001000; + workersNumber_ = input.readUInt32(); + break; + } + case 120: { + bitField0_ |= 0x00002000; + bandwidth_ = input.readUInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -7597,6 +7642,54 @@ public final class BackupProtos { return progress_; } + // optional bytes job_id = 13; + public static final int JOB_ID_FIELD_NUMBER = 13; + private com.google.protobuf.ByteString jobId_; + /** + * optional bytes job_id = 13; + */ + public boolean hasJobId() { + return ((bitField0_ & 0x00000800) == 0x00000800); + } + /** + * optional bytes job_id = 13; + */ + public com.google.protobuf.ByteString getJobId() { + return jobId_; + } + + // required uint32 workers_number = 14; + public static final int WORKERS_NUMBER_FIELD_NUMBER = 14; + private int workersNumber_; + /** + * required uint32 workers_number = 14; + */ + public boolean hasWorkersNumber() { + return ((bitField0_ & 0x00001000) == 0x00001000); + } + /** + * required uint32 workers_number = 14; + */ + public int getWorkersNumber() { + return workersNumber_; + } + + // required uint32 bandwidth = 15; + public static final int BANDWIDTH_FIELD_NUMBER = 15; + private int bandwidth_; + /** + * required uint32 bandwidth = 15; + */ + public boolean hasBandwidth() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + /** + * required uint32 bandwidth = 15; + */ + public int getBandwidth() { + return bandwidth_; + } + private void initFields() { backupId_ = ""; type_ = org.apache.hadoop.hbase.protobuf.generated.BackupProtos.BackupType.FULL; @@ -7610,6 +7703,9 @@ public final class BackupProtos { totalBytesCopied_ = 0L; hlogTargetDir_ = ""; progress_ = 0; + jobId_ = com.google.protobuf.ByteString.EMPTY; + workersNumber_ = 0; + bandwidth_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7640,6 +7736,14 @@ public final class BackupProtos { memoizedIsInitialized = 0; return false; } + if (!hasWorkersNumber()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasBandwidth()) { + memoizedIsInitialized = 0; + return false; + } for (int i = 0; i < getTableBackupStatusCount(); i++) { if (!getTableBackupStatus(i).isInitialized()) { memoizedIsInitialized = 0; @@ -7689,6 +7793,15 @@ public final class BackupProtos { if (((bitField0_ & 0x00000400) == 0x00000400)) { output.writeUInt32(12, progress_); } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + output.writeBytes(13, jobId_); + } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + output.writeUInt32(14, workersNumber_); + } + if (((bitField0_ & 0x00002000) == 0x00002000)) { + output.writeUInt32(15, bandwidth_); + } getUnknownFields().writeTo(output); } @@ -7746,6 +7859,18 @@ public final class BackupProtos { size += com.google.protobuf.CodedOutputStream .computeUInt32Size(12, progress_); } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(13, jobId_); + } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(14, workersNumber_); + } + if (((bitField0_ & 0x00002000) == 0x00002000)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(15, bandwidth_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -7826,6 +7951,21 @@ public final class BackupProtos { result = result && (getProgress() == other.getProgress()); } + result = result && (hasJobId() == other.hasJobId()); + if (hasJobId()) { + result = result && getJobId() + .equals(other.getJobId()); + } + result = result && (hasWorkersNumber() == other.hasWorkersNumber()); + if (hasWorkersNumber()) { + result = result && (getWorkersNumber() + == other.getWorkersNumber()); + } + result = result && (hasBandwidth() == other.hasBandwidth()); + if (hasBandwidth()) { + result = result && (getBandwidth() + == other.getBandwidth()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -7887,6 +8027,18 @@ public final class BackupProtos { hash = (37 * hash) + PROGRESS_FIELD_NUMBER; hash = (53 * hash) + getProgress(); } + if (hasJobId()) { + hash = (37 * hash) + JOB_ID_FIELD_NUMBER; + hash = (53 * hash) + getJobId().hashCode(); + } + if (hasWorkersNumber()) { + hash = (37 * hash) + WORKERS_NUMBER_FIELD_NUMBER; + hash = (53 * hash) + getWorkersNumber(); + } + if (hasBandwidth()) { + hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER; + hash = (53 * hash) + getBandwidth(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -8025,6 +8177,12 @@ public final class BackupProtos { bitField0_ = (bitField0_ & ~0x00000400); progress_ = 0; bitField0_ = (bitField0_ & ~0x00000800); + jobId_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00001000); + workersNumber_ = 0; + bitField0_ = (bitField0_ & ~0x00002000); + bandwidth_ = 0; + bitField0_ = (bitField0_ & ~0x00004000); return this; } @@ -8106,6 +8264,18 @@ public final class BackupProtos { to_bitField0_ |= 0x00000400; } result.progress_ = progress_; + if (((from_bitField0_ & 0x00001000) == 0x00001000)) { + to_bitField0_ |= 0x00000800; + } + result.jobId_ = jobId_; + if (((from_bitField0_ & 0x00002000) == 0x00002000)) { + to_bitField0_ |= 0x00001000; + } + result.workersNumber_ = workersNumber_; + if (((from_bitField0_ & 0x00004000) == 0x00004000)) { + to_bitField0_ |= 0x00002000; + } + result.bandwidth_ = bandwidth_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -8189,6 +8359,15 @@ public final class BackupProtos { if (other.hasProgress()) { setProgress(other.getProgress()); } + if (other.hasJobId()) { + setJobId(other.getJobId()); + } + if (other.hasWorkersNumber()) { + setWorkersNumber(other.getWorkersNumber()); + } + if (other.hasBandwidth()) { + setBandwidth(other.getBandwidth()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -8218,6 +8397,14 @@ public final class BackupProtos { return false; } + if (!hasWorkersNumber()) { + + return false; + } + if (!hasBandwidth()) { + + return false; + } for (int i = 0; i < getTableBackupStatusCount(); i++) { if (!getTableBackupStatus(i).isInitialized()) { @@ -9022,6 +9209,108 @@ public final class BackupProtos { return this; } + // optional bytes job_id = 13; + private com.google.protobuf.ByteString jobId_ = com.google.protobuf.ByteString.EMPTY; + /** + * optional bytes job_id = 13; + */ + public boolean hasJobId() { + return ((bitField0_ & 0x00001000) == 0x00001000); + } + /** + * optional bytes job_id = 13; + */ + public com.google.protobuf.ByteString getJobId() { + return jobId_; + } + /** + * optional bytes job_id = 13; + */ + public Builder setJobId(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00001000; + jobId_ = value; + onChanged(); + return this; + } + /** + * optional bytes job_id = 13; + */ + public Builder clearJobId() { + bitField0_ = (bitField0_ & ~0x00001000); + jobId_ = getDefaultInstance().getJobId(); + onChanged(); + return this; + } + + // required uint32 workers_number = 14; + private int workersNumber_ ; + /** + * required uint32 workers_number = 14; + */ + public boolean hasWorkersNumber() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + /** + * required uint32 workers_number = 14; + */ + public int getWorkersNumber() { + return workersNumber_; + } + /** + * required uint32 workers_number = 14; + */ + public Builder setWorkersNumber(int value) { + bitField0_ |= 0x00002000; + workersNumber_ = value; + onChanged(); + return this; + } + /** + * required uint32 workers_number = 14; + */ + public Builder clearWorkersNumber() { + bitField0_ = (bitField0_ & ~0x00002000); + workersNumber_ = 0; + onChanged(); + return this; + } + + // required uint32 bandwidth = 15; + private int bandwidth_ ; + /** + * required uint32 bandwidth = 15; + */ + public boolean hasBandwidth() { + return ((bitField0_ & 0x00004000) == 0x00004000); + } + /** + * required uint32 bandwidth = 15; + */ + public int getBandwidth() { + return bandwidth_; + } + /** + * required uint32 bandwidth = 15; + */ + public Builder setBandwidth(int value) { + bitField0_ |= 0x00004000; + bandwidth_ = value; + onChanged(); + return this; + } + /** + * required uint32 bandwidth = 15; + */ + public Builder clearBandwidth() { + bitField0_ = (bitField0_ & ~0x00004000); + bandwidth_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.BackupContext) } @@ -9093,7 +9382,7 @@ public final class BackupProtos { "se.pb.BackupImage\022\021\n\tcompacted\030\013 \002(\010\"]\n\021" + "TableBackupStatus\022\"\n\005table\030\001 \002(\0132\023.hbase", ".pb.TableName\022\022\n\ntarget_dir\030\002 \002(\t\022\020\n\010sna" + - "pshot\030\003 \001(\t\"\323\004\n\rBackupContext\022\021\n\tbackup_" + + "pshot\030\003 \001(\t\"\216\005\n\rBackupContext\022\021\n\tbackup_" + "id\030\001 \002(\t\022\"\n\004type\030\002 \002(\0162\024.hbase.pb.Backup" + "Type\022\027\n\017target_root_dir\030\003 \002(\t\0222\n\005state\030\004" + " \001(\0162#.hbase.pb.BackupContext.BackupStat" + @@ -9103,14 +9392,16 @@ public final class BackupProtos { "bleBackupStatus\022\020\n\010start_ts\030\010 \002(\004\022\016\n\006end" + "_ts\030\t \002(\004\022\032\n\022total_bytes_copied\030\n \002(\003\022\027\n", "\017hlog_target_dir\030\013 \001(\t\022\020\n\010progress\030\014 \001(\r" + - "\"P\n\013BackupState\022\013\n\007WAITING\020\000\022\013\n\007RUNNING\020" + - "\001\022\014\n\010COMPLETE\020\002\022\n\n\006FAILED\020\003\022\r\n\tCANCELLED" + - "\020\004\"}\n\013BackupPhase\022\013\n\007REQUEST\020\000\022\014\n\010SNAPSH" + - "OT\020\001\022\027\n\023PREPARE_INCREMENTAL\020\002\022\020\n\014SNAPSHO" + - "TCOPY\020\003\022\024\n\020INCREMENTAL_COPY\020\004\022\022\n\016STORE_M" + - "ANIFEST\020\005*\'\n\nBackupType\022\010\n\004FULL\020\000\022\017\n\013INC" + - "REMENTAL\020\001BB\n*org.apache.hadoop.hbase.pr" + - "otobuf.generatedB\014BackupProtosH\001\210\001\001\240\001\001" + "\022\016\n\006job_id\030\r \001(\014\022\026\n\016workers_number\030\016 \002(\r" + + "\022\021\n\tbandwidth\030\017 \002(\r\"P\n\013BackupState\022\013\n\007WA" + + "ITING\020\000\022\013\n\007RUNNING\020\001\022\014\n\010COMPLETE\020\002\022\n\n\006FA" + + "ILED\020\003\022\r\n\tCANCELLED\020\004\"}\n\013BackupPhase\022\013\n\007" + + "REQUEST\020\000\022\014\n\010SNAPSHOT\020\001\022\027\n\023PREPARE_INCRE" + + "MENTAL\020\002\022\020\n\014SNAPSHOTCOPY\020\003\022\024\n\020INCREMENTA" + + "L_COPY\020\004\022\022\n\016STORE_MANIFEST\020\005*\'\n\nBackupTy" + + "pe\022\010\n\004FULL\020\000\022\017\n\013INCREMENTAL\020\001BB\n*org.apa" + + "che.hadoop.hbase.protobuf.generatedB\014Bac", + "kupProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -9152,7 +9443,7 @@ public final class BackupProtos { internal_static_hbase_pb_BackupContext_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_BackupContext_descriptor, - new java.lang.String[] { "BackupId", "Type", "TargetRootDir", "State", "Phase", "FailedMessage", "TableBackupStatus", "StartTs", "EndTs", "TotalBytesCopied", "HlogTargetDir", "Progress", }); + new java.lang.String[] { "BackupId", "Type", "TargetRootDir", "State", "Phase", "FailedMessage", "TableBackupStatus", "StartTs", "EndTs", "TotalBytesCopied", "HlogTargetDir", "Progress", "JobId", "WorkersNumber", "Bandwidth", }); return null; } }; diff --git hbase-protocol/src/main/protobuf/Backup.proto hbase-protocol/src/main/protobuf/Backup.proto index 6c126e4..4de55e2 100644 --- hbase-protocol/src/main/protobuf/Backup.proto +++ hbase-protocol/src/main/protobuf/Backup.proto @@ -85,6 +85,9 @@ message BackupContext { required int64 total_bytes_copied = 10; optional string hlog_target_dir = 11; optional uint32 progress = 12; + optional bytes job_id = 13; + required uint32 workers_number = 14; + required uint32 bandwidth = 15; enum BackupState { WAITING = 0; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClient.java index 7c8ea39..ef35a02 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClient.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClient.java @@ -28,7 +28,7 @@ public interface BackupClient { public void setConf(Configuration conf); /** - * Send backup request to server, and monitor the progress if necessary + * Sends backup request to server, and monitor the progress if necessary * @param backupType : full or incremental * @param targetRootDir : the root path specified by user * @param tableList : the table list specified by user @@ -37,4 +37,90 @@ public interface BackupClient { */ public String create(BackupType backupType, List tableList, String targetRootDir) throws IOException; + + /** + * Sends backup request to server and monitor progress if necessary + * @param backupType : full or incremental + * @param tableList : table list + * @param targetRootDir: target root directory + * @param snapshot : using existing snapshot + * @param workers : number of workers for parallel data copy + * @param bandwidth : maximum bandwidth (in MB/sec) of copy job + * @return backup id + * @throws IOException + */ + public String create(BackupType backupType, List tableList, String targetRootDir, + int workers, int bandwidth) throws IOException; + + /** + * Describe backup image command + * @param backupId - backup id + * @throws IOException + */ + public void describeBackupImage(String backupId) throws IOException; + + /** + * Show backup progress command + * @param backupId - backup id + * @throws IOException + */ + public void showProgress(String backupId) throws IOException; + + /** + * Delete backup image command + * @param backupIds - backup id + * @throws IOException + */ + public void deleteBackups(String[] backupIds) throws IOException; + + /** + * Cancel current active backup command + * @param backupId - backup id + * @throws IOException + */ + public void cancelBackup(String backupId) throws IOException; + + /** + * Show backup history command + * @param n - last n backup sessions + * @throws IOException + */ + public void showHistory(int n) throws IOException; + + /** + * Backup set list command + * @throws IOException + */ + public void backupSetList() throws IOException; + + /** + * Backup set describe command + * @param name set name + * @throws IOException + */ + public void backupSetDescribe(String name) throws IOException; + + /** + * Delete backup set command + * @param name - backup set name + * @throws IOException + */ + public void backupSetDelete(String name) throws IOException; + + /** + * Add tables to backup set command + * @param name set name + * @param tables - list of tables + * @throws IOException + */ + public void backupSetAdd(String name, String[] tablesOrNamespaces) throws IOException; + + /** + * Remove tables from backup set + * @param name - backup set name + * @param tables - list of tables + * @throws IOException + */ + public void backupSetRemove(String name, String[] tablesOrNamepsaces) throws IOException; + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java index 015c80b..7802136 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java @@ -41,11 +41,23 @@ public class BackupDriver extends AbstractHBaseTool { private static final Log LOG = LogFactory.getLog(BackupDriver.class); private Options opt; private CommandLine cmd; - + private static Configuration conf; + + public static void setStaticConf(Configuration conf) + { + BackupDriver.conf = conf; + } + protected void init() throws IOException { // define supported options opt = new Options(); opt.addOption("debug", false, "Enable debug loggings"); + opt.addOption("all", false, "All tables"); + opt.addOption("t", true, "Table name"); + opt.addOption("b", true, "Bandwidth (MB/s)"); + opt.addOption("w", true, "Number of workers"); + opt.addOption("n", true, "History length"); + opt.addOption("set", true, "Backup set name"); // disable irrelevant loggers to avoid it mess up command output LogUtils.disableUselessLoggers(LOG); @@ -91,7 +103,11 @@ public class BackupDriver extends AbstractHBaseTool { } // TODO: get rid of Command altogether? - BackupCommands.createCommand(getConf(), type, cmdline).execute(); + BackupCommands.Command command = BackupCommands.createCommand(getConf(), type, cmdline); + if( type == BackupCommand.CREATE && conf != null) { + ((BackupCommands.CreateCommand) command).setConf(conf); + } + command.execute(); return 0; } @@ -111,9 +127,12 @@ public class BackupDriver extends AbstractHBaseTool { } public static void main(String[] args) throws Exception { - Configuration conf = HBaseConfiguration.create(); + Configuration conf = + BackupDriver.conf == null? HBaseConfiguration.create():BackupDriver.conf; int ret = ToolRunner.run(conf, new BackupDriver(), args); - System.exit(ret); + if(BackupDriver.conf == null) { + System.exit(ret); + } } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupClientImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupClientImpl.java index f129a7c..7da4819 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupClientImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupClientImpl.java @@ -31,8 +31,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupClient; +import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState; +import org.apache.hadoop.hbase.backup.impl.BackupUtil.BackupCompleteData; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Connection; @@ -40,107 +43,130 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.common.collect.Lists; - /** * Backup HBase tables locally or on a remote cluster Serve as client entry point for the following * features: - Full Backup provide local and remote back/restore for a list of tables - Incremental * backup to build on top of full backup as daily/weekly backup - Convert incremental backup WAL * files into hfiles - Merge several backup images into one(like merge weekly into monthly) - Add - * and remove table to and from Backup image - Cancel a backup process - Describe information of - * a backup image + * and remove table to and from Backup image - Cancel a backup process - Full backup based on + * existing snapshot - Describe information of a backup image */ + @InterfaceAudience.Public @InterfaceStability.Evolving -public final class BackupClientImpl implements BackupClient { +public final class BackupClientImpl implements BackupClient{ private static final Log LOG = LogFactory.getLog(BackupClientImpl.class); private Configuration conf; private BackupManager backupManager; public BackupClientImpl() { } - + @Override public void setConf(Configuration conf) { this.conf = conf; } - + /** * Prepare and submit Backup request * @param backupId : backup_timestame (something like backup_1398729212626) * @param backupType : full or incremental * @param tableList : tables to be backuped * @param targetRootDir : specified by user + * @param snapshot : use existing snapshot if specified by user (for future jira) + * @param workers - number of parallel workers in copy data job + * @param bandwidth - maximum bandwidth per works in MB/sec * @throws IOException exception */ protected void requestBackup(String backupId, BackupType backupType, List tableList, - String targetRootDir) throws IOException { + String targetRootDir, int workers, int bandwidth) throws IOException { BackupContext backupContext = null; - HBaseAdmin hbadmin = null; Connection conn = null; try { + backupManager = new BackupManager(conf); + String tables = tableList != null? toString(tableList) : null; if (backupType == BackupType.INCREMENTAL) { Set incrTableSet = backupManager.getIncrementalBackupTableSet(); if (incrTableSet.isEmpty()) { LOG.warn("Incremental backup table set contains no table.\n" + "Use 'backup create full' or 'backup stop' to \n " + "change the tables covered by incremental backup."); - throw new DoNotRetryIOException("No table covered by incremental backup."); + throw new RuntimeException("No table covered by incremental backup."); } - - LOG.info("Incremental backup for the following table set: " + incrTableSet); - tableList = Lists.newArrayList(incrTableSet); + StringBuilder sb = new StringBuilder(); + for (TableName tableName : incrTableSet) { + sb.append(tableName.getNameAsString() + " "); + } + LOG.info("Incremental backup for the following table set: " + sb.toString()); + tables = + sb.toString().trim() + .replaceAll(" ", BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); } // check whether table exists first before starting real request - if (tableList != null) { - ArrayList nonExistingTableList = null; + if (tables != null) { + String[] tableNames = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); + ArrayList noneExistingTableList = null; conn = ConnectionFactory.createConnection(conf); hbadmin = (HBaseAdmin) conn.getAdmin(); - for (TableName tableName : tableList) { - if (!hbadmin.tableExists(tableName)) { - if (nonExistingTableList == null) { - nonExistingTableList = new ArrayList<>(); + for (String tableName : tableNames) { + if (!hbadmin.tableExists(TableName.valueOf(tableName))) { + if (noneExistingTableList == null) { + noneExistingTableList = new ArrayList(); } - nonExistingTableList.add(tableName); + noneExistingTableList.add(tableName); } } - if (nonExistingTableList != null) { + if (noneExistingTableList != null) { if (backupType == BackupType.INCREMENTAL ) { LOG.warn("Incremental backup table set contains non-exising table: " - + nonExistingTableList); + + noneExistingTableList); } else { // Throw exception only in full mode - we try to backup non-existing table - throw new DoNotRetryIOException("Non-existing tables found in the table list: " - + nonExistingTableList); + throw new RuntimeException("Non-existing tables found in the table list: " + + noneExistingTableList); } } } - // if any target table backup dir already exist, then no backup action taken - if (tableList != null) { - for (TableName table : tableList) { + // if any target table backup directory already exist, then no backup action taken + String[] tableNames = null; + if (tables != null && !tables.equals("")) { + tableNames = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); + } + + List tList = new ArrayList(); + + if (tableNames != null && tableNames.length > 0) { + for (String table : tableNames) { + TableName tn = TableName.valueOf(table); + tList.add(tn); String targetTableBackupDir = - HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, table); + HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, TableName.valueOf(table)); Path targetTableBackupDirPath = new Path(targetTableBackupDir); FileSystem outputFs = FileSystem.get(targetTableBackupDirPath.toUri(), conf); if (outputFs.exists(targetTableBackupDirPath)) { - throw new DoNotRetryIOException("Target backup directory " + targetTableBackupDir + throw new IOException("Target backup directory " + targetTableBackupDir + " exists already."); } } } backupContext = - backupManager.createBackupContext(backupId, backupType, tableList, targetRootDir); + backupManager.createBackupContext(backupId, backupType, + tList, targetRootDir, workers, bandwidth); backupManager.initialize(); backupManager.dispatchRequest(backupContext); } catch (BackupException e) { // suppress the backup exception wrapped within #initialize or #dispatchRequest, backup // exception has already been handled normally - LOG.error("Backup Exception ", e); + StackTraceElement[] stes = e.getStackTrace(); + for (StackTraceElement ste : stes) { + LOG.info(ste); + } + LOG.error("Backup Exception " + e.getMessage()); } finally { if (hbadmin != null) { hbadmin.close(); @@ -151,10 +177,28 @@ public final class BackupClientImpl implements BackupClient { } } + private String toString(List tableList) { + StringBuffer sb = new StringBuffer(); + for(int i=0; i < tableList.size(); i++) + { + sb.append(tableList.get(i).getNameAsString()); + if(i < tableList.size() -1) { + sb.append(','); + } + } + return sb.toString(); + } + + @Override + public String create(BackupType type, List tableList, String backupRootPath + ) throws IOException { + return create(type, tableList, backupRootPath, -1, -1); + } + @Override - public String create(BackupType backupType, List tableList, String backupRootPath) - throws IOException { - + public String create(BackupType backupType, List tableList, String backupRootPath, + int workers, int bandwidth) throws IOException { + String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); // check target path first, confirm it doesn't exist before backup boolean targetExists = false; @@ -185,14 +229,14 @@ public final class BackupClientImpl implements BackupClient { // table list specified for backup, trigger backup on specified tables try { - requestBackup(backupId, backupType, tableList, backupRootPath); + requestBackup(backupId, backupType, tableList, backupRootPath, workers, bandwidth); } catch (RuntimeException e) { String errMsg = e.getMessage(); if (errMsg != null && (errMsg.startsWith("Non-existing tables found") || errMsg .startsWith("Snapshot is not found"))) { LOG.error(errMsg + ", please check your command"); - throw e; + throw new DoNotRetryIOException(e); } else { throw e; } @@ -204,4 +248,158 @@ public final class BackupClientImpl implements BackupClient { return backupId; } + @Override + public void describeBackupImage(String backupId) throws IOException { + BackupContext backupContext = null; + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + backupContext = table.readBackupStatus(backupId); + if (backupContext != null) { + System.out.println(backupContext.getShortDescription()); + } else { + System.out.println("No information found for backupID=" + backupId); + } + } + } + + @Override + public void showProgress(String backupId) throws IOException { + BackupContext backupContext = null; + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + + if (backupId == null) { + ArrayList recentSessions = + table.getBackupContexts(BackupState.RUNNING); + if (recentSessions.isEmpty()) { + System.out.println("No ongonig sessions found."); + return; + } + // else show status for all ongoing sessions + // must be one maximum + for (BackupContext context : recentSessions) { + System.out.println(context.getStatusAndProgressAsString()); + } + } else { + + backupContext = table.readBackupStatus(backupId); + if (backupContext != null) { + System.out.println(backupContext.getStatusAndProgressAsString()); + } else { + System.out.println("No information found for backupID=" + backupId); + } + } + } + } + + @Override + public void deleteBackups(String[] backupIds) throws IOException { + BackupContext backupContext = null; + String backupId = null; + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + for (int i = 0; i < backupIds.length; i++) { + backupId = backupIds[i]; + backupContext = table.readBackupStatus(backupId); + if (backupContext != null) { + BackupUtil.cleanupBackupData(backupContext, conf); + table.deleteBackupStatus(backupContext.getBackupId()); + System.out.println("Delete backup for backupID=" + backupId + " completed."); + } else { + System.out.println("Delete backup failed: no information found for backupID=" + backupId); + } + } + } + } + + @Override + public void cancelBackup(String backupId) throws IOException { + // Kill distributed job if active + // Backup MUST not be in COMPLETE state + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + BackupContext backupContext = table.readBackupStatus(backupId); + String errMessage = null; + if (backupContext != null && backupContext.getState() != BackupState.COMPLETE) { + BackupUtil.cleanupBackupData(backupContext, conf); + table.deleteBackupStatus(backupContext.getBackupId()); + byte[] jobId = backupContext.getJobId(); + if(jobId != null) { + BackupCopyService service = BackupRestoreFactory.getBackupCopyService(conf); + service.cancelCopyJob(jobId); + } else{ + errMessage = "Distributed Job ID is null for backup "+backupId + + " in "+ backupContext.getState() + " state."; + } + } else if( backupContext == null){ + errMessage = "No information found for backupID=" + backupId; + } else { + errMessage = "Can not cancel "+ backupId + " in " + backupContext.getState()+" state"; + } + + if( errMessage != null) { + throw new IOException(errMessage); + } + } + // then clean backup image + deleteBackups(new String[] { backupId }); + } + + @Override + public void showHistory(int n) throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + List history = table.getBackupHistory(); + int max = Math.min(n, history.size()); + for (int i = 0; i < max; i++) { + printBackupCompleteData(history.get(i)); + } + } + } + + private void printBackupCompleteData(BackupCompleteData backupCompleteData) { + System.out.println(backupCompleteData.toString()); + } + + @Override + public void backupSetList() throws IOException{ + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + List list = table.backupSetList(); + for (String s : list) { + System.out.println(s); + } + System.out.println("Found " + list.size() + " records"); + } + } + + @Override + public void backupSetDescribe(String name) throws IOException{ + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + String[] list = table.backupSetDescribe(name); + for (String s : list) { + System.out.println(s); + } + System.out.println("Found " + list.length + " records"); + } + } + + @Override + public void backupSetDelete(String name) throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + table.backupSetDelete(name); + System.out.println("Deleted " + name); + } + } + + @Override + public void backupSetAdd(String name, String[] tablesOrNamespaces) throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + table.backupSetAdd(name, tablesOrNamespaces); + System.out.println("Added tables to '" + name + "'"); + } + } + + @Override + public void backupSetRemove(String name, String[] tablesOrNamepsaces) throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + table.backupSetRemove(name, tablesOrNamepsaces); + System.out.println("Removed tables from '" + name + "'"); + } + } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index 56e26fa..d1db757 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -19,17 +19,23 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.cli.CommandLine; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupClient; import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants.BackupCommand; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; -import com.google.common.collect.Lists; /** * General backup commands, options and usage messages @@ -39,18 +45,56 @@ import com.google.common.collect.Lists; public final class BackupCommands { private static final String USAGE = "Usage: hbase backup COMMAND\n" - + "where COMMAND is one of:\n" + " create create a new backup image\n" + + "where COMMAND is one of:\n" + + " create create a new backup image\n" + + " cancel cancel an ongoing backup\n" + + " delete delete an existing backup image\n" + + " describe show the detailed information of a backup image\n" + + " history show history of all successful backups\n" + + " progress show the progress of the latest backup request\n" + + " convert convert incremental backup WAL files into HFiles\n" + + " merge merge backup images\n" + + " set backup set management\n" + "Enter \'help COMMAND\' to see help message for each command\n"; private static final String CREATE_CMD_USAGE = - "Usage: hbase backup create [tables] [-convert] " - + "\n" + " type \"full\" to create a full backup image;\n" + "Usage: hbase backup create [tables] [-s name] [-convert] " + + "[-silent] [-w workers][-b bandwith]\n" + " type \"full\" to create a full backup image;\n" + " \"incremental\" to create an incremental backup image\n" - + " backup_root_path The full root path to store the backup image,\n" - + " the prefix can be hdfs, webhdfs, gpfs, etc\n" + " Options:\n" - + " tables If no tables (\"\") are specified, all tables are backed up. " + + " backup_root_path The full root path to store the backup image,\n" + + " the prefix can be gpfs, hdfs or webhdfs\n" + " Options:\n" + + " tables If no tables (\"\") are specified, all tables are backed up. " + "Otherwise it is a\n" + " comma separated list of tables.\n" - + " -convert For an incremental backup, convert WAL files to HFiles\n"; + + " -s name Use the specified snapshot for full backup\n" + + " -convert For an incremental backup, convert WAL files to HFiles\n" + + " -w number of parallel workers.\n" + + " -b bandwith per one worker (in MB sec)" ; + + private static final String PROGRESS_CMD_USAGE = "Usage: hbase backup progress \n" + + " backupId backup image id;\n"; + + private static final String DESCRIBE_CMD_USAGE = "Usage: hbase backup decsribe \n" + + " backupId backup image id\n"; + + private static final String HISTORY_CMD_USAGE = "Usage: hbase backup history [-n N]\n" + + " -n N show up to N last backup sessions, default - 10;\n"; + + private static final String DELETE_CMD_USAGE = "Usage: hbase backup delete \n" + + " backupId backup image id;\n"; + + private static final String CANCEL_CMD_USAGE = "Usage: hbase backup progress \n" + + " backupId backup image id;\n"; + + private static final String SET_CMD_USAGE = "Usage: hbase set COMMAND [name] [tables]\n" + + " name Backup set name\n" + + " tables If no tables (\"\") are specified, all tables will belong to the set. " + + "Otherwise it is a\n" + " comma separated list of tables.\n" + + "where COMMAND is one of:\n" + + " add add tables to a set, crete set if needed\n" + + " remove remove tables from set\n" + + " list list all sets\n" + + " describe describes set\n" + + " delete delete backup set\n"; public static abstract class Command extends Configured { Command(Configuration conf) { @@ -66,25 +110,44 @@ public final class BackupCommands { public static Command createCommand(Configuration conf, BackupCommand type, CommandLine cmdline) { Command cmd = null; switch (type) { - case CREATE: - cmd = new CreateCommand(conf, cmdline); - break; - case HELP: - default: - cmd = new HelpCommand(conf, cmdline); - break; + case CREATE: + cmd = new CreateCommand(conf, cmdline); + break; + case DESCRIBE: + cmd = new DescribeCommand(conf, cmdline); + break; + case PROGRESS: + cmd = new ProgressCommand(conf, cmdline); + break; + case DELETE: + cmd = new DeleteCommand(conf, cmdline); + break; + case CANCEL: + cmd = new CancelCommand(conf, cmdline); + break; + case HISTORY: + cmd = new HistoryCommand(conf, cmdline); + break; + case SET: + cmd = new BackupSetCommand(conf, cmdline); + break; + case HELP: + default: + cmd = new HelpCommand(conf, cmdline); + break; } return cmd; } - private static class CreateCommand extends Command { + + public static class CreateCommand extends Command { CommandLine cmdline; CreateCommand(Configuration conf, CommandLine cmdline) { super(conf); this.cmdline = cmdline; } - + @Override public void execute() throws IOException { if (cmdline == null || cmdline.getArgs() == null) { @@ -106,17 +169,55 @@ public final class BackupCommands { System.exit(-1); } - String tables = (args.length == 3) ? args[2] : null; + String tables = null; + Configuration conf = getConf() != null? getConf(): HBaseConfiguration.create(); + // Check backup set + if (cmdline.hasOption("set")) { + String setName = cmdline.getOptionValue("set"); + tables = getTablesForSet(setName, conf); + if (tables == null) throw new IOException("Backup set '" + setName + + "' is either empty or does not exist"); + } else { + tables = (args.length == 3) ? args[2] : null; + } + int bandwidth = cmdline.hasOption('b') ? Integer.parseInt(cmdline.getOptionValue('b')) : -1; + int workers = cmdline.hasOption('w') ? Integer.parseInt(cmdline.getOptionValue('w')) : -1; + try { - BackupClient client = BackupRestoreFactory.getBackupClient(getConf()); - client.create(BackupType.valueOf(args[0].toUpperCase()), - Lists.newArrayList(BackupUtil.parseTableNames(tables)), args[1]); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.create(BackupType.valueOf(args[0].toUpperCase()), toList(tables), args[1], + workers, bandwidth); } catch (RuntimeException e) { System.out.println("ERROR: " + e.getMessage()); System.exit(-1); } } + + private String getTablesForSet(String name, Configuration conf) + throws IOException { + try (final BackupSystemTable table = new BackupSystemTable(conf)) { + String[] tables = table.backupSetDescribe(name); + if (tables == null) return null; + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < tables.length; i++) { + sb.append(tables[i]); + if (i < tables.length - 1) { + sb.append(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); + } + } + return sb.toString(); + } + } + + private List toList(String tables) { + String[] splits = tables.split(","); + List list = new ArrayList(); + for (String s : splits) { + list.add(TableName.valueOf(s)); + } + return list; + } } private static class HelpCommand extends Command { @@ -150,9 +251,299 @@ public final class BackupCommands { if (BackupCommand.CREATE.name().equalsIgnoreCase(type)) { System.out.println(CREATE_CMD_USAGE); - } // other commands will be supported in future jira + } else if (BackupCommand.DESCRIBE.name().equalsIgnoreCase(type)) { + System.out.println(DESCRIBE_CMD_USAGE); + } else if (BackupCommand.HISTORY.name().equalsIgnoreCase(type)) { + System.out.println(HISTORY_CMD_USAGE); + } else if (BackupCommand.PROGRESS.name().equalsIgnoreCase(type)) { + System.out.println(PROGRESS_CMD_USAGE); + } else if (BackupCommand.DELETE.name().equalsIgnoreCase(type)) { + System.out.println(DELETE_CMD_USAGE); + } + if (BackupCommand.CANCEL.name().equalsIgnoreCase(type)) { + System.out.println(CANCEL_CMD_USAGE); + } + if (BackupCommand.SET.name().equalsIgnoreCase(type)) { + System.out.println(SET_CMD_USAGE); + } else { + System.out.println("Unknown command : " + type); + System.out.println(USAGE); + } System.exit(0); } } + private static class DescribeCommand extends Command { + CommandLine cmdline; + + DescribeCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + public void execute() throws IOException { + if (cmdline == null || cmdline.getArgs() == null) { + System.out.println("ERROR: missing arguments"); + System.out.println(DESCRIBE_CMD_USAGE); + System.exit(-1); + } + String[] args = cmdline.getArgs(); + if (args.length != 1) { + System.out.println("ERROR: wrong number of arguments"); + System.out.println(DESCRIBE_CMD_USAGE); + System.exit(-1); + } + + String backupId = args[0]; + try { + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.describeBackupImage(backupId); + } catch (RuntimeException e) { + System.out.println("ERROR: " + e.getMessage()); + System.exit(-1); + } + } + } + + private static class ProgressCommand extends Command { + CommandLine cmdline; + + ProgressCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + public void execute() throws IOException { + if (cmdline == null || cmdline.getArgs() == null) { + System.out.println("No backup id was specified, " + + "will retrieve the most recent (ongoing) sessions"); + } + String[] args = cmdline.getArgs(); + if (args.length > 1) { + System.out.println("ERROR: wrong number of arguments: " + args.length); + System.out.println(PROGRESS_CMD_USAGE); + System.exit(-1); + } + + String backupId = args == null ? null : args[0]; + try { + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.showProgress(backupId); + } catch (RuntimeException e) { + System.out.println("ERROR: " + e.getMessage()); + System.exit(-1); + } + } + } + + private static class DeleteCommand extends Command { + + CommandLine cmdline; + DeleteCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + public void execute() throws IOException { + if (cmdline == null || cmdline.getArgs() == null) { + System.out.println("No backup id(s) was specified"); + System.out.println(PROGRESS_CMD_USAGE); + System.exit(-1); + } + String[] args = cmdline.getArgs(); + + try { + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.deleteBackups(args); + } catch (RuntimeException e) { + System.out.println("ERROR: " + e.getMessage()); + System.exit(-1); + } + } + } + + private static class CancelCommand extends Command { + CommandLine cmdline; + + CancelCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + public void execute() throws IOException { + if (cmdline == null || cmdline.getArgs() == null) { + System.out.println("No backup id(s) was specified, will use the most recent one"); + } + String[] args = cmdline.getArgs(); + String backupId = args == null || args.length == 0 ? null : args[0]; + try { + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.cancelBackup(backupId); + } catch (RuntimeException e) { + System.out.println("ERROR: " + e.getMessage()); + System.exit(-1); + } + } + } + + private static class HistoryCommand extends Command { + CommandLine cmdline; + + HistoryCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + public void execute() throws IOException { + + int n = + cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length == 0 ? 10 + : parseHistoryLength(); + try { + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.showHistory(n); + } catch (RuntimeException e) { + System.out.println("ERROR: " + e.getMessage()); + System.exit(-1); + } + } + + private int parseHistoryLength() { + String value = cmdline.getOptionValue("n"); + if (value == null) throw new RuntimeException("command line format"); + return Integer.parseInt(value); + } + } + + private static class BackupSetCommand extends Command { + private final static String SET_ADD_CMD = "add"; + private final static String SET_REMOVE_CMD = "remove"; + private final static String SET_DELETE_CMD = "delete"; + private final static String SET_DESCRIBE_CMD = "describe"; + private final static String SET_LIST_CMD = "list"; + + CommandLine cmdline; + + BackupSetCommand(Configuration conf, CommandLine cmdline) { + super(conf); + this.cmdline = cmdline; + } + + @Override + public void execute() throws IOException { + + // Command-line must have at least one element + if (cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length == 0) { + throw new IOException("command line format"); + } + String[] args = cmdline.getArgs(); + String cmdStr = args[0]; + BackupCommand cmd = getCommand(cmdStr); + + try { + + switch (cmd) { + case SET_ADD: + processSetAdd(args); + break; + case SET_REMOVE: + processSetRemove(args); + break; + case SET_DELETE: + processSetDelete(args); + break; + case SET_DESCRIBE: + processSetDescribe(args); + break; + case SET_LIST: + processSetList(args); + break; + default: + break; + + } + } catch (RuntimeException e) { + System.out.println("ERROR: " + e.getMessage()); + System.exit(-1); + } + } + + private void processSetList(String[] args) throws IOException { + // List all backup set names + // does not expect any args + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.backupSetList(); + } + + private void processSetDescribe(String[] args) throws IOException { + if (args == null || args.length != 2) { + throw new RuntimeException("Wrong args"); + } + String setName = args[1]; + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.backupSetDescribe(setName); + } + + private void processSetDelete(String[] args) throws IOException { + if (args == null || args.length != 2) { + throw new RuntimeException("Wrong args"); + } + String setName = args[1]; + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.backupSetDelete(setName); + } + + private void processSetRemove(String[] args) throws IOException { + if (args == null || args.length != 3) { + throw new RuntimeException("Wrong args"); + } + String setName = args[1]; + String[] tables = args[2].split(","); + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.backupSetRemove(setName, tables); + } + + private void processSetAdd(String[] args) throws IOException { + if (args == null || args.length != 3) { + throw new RuntimeException("Wrong args"); + } + String setName = args[1]; + String[] tables = args[2].split(","); + Configuration conf = HBaseConfiguration.create(); + BackupClient client = BackupRestoreFactory.getBackupClient(conf); + client.backupSetAdd(setName, tables); + } + + private BackupCommand getCommand(String cmdStr) throws IOException { + if (cmdStr.equals(SET_ADD_CMD)) { + return BackupCommand.SET_ADD; + } else if (cmdStr.equals(SET_REMOVE_CMD)) { + return BackupCommand.SET_REMOVE; + } else if (cmdStr.equals(SET_DELETE_CMD)) { + return BackupCommand.SET_DELETE; + } else if (cmdStr.equals(SET_DESCRIBE_CMD)) { + return BackupCommand.SET_DESCRIBE; + } else if (cmdStr.equals(SET_LIST_CMD)) { + return BackupCommand.SET_LIST; + } else { + throw new IOException("Unknown command for 'set' :" + cmdStr); + } + } + + } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupContext.java index 1be0c3b..ecca4be 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupContext.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupContext.java @@ -20,12 +20,17 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.HBackupFileSystem; @@ -37,49 +42,15 @@ import org.apache.hadoop.hbase.protobuf.generated.BackupProtos; import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.BackupContext.Builder; import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.TableBackupStatus; +import com.google.protobuf.ByteString; + /** * An object to encapsulate the information for each backup request */ @InterfaceAudience.Private @InterfaceStability.Evolving public class BackupContext { - - public Map getBackupStatusMap() { - return backupStatusMap; - } - - public void setBackupStatusMap(Map backupStatusMap) { - this.backupStatusMap = backupStatusMap; - } - - public HashMap> getTableSetTimestampMap() { - return tableSetTimestampMap; - } - - public void setTableSetTimestampMap( - HashMap> tableSetTimestampMap) { - this.tableSetTimestampMap = tableSetTimestampMap; - } - - public String getHlogTargetDir() { - return hlogTargetDir; - } - - public void setType(BackupType type) { - this.type = type; - } - - public void setTargetRootDir(String targetRootDir) { - this.targetRootDir = targetRootDir; - } - - public void setTotalBytesCopied(long totalBytesCopied) { - this.totalBytesCopied = totalBytesCopied; - } - - public void setCancelled(boolean cancelled) { - this.state = BackupState.CANCELLED;; - } + private static final Log LOG = LogFactory.getLog(BackupContext.class); // backup id: a timestamp when we request the backup private String backupId; @@ -122,8 +93,16 @@ public class BackupContext { transient private HashMap> tableSetTimestampMap; // backup progress in %% (0-100) - private int progress; + + // distributed job id + private byte[] jobId; + + // Number of paralle workers. -1 - system defined + private int workers = -1; + + // Bandwidth per worker in MB per sec. -1 - unlimited + private int bandwidth = -1; public BackupContext() { } @@ -134,7 +113,7 @@ public class BackupContext { this.backupId = backupId; this.type = type; this.targetRootDir = targetRootDir; - + LOG.debug("CreateBackupContext: " + tables.length+" "+tables[0]); this.addTables(tables); if (type == BackupType.INCREMENTAL) { @@ -145,6 +124,62 @@ public class BackupContext { this.endTs = 0; } + public byte[] getJobId() { + return jobId; + } + + public void setJobId(byte[] jobId) { + this.jobId = jobId; + } + + public int getWorkers() { + return workers; + } + + public void setWorkers(int workers) { + this.workers = workers; + } + + public int getBandwidth() { + return bandwidth; + } + + public void setBandwidth(int bandwidth) { + this.bandwidth = bandwidth; + } + + public void setBackupStatusMap(Map backupStatusMap) { + this.backupStatusMap = backupStatusMap; + } + + public HashMap> getTableSetTimestampMap() { + return tableSetTimestampMap; + } + + public void setTableSetTimestampMap(HashMap> tableSetTimestampMap) { + this.tableSetTimestampMap = tableSetTimestampMap; + } + + public String getHlogTargetDir() { + return hlogTargetDir; + } + + public void setType(BackupType type) { + this.type = type; + } + + public void setTargetRootDir(String targetRootDir) { + this.targetRootDir = targetRootDir; + } + + public void setTotalBytesCopied(long totalBytesCopied) { + this.totalBytesCopied = totalBytesCopied; + } + + public void setCancelled(boolean cancelled) { + this.state = BackupState.CANCELLED;; + } + /** * Set progress string * @param msg progress message @@ -332,6 +367,11 @@ public class BackupContext { builder.setTargetRootDir(getTargetRootDir()); builder.setTotalBytesCopied(getTotalBytesCopied()); builder.setType(BackupProtos.BackupType.valueOf(getType().name())); + builder.setWorkersNumber(workers); + builder.setBandwidth(bandwidth); + if(jobId != null) { + builder.setJobId(ByteString.copyFrom(jobId)); + } byte[] data = builder.build().toByteArray(); return data; } @@ -368,6 +408,11 @@ public class BackupContext { context.setTargetRootDir(proto.getTargetRootDir()); context.setTotalBytesCopied(proto.getTotalBytesCopied()); context.setType(BackupType.valueOf(proto.getType().name())); + context.setWorkers(proto.getWorkersNumber()); + context.setBandwidth(proto.getBandwidth()); + if(proto.hasJobId()){ + context.setJobId(proto.getJobId().toByteArray()); + } return context; } @@ -379,4 +424,50 @@ public class BackupContext { return map; } + public String getShortDescription() { + StringBuffer sb = new StringBuffer(); + sb.append("ID : " + backupId).append("\n"); + sb.append("Tables : " + getTableListAsString()).append("\n"); + sb.append("State : " + getState()).append("\n"); + Date date = null; + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(getStartTs()); + date = cal.getTime(); + sb.append("Start time : " + date).append("\n"); + if (state == BackupState.FAILED) { + sb.append("Failed message : " + getFailedMsg()).append("\n"); + } else if (state == BackupState.RUNNING) { + sb.append("Phase : " + getPhase()).append("\n"); + sb.append("Progress : " + getProgress()).append("\n"); + } else if (state == BackupState.COMPLETE) { + cal = Calendar.getInstance(); + cal.setTimeInMillis(getEndTs()); + date = cal.getTime(); + sb.append("Start time : " + date).append("\n"); + } + return sb.toString(); + } + + public String getStatusAndProgressAsString() { + StringBuffer sb = new StringBuffer(); + sb.append("id: ").append(getBackupId()).append(" state: ").append(getState()) + .append(" progress: ").append(getProgress()); + return sb.toString(); + } + + public String getTableListAsString() { + return concat(backupStatusMap.keySet(), ";"); + } + + private String concat(Set col, String separator) { + if (col.size() == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (Object s : col) { + sb.append(s.toString() + separator); + } + sb.deleteCharAt(sb.lastIndexOf(";")); + return sb.toString(); + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCopyService.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCopyService.java index 1a0d5ed..c26f4c0 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCopyService.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCopyService.java @@ -27,11 +27,28 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving +//TODO re-factor move up public interface BackupCopyService extends Configurable { static enum Type { FULL, INCREMENTAL } + /** + * Copy backup data + * @param backupHandler - handler + * @param conf - configuration + * @param copyType - copy type + * @param options - list of options + * @return result (0 - success) + * @throws IOException + */ public int copy(BackupHandler backupHandler, Configuration conf, BackupCopyService.Type copyType, String[] options) throws IOException; + + /** + * TODO refactor + * @param jobHandler - copy job handler + * @throws IOException + */ + public void cancelCopyJob(byte[] jobHandler) throws IOException; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHandler.java index ea9f35e..c2b7b2c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHandler.java @@ -233,7 +233,6 @@ public class BackupHandler implements Callable { */ private void snapshotForFullBackup(BackupContext backupContext) throws IOException { LOG.info("HBase snapshot full backup for " + backupContext.getBackupId()); - // avoid action if has been cancelled if (backupContext.isCancelled()) { return; @@ -242,6 +241,7 @@ public class BackupHandler implements Callable { try (Admin admin = conn.getAdmin()) { // we do HBase snapshot for tables in the table list one by one currently for (TableName table : backupContext.getTables()) { + // avoid action if it has been cancelled if (backupContext.isCancelled()) { return; @@ -427,11 +427,26 @@ public class BackupHandler implements Callable { // TODO this below // backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots); int res = 0; - String[] args = new String[4]; + int argsLen = 4; + if (backupContext.getWorkers() > 0) argsLen += 2; + if (backupContext.getBandwidth() > 0) argsLen += 2; + String[] args = new String[argsLen]; args[0] = "-snapshot"; args[1] = backupContext.getSnapshotName(table); args[2] = "-copy-to"; args[3] = backupContext.getBackupStatus(table).getTargetDir(); + if (backupContext.getWorkers() > 0 && backupContext.getBandwidth() > 0) { + args[4] = "-mappers"; + args[5] = Integer.toString(backupContext.getWorkers()); + args[6] = "-bandwidth"; + args[7] = Integer.toString(backupContext.getBandwidth()); + } else if (backupContext.getWorkers() > 0) { + args[4] = "-mappers"; + args[5] = Integer.toString(backupContext.getWorkers()); + } else if (backupContext.getBandwidth() > 0) { + args[4] = "-bandwidth"; + args[5] = Integer.toString(backupContext.getBandwidth()); + } LOG.debug("Copy snapshot " + args[1] + " to " + args[3]); res = copyService.copy(this, conf, BackupCopyService.Type.FULL, args); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index a4b0a0a..d4e5a5c 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -86,9 +86,10 @@ public class BackupManager implements Closeable { HConstants.BACKUP_ENABLE_KEY + " setting."); } this.conf = conf; - this.conn = ConnectionFactory.createConnection(conf); // TODO: get Connection from elsewhere? - this.systemTable = new BackupSystemTable(conn); + this.systemTable = new BackupSystemTable(this.conf); + this.conn = BackupSystemTable.getConnection(); Runtime.getRuntime().addShutdownHook(new ExitHandler()); + } /** @@ -178,13 +179,13 @@ public class BackupManager implements Closeable { LOG.error(e); } } - if (conn != null) { - try { - conn.close(); - } catch (IOException e) { - LOG.error(e); - } - } +// if (conn != null) { +// try { +// conn.close(); +// } catch (IOException e) { +// LOG.error(e); +// } +// } } /** @@ -198,13 +199,14 @@ public class BackupManager implements Closeable { * @throws BackupException exception */ protected BackupContext createBackupContext(String backupId, BackupType type, - List tableList, String targetRootDir) throws BackupException { + List tableList, String targetRootDir, int workers, int bandwidth) + throws BackupException { if (targetRootDir == null) { throw new BackupException("Wrong backup request parameter: target backup root directory"); } - if (type == BackupType.FULL && tableList == null) { + if (type == BackupType.FULL && tableList == null || tableList.size() == 0) { // If table list is null for full backup, which means backup all tables. Then fill the table // list with all user tables from meta. It no table available, throw the request exception. @@ -228,8 +230,12 @@ public class BackupManager implements Closeable { } // there are one or more tables in the table list - return new BackupContext(backupId, type, tableList.toArray(new TableName[tableList.size()]), + BackupContext context = new BackupContext(backupId, type, + tableList.toArray(new TableName[tableList.size()]), targetRootDir); + context.setBandwidth(bandwidth); + context.setWorkers(workers); + return context; } /** diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java index d0ce059..7233bfa 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java @@ -37,7 +37,8 @@ public final class BackupRestoreConstants { public static final String BACKUPID_PREFIX = "backup_"; public static enum BackupCommand { - CREATE, CANCEL, DELETE, DESCRIBE, HISTORY, STATUS, CONVERT, MERGE, STOP, SHOW, HELP, + CREATE, CANCEL, DELETE, DESCRIBE, HISTORY, STATUS, CONVERT, MERGE, STOP, SHOW, HELP, PROGRESS, SET, + SET_ADD, SET_REMOVE, SET_DELETE, SET_DESCRIBE, SET_LIST } private BackupRestoreConstants() { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 18a0f06..46a1e44 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -27,14 +27,21 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.NamespaceNotFoundException; +import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState; import org.apache.hadoop.hbase.backup.impl.BackupUtil.BackupCompleteData; @@ -42,6 +49,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -67,25 +75,62 @@ public final class BackupSystemTable implements Closeable { // Connection to HBase cluster, shared // among all instances - private final Connection connection; + private static Connection connection; // Cluster configuration - private final Configuration conf; + private static Configuration config; - /** - * Create a BackupSystemTable object for the given Connection. Connection is NOT owned by this - * instance and has to be closed explicitly. - * @param connection - * @throws IOException - */ - public BackupSystemTable(Connection connection) throws IOException { - this.connection = connection; - this.conf = connection.getConfiguration(); + private static int refCount = 0; + private static WriteLock refCountLock = new ReentrantReadWriteLock().writeLock(); - createSystemTableIfNotExists(); + public static Connection getConnection() + { + return connection; + } + + private static void verifySystemNamespaceExists(Admin admin) throws IOException { + try { + admin.getNamespaceDescriptor(TABLE_NAMESPACE); + } catch (NamespaceNotFoundException e) { + admin.createNamespace(NamespaceDescriptor.create(TABLE_NAMESPACE).build()); + } + } + + public BackupSystemTable(Configuration conf) throws IOException { + try{ + refCountLock.lock(); + if (connection == null) { + connection = ConnectionFactory.createConnection(conf); + config = conf; + // Verify hbase:backup exists - FOR TESTS ONLY + // hbase:backup is created by HMaster on start up + createSystemTableIfNotExists(); + } + refCount++; + } finally{ + refCountLock.unlock(); + } } - @Override + /** + * Closes HBase connection, to avoid connection leak + * we perform reference counting and when refNumber reaches 0 + * we close underlying connection + */ public void close() { + try{ + BackupSystemTable.refCountLock.lock(); + BackupSystemTable.refCount--; + if(connection != null && BackupSystemTable.refCount == 0) { + try{ + connection.close(); + connection = null; + } catch(Exception e){ + LOG.error(e); + } + } + } finally{ + BackupSystemTable.refCountLock.unlock(); + } } /** @@ -99,15 +144,19 @@ public final class BackupSystemTable implements Closeable { private void createSystemTableIfNotExists() throws IOException { try(Admin admin = connection.getAdmin()) { if (admin.tableExists(tableName) == false) { + verifySystemNamespaceExists(admin); HTableDescriptor tableDesc = new HTableDescriptor(tableName); HColumnDescriptor colDesc = new HColumnDescriptor(familyName); colDesc.setMaxVersions(1); int ttl = - conf.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT); + config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT); colDesc.setTimeToLive(ttl); tableDesc.addFamily(colDesc); admin.createTable(tableDesc); } + + } catch (TableExistsException ee){ + //just ignore that } catch (IOException e) { LOG.error(e); throw e; @@ -568,4 +617,203 @@ public final class BackupSystemTable implements Closeable { return result; } } + + /** + * BACKUP SETS + */ + + /** + * Get backup set list + * @return backup set list + * @throws IOException + */ + public List backupSetList() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug(" backup set list"); + } + List list = new ArrayList(); + Table table = null; + ResultScanner scanner = null; + try { + table = connection.getTable(tableName); + Scan scan = BackupSystemTableHelper.createScanForBackupSetList(); + scan.setMaxVersions(1); + scanner = table.getScanner(scan); + Result res = null; + while ((res = scanner.next()) != null) { + res.advance(); + list.add(BackupSystemTableHelper.cellKeyToBackupSetName(res.current())); + } + return list; + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Get backup set description (list of tables) + * @param setName set's name + * @return list of tables in a backup set + * @throws IOException + */ + public String[] backupSetDescribe(String name) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug(" backup set describe: "+name); + } + Table table = null; + try { + table = connection.getTable(tableName); + Get get = BackupSystemTableHelper.createGetForBackupSet(name); + Result res = table.get(get); + if(res.isEmpty()) return null; + res.advance(); + String[] tables = + BackupSystemTableHelper.cellValueToBackupSet(res.current()); + return tables; + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Add backup set (list of tables) + * @param name - set name + * @param tables - list of tables, comma-separated + * @throws IOException + */ + public void backupSetAdd(String name, String[] newTables) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug(" backup set add: "+name); + } + Table table = null; + String[] union = null; + try { + table = connection.getTable(tableName); + Get get = BackupSystemTableHelper.createGetForBackupSet(name); + Result res = table.get(get); + if(res.isEmpty()) { + union = newTables; + } else { + res.advance(); + String[] tables = + BackupSystemTableHelper.cellValueToBackupSet(res.current()); + union = merge(tables, newTables); + } + Put put = BackupSystemTableHelper.createPutForBackupSet(name, union); + table.put(put); + } finally { + if (table != null) { + table.close(); + } + } + } + + private String[] merge(String[] tables, String[] newTables) { + List list = new ArrayList(); + // Add all from tables + for(String t: tables){ + list.add(t); + } + for(String nt: newTables){ + if(list.contains(nt)) continue; + list.add(nt); + } + String[] arr = new String[list.size()]; + list.toArray(arr); + return arr; + } + + /** + * Remove tables from backup set (list of tables) + * @param name - set name + * @param tables - list of tables, comma-separated + * @throws IOException + */ + public void backupSetRemove(String name, String[] toRemove) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug(" backup set describe: " + name); + } + Table table = null; + String[] disjoint = null; + try { + table = connection.getTable(tableName); + Get get = BackupSystemTableHelper.createGetForBackupSet(name); + Result res = table.get(get); + if (res.isEmpty()) { + return; + } else { + res.advance(); + String[] tables = BackupSystemTableHelper.cellValueToBackupSet(res.current()); + disjoint = disjoin(tables, toRemove); + } + if (disjoint.length > 0) { + Put put = BackupSystemTableHelper.createPutForBackupSet(name, disjoint); + table.put(put); + } else { + // Delete + backupSetDescribe(name); + } + } finally { + if (table != null) { + table.close(); + } + } + } + + private String[] disjoin(String[] tables, String[] toRemove) { + List list = new ArrayList(); + // Add all from tables + for (String t : tables) { + list.add(t); + } + for (String nt : toRemove) { + if (list.contains(nt)) { + list.remove(nt); + } + } + String[] arr = new String[list.size()]; + list.toArray(arr); + return arr; + } + + /** + * Delete backup set + * @param name set's name + * @throws IOException + */ + public void backupSetDelete(String name) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug(" backup set delete: " + name); + } + Table table = null; + try { + table = connection.getTable(tableName); + Delete del = BackupSystemTableHelper.createDeleteForBackupSet(name); + table.delete(del); + } finally { + if (table != null) { + table.close(); + } + } + } + + /** + * Get backup system table descriptor + * @return descriptor + */ + public static HTableDescriptor getSystemTableDescriptor() { + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + HColumnDescriptor colDesc = new HColumnDescriptor(familyName); + colDesc.setMaxVersions(1); + if(config == null) config = HBaseConfiguration.create(); + int ttl = + config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT); + colDesc.setTimeToLive(ttl); + tableDesc.addFamily(colDesc); + return tableDesc; + } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java index ac096b7..52d3311 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java @@ -61,6 +61,7 @@ public final class BackupSystemTableHelper { private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm."; private final static String RS_LOG_TS_PREFIX = "rslogts."; private final static String WALS_PREFIX = "wals."; + private final static String SET_KEY_PREFIX = "set."; private final static byte[] col1 = "col1".getBytes(); private final static byte[] col2 = "col2".getBytes(); @@ -322,4 +323,93 @@ public final class BackupSystemTableHelper { return get; } + + /** + * Creates Scan operation to load backup set list + * @return scan operation + */ + static Scan createScanForBackupSetList() { + Scan scan = new Scan(); + byte[] startRow = SET_KEY_PREFIX.getBytes(); + byte[] stopRow = Arrays.copyOf(startRow, startRow.length); + stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); + scan.setStartRow(startRow); + scan.setStopRow(stopRow); + scan.addFamily(BackupSystemTable.familyName); + return scan; + } + + /** + * Creates Get operation to load backup set content + * @return get operation + */ + static Get createGetForBackupSet(String name) { + byte[] row = (SET_KEY_PREFIX + name).getBytes(); + Get get = new Get(row); + get.addFamily(BackupSystemTable.familyName); + return get; + } + + /** + * Creates Delete operation to delete backup set content + * @return delete operation + */ + static Delete createDeleteForBackupSet(String name) { + byte[] row = (SET_KEY_PREFIX + name).getBytes(); + Delete del = new Delete(row); + del.addFamily(BackupSystemTable.familyName); + return del; + } + + + /** + * Creates Put operation to update backup set content + * @return put operation + */ + static Put createPutForBackupSet(String name, String[] tables) { + byte[] row = (SET_KEY_PREFIX + name).getBytes(); + Put put = new Put(row); + byte[] value = convertToByteArray(tables); + put.addColumn(BackupSystemTable.familyName, col1, value); + return put; + } + + private static byte[] convertToByteArray(String[] tables) { + StringBuffer sb = new StringBuffer(); + for(int i=0; i < tables.length; i++){ + sb.append(tables[i]); + if(i < tables.length -1){ + sb.append(","); + } + } + return sb.toString().getBytes(); + } + + + /** + * Converts cell to backup set list. + * @param current - cell + * @return backup set + * @throws IOException + */ + static String[] cellValueToBackupSet(Cell current) throws IOException { + byte[] data = CellUtil.cloneValue(current); + if( data != null && data.length > 0){ + return new String(data).split(","); + } else{ + return new String[0]; + } + } + + /** + * Converts cell key to backup set name. + * @param current - cell + * @return backup set namd + * @throws IOException + */ + static String cellKeyToBackupSetName(Cell current) throws IOException { + byte[] data = CellUtil.cloneRow(current); + return new String(data).substring(SET_KEY_PREFIX.length()); + } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupUtil.java index 2dd0556..38eb5a3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupUtil.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.URLDecoder; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -45,12 +46,12 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; @@ -153,7 +154,6 @@ public final class BackupUtil { descriptors.createTableDescriptorForTableDirectory(target, orig, false); LOG.debug("Finished copying tableinfo."); - HBaseAdmin hbadmin = null; // TODO: optimize List regions = null; try(Connection conn = ConnectionFactory.createConnection(conf); @@ -375,9 +375,35 @@ public final class BackupUtil { new Long(o.getBackupToken().substring(o.getBackupToken().lastIndexOf("_") + 1)); return thisTS.compareTo(otherTS); } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("BackupId =").append(backupToken).append("\n"); + sb.append("Type =").append(type).append("\n"); + sb.append("Start time =").append(new Date(Long.parseLong(startTime))).append("\n"); + sb.append("End time =").append(new Date(Long.parseLong(endTime))).append("\n"); + sb.append("Root path =").append(backupRootPath).append("\n"); + sb.append("Type =").append(type).append("\n"); + sb.append("Table list =").append(tableList).append("\n"); + sb.append("Bytes copied =").append(type).append("\n"); + sb.append("Ancestors =").append(BackupUtil.merge(ancestors)).append("\n\n"); + return sb.toString(); + } } + public static String merge(List list) { + if (list == null || list.size() == 0) return ""; + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < list.size(); i++) { + sb.append(list.get(i)); + if (i < list.size() - 1) { + sb.append(","); + } + } + return sb.toString(); + } + /** * Sort history list by start time in descending order. * @param historyList history list @@ -510,4 +536,78 @@ public final class BackupUtil { } return ret; } + + public static void cleanupBackupData(BackupContext context, Configuration conf) + throws IOException + { + cleanupHLogDir(context, conf); + cleanupTargetDir(context, conf); + } + + /** + * Clean up directories which are generated when DistCp copying hlogs. + * @throws IOException + */ + private static void cleanupHLogDir(BackupContext backupContext, Configuration conf) + throws IOException { + + String logDir = backupContext.getHLogTargetDir(); + if (logDir == null) { + LOG.warn("No log directory specified for " + backupContext.getBackupId()); + return; + } + + Path rootPath = new Path(logDir).getParent(); + FileSystem fs = FileSystem.get(rootPath.toUri(), conf); + FileStatus[] files = FSUtils.listStatus(fs, rootPath); + if (files == null) { + return; + } + for (FileStatus file : files) { + LOG.debug("Delete log files: " + file.getPath().getName()); + FSUtils.delete(fs, file.getPath(), true); + } + } + + /** + * Clean up the data at target directory + */ + private static void cleanupTargetDir(BackupContext backupContext, Configuration conf) { + try { + // clean up the data at target directory + LOG.debug("Trying to cleanup up target dir : " + backupContext.getBackupId()); + String targetDir = backupContext.getTargetRootDir(); + if (targetDir == null) { + LOG.warn("No target directory specified for " + backupContext.getBackupId()); + return; + } + + FileSystem outputFs = + FileSystem.get(new Path(backupContext.getTargetRootDir()).toUri(), conf); + + for (TableName table : backupContext.getTables()) { + Path targetDirPath = + new Path(HBackupFileSystem.getTableBackupDir(backupContext.getTargetRootDir(), + backupContext.getBackupId(), table)); + if (outputFs.delete(targetDirPath, true)) { + LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done."); + } else { + LOG.info("No data has been found in " + targetDirPath.toString() + "."); + } + + Path tableDir = targetDirPath.getParent(); + FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir); + if (backups == null || backups.length == 0) { + outputFs.delete(tableDir, true); + LOG.debug(tableDir.toString() + " is empty, remove it."); + } + } + + } catch (IOException e1) { + LOG.error("Cleaning up backup data of " + backupContext.getBackupId() + " at " + + backupContext.getTargetRootDir() + " failed due to " + e1.getMessage() + "."); + } + } + + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java index df21612..aba0195 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java @@ -105,12 +105,25 @@ public class IncrementalBackupManager { newTimestamps = backupManager.readRegionServerLastLogRollResult(); logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); - logList.addAll(getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps)); + logList = addLogFilesFromBackupSystem(logList, previousTimestampMins, newTimestamps); backupContext.setIncrBackupFileList(logList); return newTimestamps; } + private List addLogFilesFromBackupSystem(List logList, + HashMap previousTimestampMins, HashMap newTimestamps) + throws IOException { + List fromBackupSystemList = + getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps); + for(String fileName: fromBackupSystemList) { + if(!logList.contains(fileName)) { + logList.add(fileName); + } + } + return logList; + } + /** * For each region server: get all log files newer than the last timestamps but not newer than the * newest timestamps. FROM hbase:backup table diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java index 32d16fb..4705be8 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java @@ -20,11 +20,9 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map.Entry; -import java.util.Set; import java.util.TreeSet; import org.apache.commons.logging.Log; @@ -51,7 +49,6 @@ public final class RestoreClientImpl implements RestoreClient { private static final Log LOG = LogFactory.getLog(RestoreClientImpl.class); private Configuration conf; - private Set lastRestoreImagesSet; public RestoreClientImpl() { } @@ -64,7 +61,7 @@ public final class RestoreClientImpl implements RestoreClient { /** * Restore operation. Stage 1: validate backupManifest, and check target tables * @param hBackupFS to access the backup image - * @param backupRootDir The root dir for backup image + * @param backupRootDir The root directory for backup image * @param backupId The backup id for image to be restored * @param check True if only do dependency check * @param autoRestore True if automatically restore following the dependency @@ -107,30 +104,18 @@ public final class RestoreClientImpl implements RestoreClient { // check the target tables checkTargetTables(tTableArray, isOverwrite); + restoreStage(hBackupFS, backupManifestMap, sTableArray, tTableArray, autoRestore); - // start restore process - Set restoreImageSet = - restoreStage(hBackupFS, backupManifestMap, sTableArray, tTableArray, autoRestore); - - LOG.info("Restore for " + Arrays.asList(sTableArray) + " are successful!"); - lastRestoreImagesSet = restoreImageSet; + LOG.info("Restore for " + BackupUtil.join(sTableArray) + " is successful!"); } catch (IOException e) { LOG.error("ERROR: restore failed with error: " + e.getMessage()); throw e; } - // not only for check, return false return false; } - /** - * Get last restore image set. The value is globally set for the latest finished restore. - * @return the last restore image set - */ - public Set getLastRestoreImagesSet() { - return lastRestoreImagesSet; - } private boolean validate(HashMap backupManifestMap) throws IOException { @@ -145,10 +130,6 @@ public final class RestoreClientImpl implements RestoreClient { imageSet.addAll(depList); } - // todo merge - LOG.debug("merge will be implemented in future jira"); - // BackupUtil.clearMergedImages(table, imageSet, conf); - LOG.info("Dependent image(s) from old to new:"); for (BackupImage image : imageSet) { String imageDir = @@ -187,7 +168,7 @@ public final class RestoreClientImpl implements RestoreClient { } } else { LOG.info("HBase table " + tableName - + " does not exist. It will be create during backup process"); + + " does not exist. It will be created during restore process"); } } } @@ -219,10 +200,9 @@ public final class RestoreClientImpl implements RestoreClient { * @param sTableArray The array of tables to be restored * @param tTableArray The array of mapping tables to restore to * @param autoRestore : yes, restore all the backup images on the dependency list - * @return set of BackupImages restored * @throws IOException exception */ - private Set restoreStage(HBackupFileSystem hBackupFS, + private void restoreStage(HBackupFileSystem hBackupFS, HashMap backupManifestMap, TableName[] sTableArray, TableName[] tTableArray, boolean autoRestore) throws IOException { TreeSet restoreImageSet = new TreeSet(); @@ -267,9 +247,7 @@ public final class RestoreClientImpl implements RestoreClient { } } } - } - return restoreImageSet; } /** @@ -290,7 +268,7 @@ public final class RestoreClientImpl implements RestoreClient { Path tableBackupPath = hFS.getTableBackupPath(sTable); - // todo: convert feature will be provided in a future jira + // TODO: convert feature will be provided in a future JIRA boolean converted = false; if (manifest.getType() == BackupType.FULL || converted) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java index 2dd38c1..eb7b6b2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java @@ -64,9 +64,9 @@ public class RestoreUtil { } /** - * During incremental backup operation. Call WalPlayer to replay WAL in backup image Currently - * tableNames and newTablesNames only contain single table, will be expanded to multiple tables in - * the future + * During incremental restore operation. Call WalPlayer to replay WAL in backup image Currently + * tableNames and newTablesNames only contain single table, will be expanded to multiple tables + * in the future * @param logDir : incremental backup folders, which contains WAL * @param tableNames : source tableNames(table names were backuped) * @param newTableNames : target tableNames(table names to be restored to) @@ -76,7 +76,7 @@ public class RestoreUtil { TableName[] tableNames, TableName[] newTableNames) throws IOException { if (tableNames.length != newTableNames.length) { - throw new IOException("Number of source tables adn taget Tables does not match!"); + throw new IOException("Number of source tables and target Tables do not match!"); } // for incremental backup image, expect the table already created either by user or previous @@ -232,6 +232,9 @@ public class RestoreUtil { this.conf.setInt("hbase.rpc.timeout", resultMillis); } + // By default, it is 32 and loader will fail if # of files in any region exceed this + // limit. Bad for snapshot restore. + this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); LoadIncrementalHFiles loader = null; try { loader = new LoadIncrementalHFiles(this.conf); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java index 4f5adda..f2c985f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.backup.mapreduce; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -36,7 +38,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.snapshot.ExportSnapshot; +import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpOptions; @@ -291,4 +295,26 @@ public class MapReduceBackupCopyService implements BackupCopyService { } } + @Override + public void cancelCopyJob(byte[] jobId) throws IOException { + JobID id = new JobID(); + id.readFields(new DataInputStream(new ByteArrayInputStream(jobId))); + Cluster cluster = new Cluster(getConf()); + try { + Job job = cluster.getJob(id); + if (job == null) { + LOG.error("No job found for " + id); + // should we throw exception + } + if (job.isComplete() || job.isRetired()) { + return; + } + + job.killJob(); + LOG.debug("Killed job " + id); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java index dae24a6..e6ede65 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java @@ -64,8 +64,7 @@ public class BackupLogCleaner extends BaseLogCleanerDelegate { // TODO: LogCleaners do not have a way to get the Connection from Master. We should find a // way to pass it down here, so that this connection is not re-created every time. // It is expensive - try(Connection connection = ConnectionFactory.createConnection(this.getConf()); - final BackupSystemTable table = new BackupSystemTable(connection)) { + try (final BackupSystemTable table = new BackupSystemTable(this.getConf())) { // If we do not have recorded backup sessions if (!table.hasBackupSessions()) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java index d524140..f08e976 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java @@ -75,7 +75,7 @@ public class LogRollBackupSubprocedure extends Subprocedure { LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum()); Connection connection = rss.getConnection(); - try(final BackupSystemTable table = new BackupSystemTable(connection)) { + try(final BackupSystemTable table = new BackupSystemTable(connection.getConfiguration())) { // sanity check, good for testing HashMap serverTimestampMap = table.readRegionServerLastLogRollResult(); String host = rss.getServerName().getHostname(); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 2ceeda5..f78bc19 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -261,7 +261,7 @@ public class WALPlayer extends Configured implements Tool { Configuration conf = getConf(); setupTime(conf, HLogInputFormat.START_TIME_KEY); setupTime(conf, HLogInputFormat.END_TIME_KEY); - Path inputDir = new Path(args[0]); + Path[] inputDirs = getInputDirs(args[0]); String[] tables = args[1].split(","); String[] tableMap; if (args.length > 2) { @@ -275,9 +275,13 @@ public class WALPlayer extends Configured implements Tool { } conf.setStrings(TABLES_KEY, tables); conf.setStrings(TABLE_MAP_KEY, tableMap); - Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir)); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis())); job.setJarByClass(WALPlayer.class); - FileInputFormat.setInputPaths(job, inputDir); + + for(Path p: inputDirs){ + FileInputFormat.addInputPath(job, p); + } + job.setInputFormatClass(WALInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); @@ -312,6 +316,20 @@ public class WALPlayer extends Configured implements Tool { } /** + * We support multiple input sources + * @param pathStr - list of input folders, comma-separated + * @return array of path objects + */ + private Path[] getInputDirs(String pathStr) { + String[] splits = pathStr.split(","); + Path[] retValue = new Path[splits.length]; + for(int i = 0; i < splits.length; i++) { + retValue[i] = new Path(splits[i]); + } + return retValue; + } + + /** * Print usage * @param errorMsg Error message. Can be null. */ diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 84b7c78..c577767 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.backup; +import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -31,9 +33,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState; import org.apache.hadoop.hbase.backup.impl.BackupContext; +import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.master.BackupController; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager; import org.apache.hadoop.hbase.client.Connection; @@ -71,7 +74,7 @@ public class TestBackupBase { protected static TableName table3_restore = TableName.valueOf("table3_restore"); protected static TableName table4_restore = TableName.valueOf("table4_restore"); - protected static final int NB_ROWS_IN_BATCH = 100; + protected static final int NB_ROWS_IN_BATCH = 999; protected static final byte[] qualName = Bytes.toBytes("q1"); protected static final byte[] famName = Bytes.toBytes("f"); @@ -88,15 +91,20 @@ public class TestBackupBase { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL = new HBaseTestingUtility(); - TEST_UTIL.getConfiguration().set("hbase.procedure.regionserver.classes", + conf1 = TEST_UTIL.getConfiguration(); + conf1.set("hbase.procedure.regionserver.classes", LogRollRegionServerProcedureManager.class.getName()); - TEST_UTIL.getConfiguration().set("hbase.procedure.master.classes", + conf1.set("hbase.procedure.master.classes", LogRollMasterProcedureManager.class.getName()); - TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + // Set Master Observer - Backup Controller + conf1.set("hbase.coprocessor.master.classes", + BackupController.class.getName()); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + // Set MultiWAL (with 2 default WAL files per RS) + conf1.set(WAL_PROVIDER, "multiwal"); TEST_UTIL.startMiniZKCluster(); MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster(); - conf1 = TEST_UTIL.getConfiguration(); conf2 = HBaseConfiguration.create(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); TEST_UTIL2 = new HBaseTestingUtility(conf2); @@ -139,12 +147,14 @@ public class TestBackupBase { protected static void createTables() throws Exception { long tid = System.currentTimeMillis(); + byte[] startKey = Bytes.toBytes("row0"); + byte[] endKey = Bytes.toBytes("row"+ NB_ROWS_IN_BATCH); table1 = TableName.valueOf("test-" + tid); HBaseAdmin ha = TEST_UTIL.getHBaseAdmin(); HTableDescriptor desc = new HTableDescriptor(table1); HColumnDescriptor fam = new HColumnDescriptor(famName); desc.addFamily(fam); - ha.createTable(desc); + ha.createTable(desc, startKey, endKey, 10); Connection conn = ConnectionFactory.createConnection(conf1); HTable table = (HTable) conn.getTable(table1); loadTable(table); @@ -152,7 +162,7 @@ public class TestBackupBase { table2 = TableName.valueOf("test-" + tid + 1); desc = new HTableDescriptor(table2); desc.addFamily(fam); - ha.createTable(desc); + ha.createTable(desc, startKey, endKey, 10); table = (HTable) conn.getTable(table2); loadTable(table); table.close(); @@ -179,9 +189,8 @@ public class TestBackupBase { } private BackupContext getBackupContext(String backupId) throws IOException { - Configuration conf = conf1;//BackupClientImpl.getConf(); - try (Connection connection = ConnectionFactory.createConnection(conf); - BackupSystemTable table = new BackupSystemTable(connection)) { + Configuration conf = conf1; + try (BackupSystemTable table = new BackupSystemTable(conf)) { BackupContext status = table.readBackupStatus(backupId); return status; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java index 21bf63c..0f30659 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java @@ -73,8 +73,7 @@ public class TestBackupBoundaryTests extends TestBackupBase { LOG.info("test full backup fails on a single table that does not exist"); List tables = toList("tabledne"); - String backupId = getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR); - assertTrue(checkSucceeded(backupId)); + getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR); } /** @@ -86,20 +85,17 @@ public class TestBackupBoundaryTests extends TestBackupBase { LOG.info("test full backup fails on multiple tables that do not exist"); List tables = toList("table1dne", "table2dne"); - String backupId = getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR); - assertTrue(checkSucceeded(backupId)); + getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR); } /** - * Verify that full backup fails on tableset containing real and fake tables. + * Verify that full backup fails on table set containing real and fake tables. * @throws Exception */ @Test(expected = DoNotRetryIOException.class) public void testFullBackupMixExistAndDNE() throws Exception { LOG.info("create full backup fails on tableset containing real and fake table"); - List tables = toList(table1.getNameAsString(), "tabledne"); - String backupId = getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR); - //assertTrue(checkSucceeded(backupId)); // TODO + getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR); } } \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java index 899f53b..a4779bc 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java @@ -65,8 +65,7 @@ public class TestBackupLogCleaner extends TestBackupBase { List tableSetFullList = Lists.newArrayList(table1, table2, table3, table4); - try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); - BackupSystemTable systemTable = new BackupSystemTable(connection)) { + try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConfiguration())) { // Verify that we have no backup sessions yet assertFalse(systemTable.hasBackupSessions()); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java index 2dc31df..d062202 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java @@ -42,9 +42,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupContext; import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.impl.BackupUtil.BackupCompleteData; +import org.apache.hadoop.hbase.backup.master.BackupController; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.After; import org.junit.AfterClass; @@ -68,13 +68,14 @@ public class TestBackupSystemTable { @BeforeClass public static void setUp() throws Exception { - cluster = UTIL.startMiniCluster(); - conn = ConnectionFactory.createConnection(UTIL.getConfiguration()); + UTIL.getConfiguration().set("hbase.coprocessor.master.classes", + BackupController.class.getName()); + cluster = UTIL.startMiniCluster(); } @Before public void before() throws IOException { - table = new BackupSystemTable(conn); + table = new BackupSystemTable(conf); } @After @@ -148,6 +149,43 @@ public class TestBackupSystemTable { } @Test + public void testBackupDelete() throws IOException { + + try (BackupSystemTable table = new BackupSystemTable(conf)) { + + int n = 10; + List list = createBackupContextList(n); + + // Load data + for (BackupContext bc : list) { + // Make sure we set right status + bc.setState(BackupState.COMPLETE); + table.updateBackupStatus(bc); + } + + // Verify exists + for (BackupContext bc : list) { + assertNotNull(table.readBackupStatus(bc.getBackupId())); + } + + // Delete all + for (BackupContext bc : list) { + table.deleteBackupStatus(bc.getBackupId()); + } + + // Verify do not exists + for (BackupContext bc : list) { + assertNull(table.readBackupStatus(bc.getBackupId())); + } + + cleanBackupTable(); + } + + } + + + + @Test public void testRegionServerLastLogRollResults() throws IOException { String[] servers = new String[] { "server1", "server2", "server3" }; Long[] timestamps = new Long[] { 100L, 102L, 107L }; @@ -302,6 +340,145 @@ public class TestBackupSystemTable { cleanBackupTable(); } + + /** + * Backup set tests + */ + + @Test + public void testBackupSetAddNotExists() throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conf)) { + + String[] tables = new String[] { "table1", "table2", "table3" }; + String setName = "name"; + table.backupSetAdd(setName, tables); + String[] tnames = table.backupSetDescribe(setName); + assertTrue(tnames != null); + assertTrue(tnames.length == tables.length); + for (int i = 0; i < tnames.length; i++) { + assertTrue(tnames[i].equals(tables[i])); + } + + cleanBackupTable(); + } + + } + + @Test + public void testBackupSetAddExists() throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conf)) { + + String[] tables = new String[] { "table1", "table2", "table3" }; + String setName = "name"; + table.backupSetAdd(setName, tables); + String[] addTables = new String[] { "table4", "table5", "table6" }; + table.backupSetAdd(setName, addTables); + + String[] tnames = table.backupSetDescribe(setName); + assertTrue(tnames != null); + assertTrue(tnames.length == tables.length + addTables.length); + for (int i = 0; i < tnames.length; i++) { + assertTrue(tnames[i].equals("table" + (i + 1))); + } + cleanBackupTable(); + } + } + + @Test + public void testBackupSetAddExistsIntersects() throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conf)) { + + String[] tables = new String[] { "table1", "table2", "table3" }; + String setName = "name"; + table.backupSetAdd(setName, tables); + String[] addTables = new String[] { "table3", "table4", "table5", "table6" }; + table.backupSetAdd(setName, addTables); + + String[] tnames = table.backupSetDescribe(setName); + assertTrue(tnames != null); + assertTrue(tnames.length == tables.length + addTables.length - 1); + for (int i = 0; i < tnames.length; i++) { + assertTrue(tnames[i].equals("table" + (i + 1))); + } + cleanBackupTable(); + } + } + + @Test + public void testBackupSetRemoveSomeNotExists() throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conf)) { + + String[] tables = new String[] { "table1", "table2", "table3", "table4" }; + String setName = "name"; + table.backupSetAdd(setName, tables); + String[] removeTables = new String[] { "table4", "table5", "table6" }; + table.backupSetRemove(setName, removeTables); + + String[] tnames = table.backupSetDescribe(setName); + assertTrue(tnames != null); + assertTrue(tnames.length == tables.length - 1); + for (int i = 0; i < tnames.length; i++) { + assertTrue(tnames[i].equals("table" + (i + 1))); + } + cleanBackupTable(); + } + } + + @Test + public void testBackupSetRemove() throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conf)) { + + String[] tables = new String[] { "table1", "table2", "table3", "table4" }; + String setName = "name"; + table.backupSetAdd(setName, tables); + String[] removeTables = new String[] { "table4", "table3" }; + table.backupSetRemove(setName, removeTables); + + String[] tnames = table.backupSetDescribe(setName); + assertTrue(tnames != null); + assertTrue(tnames.length == tables.length - 2); + for (int i = 0; i < tnames.length; i++) { + assertTrue(tnames[i].equals("table" + (i + 1))); + } + cleanBackupTable(); + } + } + + @Test + public void testBackupSetDelete() throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conf)) { + + String[] tables = new String[] { "table1", "table2", "table3", "table4" }; + String setName = "name"; + table.backupSetAdd(setName, tables); + table.backupSetDelete(setName); + + String[] tnames = table.backupSetDescribe(setName); + assertTrue(tnames == null); + cleanBackupTable(); + } + } + + @Test + public void testBackupSetList() throws IOException { + try (BackupSystemTable table = new BackupSystemTable(conf)) { + + String[] tables = new String[] { "table1", "table2", "table3", "table4" }; + String setName1 = "name1"; + String setName2 = "name2"; + table.backupSetAdd(setName1, tables); + table.backupSetAdd(setName2, tables); + + List list = table.backupSetList(); + + assertTrue(list.size() == 2); + assertTrue(list.get(0).equals(setName1)); + assertTrue(list.get(1).equals(setName2)); + + cleanBackupTable(); + } + } + private boolean compare(BackupContext ctx, BackupCompleteData data) { return ctx.getBackupId().equals(data.getBackupToken())