diff --git hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/BackupProtos.java hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/BackupProtos.java
index 413661e..b96f5b3 100644
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/BackupProtos.java
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/BackupProtos.java
@@ -6874,6 +6874,36 @@ public final class BackupProtos {
* optional uint32 progress = 12;
*/
int getProgress();
+
+ // optional bytes job_id = 13;
+ /**
+ * optional bytes job_id = 13;
+ */
+ boolean hasJobId();
+ /**
+ * optional bytes job_id = 13;
+ */
+ com.google.protobuf.ByteString getJobId();
+
+ // required uint32 workers_number = 14;
+ /**
+ * required uint32 workers_number = 14;
+ */
+ boolean hasWorkersNumber();
+ /**
+ * required uint32 workers_number = 14;
+ */
+ int getWorkersNumber();
+
+ // required uint32 bandwidth = 15;
+ /**
+ * required uint32 bandwidth = 15;
+ */
+ boolean hasBandwidth();
+ /**
+ * required uint32 bandwidth = 15;
+ */
+ int getBandwidth();
}
/**
* Protobuf type {@code hbase.pb.BackupContext}
@@ -7007,6 +7037,21 @@ public final class BackupProtos {
progress_ = input.readUInt32();
break;
}
+ case 106: {
+ bitField0_ |= 0x00000800;
+ jobId_ = input.readBytes();
+ break;
+ }
+ case 112: {
+ bitField0_ |= 0x00001000;
+ workersNumber_ = input.readUInt32();
+ break;
+ }
+ case 120: {
+ bitField0_ |= 0x00002000;
+ bandwidth_ = input.readUInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -7597,6 +7642,54 @@ public final class BackupProtos {
return progress_;
}
+ // optional bytes job_id = 13;
+ public static final int JOB_ID_FIELD_NUMBER = 13;
+ private com.google.protobuf.ByteString jobId_;
+ /**
+ * optional bytes job_id = 13;
+ */
+ public boolean hasJobId() {
+ return ((bitField0_ & 0x00000800) == 0x00000800);
+ }
+ /**
+ * optional bytes job_id = 13;
+ */
+ public com.google.protobuf.ByteString getJobId() {
+ return jobId_;
+ }
+
+ // required uint32 workers_number = 14;
+ public static final int WORKERS_NUMBER_FIELD_NUMBER = 14;
+ private int workersNumber_;
+ /**
+ * required uint32 workers_number = 14;
+ */
+ public boolean hasWorkersNumber() {
+ return ((bitField0_ & 0x00001000) == 0x00001000);
+ }
+ /**
+ * required uint32 workers_number = 14;
+ */
+ public int getWorkersNumber() {
+ return workersNumber_;
+ }
+
+ // required uint32 bandwidth = 15;
+ public static final int BANDWIDTH_FIELD_NUMBER = 15;
+ private int bandwidth_;
+ /**
+ * required uint32 bandwidth = 15;
+ */
+ public boolean hasBandwidth() {
+ return ((bitField0_ & 0x00002000) == 0x00002000);
+ }
+ /**
+ * required uint32 bandwidth = 15;
+ */
+ public int getBandwidth() {
+ return bandwidth_;
+ }
+
private void initFields() {
backupId_ = "";
type_ = org.apache.hadoop.hbase.protobuf.generated.BackupProtos.BackupType.FULL;
@@ -7610,6 +7703,9 @@ public final class BackupProtos {
totalBytesCopied_ = 0L;
hlogTargetDir_ = "";
progress_ = 0;
+ jobId_ = com.google.protobuf.ByteString.EMPTY;
+ workersNumber_ = 0;
+ bandwidth_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7640,6 +7736,14 @@ public final class BackupProtos {
memoizedIsInitialized = 0;
return false;
}
+ if (!hasWorkersNumber()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasBandwidth()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
for (int i = 0; i < getTableBackupStatusCount(); i++) {
if (!getTableBackupStatus(i).isInitialized()) {
memoizedIsInitialized = 0;
@@ -7689,6 +7793,15 @@ public final class BackupProtos {
if (((bitField0_ & 0x00000400) == 0x00000400)) {
output.writeUInt32(12, progress_);
}
+ if (((bitField0_ & 0x00000800) == 0x00000800)) {
+ output.writeBytes(13, jobId_);
+ }
+ if (((bitField0_ & 0x00001000) == 0x00001000)) {
+ output.writeUInt32(14, workersNumber_);
+ }
+ if (((bitField0_ & 0x00002000) == 0x00002000)) {
+ output.writeUInt32(15, bandwidth_);
+ }
getUnknownFields().writeTo(output);
}
@@ -7746,6 +7859,18 @@ public final class BackupProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(12, progress_);
}
+ if (((bitField0_ & 0x00000800) == 0x00000800)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(13, jobId_);
+ }
+ if (((bitField0_ & 0x00001000) == 0x00001000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(14, workersNumber_);
+ }
+ if (((bitField0_ & 0x00002000) == 0x00002000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(15, bandwidth_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -7826,6 +7951,21 @@ public final class BackupProtos {
result = result && (getProgress()
== other.getProgress());
}
+ result = result && (hasJobId() == other.hasJobId());
+ if (hasJobId()) {
+ result = result && getJobId()
+ .equals(other.getJobId());
+ }
+ result = result && (hasWorkersNumber() == other.hasWorkersNumber());
+ if (hasWorkersNumber()) {
+ result = result && (getWorkersNumber()
+ == other.getWorkersNumber());
+ }
+ result = result && (hasBandwidth() == other.hasBandwidth());
+ if (hasBandwidth()) {
+ result = result && (getBandwidth()
+ == other.getBandwidth());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -7887,6 +8027,18 @@ public final class BackupProtos {
hash = (37 * hash) + PROGRESS_FIELD_NUMBER;
hash = (53 * hash) + getProgress();
}
+ if (hasJobId()) {
+ hash = (37 * hash) + JOB_ID_FIELD_NUMBER;
+ hash = (53 * hash) + getJobId().hashCode();
+ }
+ if (hasWorkersNumber()) {
+ hash = (37 * hash) + WORKERS_NUMBER_FIELD_NUMBER;
+ hash = (53 * hash) + getWorkersNumber();
+ }
+ if (hasBandwidth()) {
+ hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
+ hash = (53 * hash) + getBandwidth();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -8025,6 +8177,12 @@ public final class BackupProtos {
bitField0_ = (bitField0_ & ~0x00000400);
progress_ = 0;
bitField0_ = (bitField0_ & ~0x00000800);
+ jobId_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00001000);
+ workersNumber_ = 0;
+ bitField0_ = (bitField0_ & ~0x00002000);
+ bandwidth_ = 0;
+ bitField0_ = (bitField0_ & ~0x00004000);
return this;
}
@@ -8106,6 +8264,18 @@ public final class BackupProtos {
to_bitField0_ |= 0x00000400;
}
result.progress_ = progress_;
+ if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+ to_bitField0_ |= 0x00000800;
+ }
+ result.jobId_ = jobId_;
+ if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+ to_bitField0_ |= 0x00001000;
+ }
+ result.workersNumber_ = workersNumber_;
+ if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+ to_bitField0_ |= 0x00002000;
+ }
+ result.bandwidth_ = bandwidth_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -8189,6 +8359,15 @@ public final class BackupProtos {
if (other.hasProgress()) {
setProgress(other.getProgress());
}
+ if (other.hasJobId()) {
+ setJobId(other.getJobId());
+ }
+ if (other.hasWorkersNumber()) {
+ setWorkersNumber(other.getWorkersNumber());
+ }
+ if (other.hasBandwidth()) {
+ setBandwidth(other.getBandwidth());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -8218,6 +8397,14 @@ public final class BackupProtos {
return false;
}
+ if (!hasWorkersNumber()) {
+
+ return false;
+ }
+ if (!hasBandwidth()) {
+
+ return false;
+ }
for (int i = 0; i < getTableBackupStatusCount(); i++) {
if (!getTableBackupStatus(i).isInitialized()) {
@@ -9022,6 +9209,108 @@ public final class BackupProtos {
return this;
}
+ // optional bytes job_id = 13;
+ private com.google.protobuf.ByteString jobId_ = com.google.protobuf.ByteString.EMPTY;
+ /**
+ * optional bytes job_id = 13;
+ */
+ public boolean hasJobId() {
+ return ((bitField0_ & 0x00001000) == 0x00001000);
+ }
+ /**
+ * optional bytes job_id = 13;
+ */
+ public com.google.protobuf.ByteString getJobId() {
+ return jobId_;
+ }
+ /**
+ * optional bytes job_id = 13;
+ */
+ public Builder setJobId(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00001000;
+ jobId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional bytes job_id = 13;
+ */
+ public Builder clearJobId() {
+ bitField0_ = (bitField0_ & ~0x00001000);
+ jobId_ = getDefaultInstance().getJobId();
+ onChanged();
+ return this;
+ }
+
+ // required uint32 workers_number = 14;
+ private int workersNumber_ ;
+ /**
+ * required uint32 workers_number = 14;
+ */
+ public boolean hasWorkersNumber() {
+ return ((bitField0_ & 0x00002000) == 0x00002000);
+ }
+ /**
+ * required uint32 workers_number = 14;
+ */
+ public int getWorkersNumber() {
+ return workersNumber_;
+ }
+ /**
+ * required uint32 workers_number = 14;
+ */
+ public Builder setWorkersNumber(int value) {
+ bitField0_ |= 0x00002000;
+ workersNumber_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required uint32 workers_number = 14;
+ */
+ public Builder clearWorkersNumber() {
+ bitField0_ = (bitField0_ & ~0x00002000);
+ workersNumber_ = 0;
+ onChanged();
+ return this;
+ }
+
+ // required uint32 bandwidth = 15;
+ private int bandwidth_ ;
+ /**
+ * required uint32 bandwidth = 15;
+ */
+ public boolean hasBandwidth() {
+ return ((bitField0_ & 0x00004000) == 0x00004000);
+ }
+ /**
+ * required uint32 bandwidth = 15;
+ */
+ public int getBandwidth() {
+ return bandwidth_;
+ }
+ /**
+ * required uint32 bandwidth = 15;
+ */
+ public Builder setBandwidth(int value) {
+ bitField0_ |= 0x00004000;
+ bandwidth_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required uint32 bandwidth = 15;
+ */
+ public Builder clearBandwidth() {
+ bitField0_ = (bitField0_ & ~0x00004000);
+ bandwidth_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.BackupContext)
}
@@ -9093,7 +9382,7 @@ public final class BackupProtos {
"se.pb.BackupImage\022\021\n\tcompacted\030\013 \002(\010\"]\n\021" +
"TableBackupStatus\022\"\n\005table\030\001 \002(\0132\023.hbase",
".pb.TableName\022\022\n\ntarget_dir\030\002 \002(\t\022\020\n\010sna" +
- "pshot\030\003 \001(\t\"\323\004\n\rBackupContext\022\021\n\tbackup_" +
+ "pshot\030\003 \001(\t\"\216\005\n\rBackupContext\022\021\n\tbackup_" +
"id\030\001 \002(\t\022\"\n\004type\030\002 \002(\0162\024.hbase.pb.Backup" +
"Type\022\027\n\017target_root_dir\030\003 \002(\t\0222\n\005state\030\004" +
" \001(\0162#.hbase.pb.BackupContext.BackupStat" +
@@ -9103,14 +9392,16 @@ public final class BackupProtos {
"bleBackupStatus\022\020\n\010start_ts\030\010 \002(\004\022\016\n\006end" +
"_ts\030\t \002(\004\022\032\n\022total_bytes_copied\030\n \002(\003\022\027\n",
"\017hlog_target_dir\030\013 \001(\t\022\020\n\010progress\030\014 \001(\r" +
- "\"P\n\013BackupState\022\013\n\007WAITING\020\000\022\013\n\007RUNNING\020" +
- "\001\022\014\n\010COMPLETE\020\002\022\n\n\006FAILED\020\003\022\r\n\tCANCELLED" +
- "\020\004\"}\n\013BackupPhase\022\013\n\007REQUEST\020\000\022\014\n\010SNAPSH" +
- "OT\020\001\022\027\n\023PREPARE_INCREMENTAL\020\002\022\020\n\014SNAPSHO" +
- "TCOPY\020\003\022\024\n\020INCREMENTAL_COPY\020\004\022\022\n\016STORE_M" +
- "ANIFEST\020\005*\'\n\nBackupType\022\010\n\004FULL\020\000\022\017\n\013INC" +
- "REMENTAL\020\001BB\n*org.apache.hadoop.hbase.pr" +
- "otobuf.generatedB\014BackupProtosH\001\210\001\001\240\001\001"
+ "\022\016\n\006job_id\030\r \001(\014\022\026\n\016workers_number\030\016 \002(\r" +
+ "\022\021\n\tbandwidth\030\017 \002(\r\"P\n\013BackupState\022\013\n\007WA" +
+ "ITING\020\000\022\013\n\007RUNNING\020\001\022\014\n\010COMPLETE\020\002\022\n\n\006FA" +
+ "ILED\020\003\022\r\n\tCANCELLED\020\004\"}\n\013BackupPhase\022\013\n\007" +
+ "REQUEST\020\000\022\014\n\010SNAPSHOT\020\001\022\027\n\023PREPARE_INCRE" +
+ "MENTAL\020\002\022\020\n\014SNAPSHOTCOPY\020\003\022\024\n\020INCREMENTA" +
+ "L_COPY\020\004\022\022\n\016STORE_MANIFEST\020\005*\'\n\nBackupTy" +
+ "pe\022\010\n\004FULL\020\000\022\017\n\013INCREMENTAL\020\001BB\n*org.apa" +
+ "che.hadoop.hbase.protobuf.generatedB\014Bac",
+ "kupProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9152,7 +9443,7 @@ public final class BackupProtos {
internal_static_hbase_pb_BackupContext_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_BackupContext_descriptor,
- new java.lang.String[] { "BackupId", "Type", "TargetRootDir", "State", "Phase", "FailedMessage", "TableBackupStatus", "StartTs", "EndTs", "TotalBytesCopied", "HlogTargetDir", "Progress", });
+ new java.lang.String[] { "BackupId", "Type", "TargetRootDir", "State", "Phase", "FailedMessage", "TableBackupStatus", "StartTs", "EndTs", "TotalBytesCopied", "HlogTargetDir", "Progress", "JobId", "WorkersNumber", "Bandwidth", });
return null;
}
};
diff --git hbase-protocol/src/main/protobuf/Backup.proto hbase-protocol/src/main/protobuf/Backup.proto
index 6c126e4..4de55e2 100644
--- hbase-protocol/src/main/protobuf/Backup.proto
+++ hbase-protocol/src/main/protobuf/Backup.proto
@@ -85,6 +85,9 @@ message BackupContext {
required int64 total_bytes_copied = 10;
optional string hlog_target_dir = 11;
optional uint32 progress = 12;
+ optional bytes job_id = 13;
+ required uint32 workers_number = 14;
+ required uint32 bandwidth = 15;
enum BackupState {
WAITING = 0;
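
The three new BackupContext fields above are exercised by the regenerated accessors earlier in this patch. A minimal sketch (not part of the patch; the job id value is made up) of how they round-trip through the generated builder, using buildPartial() so the other required fields can be skipped here:

    BackupProtos.BackupContext pb = BackupProtos.BackupContext.newBuilder()
        .setJobId(ByteString.copyFromUtf8("job_1457990000000_0001")) // hypothetical job id
        .setWorkersNumber(4)
        .setBandwidth(100)
        .buildPartial(); // skips required-field validation for this sketch
    assert pb.hasJobId() && pb.getWorkersNumber() == 4 && pb.getBandwidth() == 100;
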
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClient.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClient.java
index 7c8ea39..ef35a02 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClient.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClient.java
@@ -28,7 +28,7 @@ public interface BackupClient {
public void setConf(Configuration conf);
/**
- * Send backup request to server, and monitor the progress if necessary
+ * Sends a backup request to the server and monitors the progress if necessary
* @param backupType : full or incremental
* @param targetRootDir : the root path specified by user
* @param tableList : the table list specified by user
@@ -37,4 +37,90 @@ public interface BackupClient {
*/
public String create(BackupType backupType, List tableList,
String targetRootDir) throws IOException;
+
+ /**
+ * Sends a backup request to the server and monitors the progress if necessary
+ * @param backupType : full or incremental
+ * @param tableList : table list
+ * @param targetRootDir : target root directory
+ * @param workers : number of workers for parallel data copy
+ * @param bandwidth : maximum bandwidth (in MB/sec) of the copy job
+ * @return backup id
+ * @throws IOException
+ */
+ public String create(BackupType backupType, List<TableName> tableList, String targetRootDir,
+ int workers, int bandwidth) throws IOException;
+
+ /**
+ * Describe backup image command
+ * @param backupId - backup id
+ * @throws IOException
+ */
+ public void describeBackupImage(String backupId) throws IOException;
+
+ /**
+ * Show backup progress command
+ * @param backupId - backup id
+ * @throws IOException
+ */
+ public void showProgress(String backupId) throws IOException;
+
+ /**
+ * Delete backup image command
+ * @param backupIds - backup ids
+ * @throws IOException
+ */
+ public void deleteBackups(String[] backupIds) throws IOException;
+
+ /**
+ * Cancel current active backup command
+ * @param backupId - backup id
+ * @throws IOException
+ */
+ public void cancelBackup(String backupId) throws IOException;
+
+ /**
+ * Show backup history command
+ * @param n - last n backup sessions
+ * @throws IOException
+ */
+ public void showHistory(int n) throws IOException;
+
+ /**
+ * Backup set list command
+ * @throws IOException
+ */
+ public void backupSetList() throws IOException;
+
+ /**
+ * Backup set describe command
+ * @param name set name
+ * @throws IOException
+ */
+ public void backupSetDescribe(String name) throws IOException;
+
+ /**
+ * Delete backup set command
+ * @param name - backup set name
+ * @throws IOException
+ */
+ public void backupSetDelete(String name) throws IOException;
+
+ /**
+ * Add tables to backup set command
+ * @param name set name
+ * @param tablesOrNamespaces - list of tables or namespaces
+ * @throws IOException
+ */
+ public void backupSetAdd(String name, String[] tablesOrNamespaces) throws IOException;
+
+ /**
+ * Remove tables from backup set
+ * @param name - backup set name
+ * @param tablesOrNamespaces - list of tables or namespaces
+ * @throws IOException
+ */
+ public void backupSetRemove(String name, String[] tablesOrNamespaces) throws IOException;
+
}
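
A hedged usage sketch of the extended interface, mirroring what CreateCommand does later in this patch; the configuration, table names and target path below are illustrative only:

    Configuration conf = HBaseConfiguration.create();
    BackupClient client = BackupRestoreFactory.getBackupClient(conf);
    List<TableName> tables =
        Arrays.asList(TableName.valueOf("t1"), TableName.valueOf("ns1:t2")); // example tables
    // full backup with 4 copy workers, capped at 100 MB/sec per worker
    String backupId = client.create(BackupType.FULL, tables, "hdfs://nn:8020/backup", 4, 100);
    client.showProgress(backupId);
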
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
index 015c80b..7802136 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
@@ -41,11 +41,23 @@ public class BackupDriver extends AbstractHBaseTool {
private static final Log LOG = LogFactory.getLog(BackupDriver.class);
private Options opt;
private CommandLine cmd;
-
+ private static Configuration conf;
+
+ public static void setStaticConf(Configuration conf) {
+ BackupDriver.conf = conf;
+ }
+
protected void init() throws IOException {
// define supported options
opt = new Options();
opt.addOption("debug", false, "Enable debug loggings");
+ opt.addOption("all", false, "All tables");
+ opt.addOption("t", true, "Table name");
+ opt.addOption("b", true, "Bandwidth (MB/s)");
+ opt.addOption("w", true, "Number of workers");
+ opt.addOption("n", true, "History length");
+ opt.addOption("set", true, "Backup set name");
// disable irrelevant loggers to avoid it mess up command output
LogUtils.disableUselessLoggers(LOG);
@@ -91,7 +103,11 @@ public class BackupDriver extends AbstractHBaseTool {
}
// TODO: get rid of Command altogether?
- BackupCommands.createCommand(getConf(), type, cmdline).execute();
+ BackupCommands.Command command = BackupCommands.createCommand(getConf(), type, cmdline);
+ if (type == BackupCommand.CREATE && conf != null) {
+ ((BackupCommands.CreateCommand) command).setConf(conf);
+ }
+ command.execute();
return 0;
}
@@ -111,9 +127,12 @@ public class BackupDriver extends AbstractHBaseTool {
}
public static void main(String[] args) throws Exception {
- Configuration conf = HBaseConfiguration.create();
+ Configuration conf =
+ BackupDriver.conf == null ? HBaseConfiguration.create() : BackupDriver.conf;
int ret = ToolRunner.run(conf, new BackupDriver(), args);
- System.exit(ret);
+ if (BackupDriver.conf == null) {
+ System.exit(ret);
+ }
}
}
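
The static conf plus the conf == null guards above appear intended for embedding the driver (for example from tests) without a fresh configuration or a JVM exit; a sketch of such a call, with made-up paths and table names:

    Configuration conf = HBaseConfiguration.create();
    BackupDriver.setStaticConf(conf);   // reused instead of HBaseConfiguration.create() in main()
    BackupDriver.main(new String[] {
        "create", "full", "hdfs://nn:8020/backup", "t1,t2", "-w", "4", "-b", "100" });
    // System.exit() is skipped because a static conf was supplied
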
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupClientImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupClientImpl.java
index f129a7c..7da4819 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupClientImpl.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupClientImpl.java
@@ -31,8 +31,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupClient;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState;
+import org.apache.hadoop.hbase.backup.impl.BackupUtil.BackupCompleteData;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
@@ -40,107 +43,130 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import com.google.common.collect.Lists;
-
/**
* Backup HBase tables locally or on a remote cluster Serve as client entry point for the following
* features: - Full Backup provide local and remote back/restore for a list of tables - Incremental
* backup to build on top of full backup as daily/weekly backup - Convert incremental backup WAL
* files into hfiles - Merge several backup images into one(like merge weekly into monthly) - Add
- * and remove table to and from Backup image - Cancel a backup process - Describe information of
- * a backup image
+ * and remove table to and from Backup image - Cancel a backup process - Full backup based on
+ * existing snapshot - Describe information of a backup image
*/
+
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public final class BackupClientImpl implements BackupClient {
+public final class BackupClientImpl implements BackupClient{
private static final Log LOG = LogFactory.getLog(BackupClientImpl.class);
private Configuration conf;
private BackupManager backupManager;
public BackupClientImpl() {
}
-
+
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
-
+
/**
* Prepare and submit Backup request
* @param backupId : backup_timestame (something like backup_1398729212626)
* @param backupType : full or incremental
* @param tableList : tables to be backuped
* @param targetRootDir : specified by user
+ * @param workers - number of parallel workers in the data copy job
+ * @param bandwidth - maximum bandwidth per worker in MB/sec
* @throws IOException exception
*/
protected void requestBackup(String backupId, BackupType backupType, List tableList,
- String targetRootDir) throws IOException {
+ String targetRootDir, int workers, int bandwidth) throws IOException {
BackupContext backupContext = null;
-
HBaseAdmin hbadmin = null;
Connection conn = null;
try {
+
backupManager = new BackupManager(conf);
+ String tables = tableList != null ? toString(tableList) : null;
if (backupType == BackupType.INCREMENTAL) {
Set incrTableSet = backupManager.getIncrementalBackupTableSet();
if (incrTableSet.isEmpty()) {
LOG.warn("Incremental backup table set contains no table.\n"
+ "Use 'backup create full' or 'backup stop' to \n "
+ "change the tables covered by incremental backup.");
- throw new DoNotRetryIOException("No table covered by incremental backup.");
+ throw new RuntimeException("No table covered by incremental backup.");
}
-
- LOG.info("Incremental backup for the following table set: " + incrTableSet);
- tableList = Lists.newArrayList(incrTableSet);
+ StringBuilder sb = new StringBuilder();
+ for (TableName tableName : incrTableSet) {
+ sb.append(tableName.getNameAsString() + " ");
+ }
+ LOG.info("Incremental backup for the following table set: " + sb.toString());
+ tables =
+ sb.toString().trim()
+ .replaceAll(" ", BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
}
// check whether table exists first before starting real request
- if (tableList != null) {
- ArrayList nonExistingTableList = null;
+ if (tables != null) {
+ String[] tableNames = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
+ ArrayList<String> noneExistingTableList = null;
conn = ConnectionFactory.createConnection(conf);
hbadmin = (HBaseAdmin) conn.getAdmin();
- for (TableName tableName : tableList) {
- if (!hbadmin.tableExists(tableName)) {
- if (nonExistingTableList == null) {
- nonExistingTableList = new ArrayList<>();
+ for (String tableName : tableNames) {
+ if (!hbadmin.tableExists(TableName.valueOf(tableName))) {
+ if (noneExistingTableList == null) {
+ noneExistingTableList = new ArrayList<String>();
}
- nonExistingTableList.add(tableName);
+ noneExistingTableList.add(tableName);
}
}
- if (nonExistingTableList != null) {
+ if (noneExistingTableList != null) {
if (backupType == BackupType.INCREMENTAL ) {
LOG.warn("Incremental backup table set contains non-exising table: "
- + nonExistingTableList);
+ + noneExistingTableList);
} else {
// Throw exception only in full mode - we try to backup non-existing table
- throw new DoNotRetryIOException("Non-existing tables found in the table list: "
- + nonExistingTableList);
+ throw new RuntimeException("Non-existing tables found in the table list: "
+ + noneExistingTableList);
}
}
}
- // if any target table backup dir already exist, then no backup action taken
- if (tableList != null) {
- for (TableName table : tableList) {
+ // if any target table backup directory already exist, then no backup action taken
+ String[] tableNames = null;
+ if (tables != null && !tables.equals("")) {
+ tableNames = tables.split(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
+ }
+
+ List<TableName> tList = new ArrayList<TableName>();
+
+ if (tableNames != null && tableNames.length > 0) {
+ for (String table : tableNames) {
+ TableName tn = TableName.valueOf(table);
+ tList.add(tn);
String targetTableBackupDir =
- HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, table);
+ HBackupFileSystem.getTableBackupDir(targetRootDir, backupId, TableName.valueOf(table));
Path targetTableBackupDirPath = new Path(targetTableBackupDir);
FileSystem outputFs = FileSystem.get(targetTableBackupDirPath.toUri(), conf);
if (outputFs.exists(targetTableBackupDirPath)) {
- throw new DoNotRetryIOException("Target backup directory " + targetTableBackupDir
+ throw new IOException("Target backup directory " + targetTableBackupDir
+ " exists already.");
}
}
}
backupContext =
- backupManager.createBackupContext(backupId, backupType, tableList, targetRootDir);
+ backupManager.createBackupContext(backupId, backupType,
+ tList, targetRootDir, workers, bandwidth);
backupManager.initialize();
backupManager.dispatchRequest(backupContext);
} catch (BackupException e) {
// suppress the backup exception wrapped within #initialize or #dispatchRequest, backup
// exception has already been handled normally
- LOG.error("Backup Exception ", e);
+ StackTraceElement[] stes = e.getStackTrace();
+ for (StackTraceElement ste : stes) {
+ LOG.info(ste);
+ }
+ LOG.error("Backup Exception " + e.getMessage());
} finally {
if (hbadmin != null) {
hbadmin.close();
@@ -151,10 +177,28 @@ public final class BackupClientImpl implements BackupClient {
}
}
+ private String toString(List<TableName> tableList) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < tableList.size(); i++) {
+ sb.append(tableList.get(i).getNameAsString());
+ if (i < tableList.size() - 1) {
+ sb.append(',');
+ }
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public String create(BackupType type, List<TableName> tableList, String backupRootPath)
+ throws IOException {
+ return create(type, tableList, backupRootPath, -1, -1);
+ }
+
@Override
- public String create(BackupType backupType, List tableList, String backupRootPath)
- throws IOException {
-
+ public String create(BackupType backupType, List<TableName> tableList, String backupRootPath,
+ int workers, int bandwidth) throws IOException {
+
String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
// check target path first, confirm it doesn't exist before backup
boolean targetExists = false;
@@ -185,14 +229,14 @@ public final class BackupClientImpl implements BackupClient {
// table list specified for backup, trigger backup on specified tables
try {
- requestBackup(backupId, backupType, tableList, backupRootPath);
+ requestBackup(backupId, backupType, tableList, backupRootPath, workers, bandwidth);
} catch (RuntimeException e) {
String errMsg = e.getMessage();
if (errMsg != null
&& (errMsg.startsWith("Non-existing tables found") || errMsg
.startsWith("Snapshot is not found"))) {
LOG.error(errMsg + ", please check your command");
- throw e;
+ throw new DoNotRetryIOException(e);
} else {
throw e;
}
@@ -204,4 +248,158 @@ public final class BackupClientImpl implements BackupClient {
return backupId;
}
+ @Override
+ public void describeBackupImage(String backupId) throws IOException {
+ BackupContext backupContext = null;
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ backupContext = table.readBackupStatus(backupId);
+ if (backupContext != null) {
+ System.out.println(backupContext.getShortDescription());
+ } else {
+ System.out.println("No information found for backupID=" + backupId);
+ }
+ }
+ }
+
+ @Override
+ public void showProgress(String backupId) throws IOException {
+ BackupContext backupContext = null;
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ if (backupId == null) {
+ ArrayList<BackupContext> recentSessions =
+ table.getBackupContexts(BackupState.RUNNING);
+ if (recentSessions.isEmpty()) {
+ System.out.println("No ongonig sessions found.");
+ return;
+ }
+ // else show status for all ongoing sessions
+ // must be one maximum
+ for (BackupContext context : recentSessions) {
+ System.out.println(context.getStatusAndProgressAsString());
+ }
+ } else {
+
+ backupContext = table.readBackupStatus(backupId);
+ if (backupContext != null) {
+ System.out.println(backupContext.getStatusAndProgressAsString());
+ } else {
+ System.out.println("No information found for backupID=" + backupId);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void deleteBackups(String[] backupIds) throws IOException {
+ BackupContext backupContext = null;
+ String backupId = null;
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ for (int i = 0; i < backupIds.length; i++) {
+ backupId = backupIds[i];
+ backupContext = table.readBackupStatus(backupId);
+ if (backupContext != null) {
+ BackupUtil.cleanupBackupData(backupContext, conf);
+ table.deleteBackupStatus(backupContext.getBackupId());
+ System.out.println("Delete backup for backupID=" + backupId + " completed.");
+ } else {
+ System.out.println("Delete backup failed: no information found for backupID=" + backupId);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void cancelBackup(String backupId) throws IOException {
+ // Kill distributed job if active
+ // Backup MUST not be in COMPLETE state
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ BackupContext backupContext = table.readBackupStatus(backupId);
+ String errMessage = null;
+ if (backupContext != null && backupContext.getState() != BackupState.COMPLETE) {
+ BackupUtil.cleanupBackupData(backupContext, conf);
+ table.deleteBackupStatus(backupContext.getBackupId());
+ byte[] jobId = backupContext.getJobId();
+ if (jobId != null) {
+ BackupCopyService service = BackupRestoreFactory.getBackupCopyService(conf);
+ service.cancelCopyJob(jobId);
+ } else {
+ errMessage = "Distributed Job ID is null for backup " + backupId
+ + " in " + backupContext.getState() + " state.";
+ }
+ } else if (backupContext == null) {
+ errMessage = "No information found for backupID=" + backupId;
+ } else {
+ errMessage = "Cannot cancel " + backupId + " in " + backupContext.getState() + " state";
+ }
+
+ if (errMessage != null) {
+ throw new IOException(errMessage);
+ }
+ }
+ // then clean backup image
+ deleteBackups(new String[] { backupId });
+ }
+
+ @Override
+ public void showHistory(int n) throws IOException {
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ List<BackupCompleteData> history = table.getBackupHistory();
+ int max = Math.min(n, history.size());
+ for (int i = 0; i < max; i++) {
+ printBackupCompleteData(history.get(i));
+ }
+ }
+ }
+
+ private void printBackupCompleteData(BackupCompleteData backupCompleteData) {
+ System.out.println(backupCompleteData.toString());
+ }
+
+ @Override
+ public void backupSetList() throws IOException {
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ List<String> list = table.backupSetList();
+ for (String s : list) {
+ System.out.println(s);
+ }
+ System.out.println("Found " + list.size() + " records");
+ }
+ }
+
+ @Override
+ public void backupSetDescribe(String name) throws IOException {
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ String[] list = table.backupSetDescribe(name);
+ for (String s : list) {
+ System.out.println(s);
+ }
+ System.out.println("Found " + list.length + " records");
+ }
+ }
+
+ @Override
+ public void backupSetDelete(String name) throws IOException {
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ table.backupSetDelete(name);
+ System.out.println("Deleted " + name);
+ }
+ }
+
+ @Override
+ public void backupSetAdd(String name, String[] tablesOrNamespaces) throws IOException {
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ table.backupSetAdd(name, tablesOrNamespaces);
+ System.out.println("Added tables to '" + name + "'");
+ }
+ }
+
+ @Override
+ public void backupSetRemove(String name, String[] tablesOrNamespaces) throws IOException {
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ table.backupSetRemove(name, tablesOrNamespaces);
+ System.out.println("Removed tables from '" + name + "'");
+ }
+ }
+
}
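
For reference, the backup-set operations added above compose as follows (set name and member names are examples; namespaces are passed the same way as tables):

    BackupClient client = BackupRestoreFactory.getBackupClient(conf);
    client.backupSetAdd("nightly", new String[] { "default:t1", "ns1" }); // creates the set if needed
    client.backupSetDescribe("nightly");   // prints the member tables/namespaces
    client.backupSetRemove("nightly", new String[] { "ns1" });
    client.backupSetDelete("nightly");
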
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 56e26fa..d1db757 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -19,17 +19,23 @@
package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupClient;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants.BackupCommand;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
-import com.google.common.collect.Lists;
/**
* General backup commands, options and usage messages
@@ -39,18 +45,56 @@ import com.google.common.collect.Lists;
public final class BackupCommands {
private static final String USAGE = "Usage: hbase backup COMMAND\n"
- + "where COMMAND is one of:\n" + " create create a new backup image\n"
+ + "where COMMAND is one of:\n"
+ + " create create a new backup image\n"
+ + " cancel cancel an ongoing backup\n"
+ + " delete delete an existing backup image\n"
+ + " describe show the detailed information of a backup image\n"
+ + " history show history of all successful backups\n"
+ + " progress show the progress of the latest backup request\n"
+ + " convert convert incremental backup WAL files into HFiles\n"
+ + " merge merge backup images\n"
+ + " set backup set management\n"
+ "Enter \'help COMMAND\' to see help message for each command\n";
private static final String CREATE_CMD_USAGE =
- "Usage: hbase backup create [tables] [-convert] "
- + "\n" + " type \"full\" to create a full backup image;\n"
+ "Usage: hbase backup create [tables] [-s name] [-convert] "
+ + "[-silent] [-w workers][-b bandwith]\n" + " type \"full\" to create a full backup image;\n"
+ " \"incremental\" to create an incremental backup image\n"
- + " backup_root_path The full root path to store the backup image,\n"
- + " the prefix can be hdfs, webhdfs, gpfs, etc\n" + " Options:\n"
- + " tables If no tables (\"\") are specified, all tables are backed up. "
+ + " backup_root_path The full root path to store the backup image,\n"
+ + " the prefix can be gpfs, hdfs or webhdfs\n" + " Options:\n"
+ + " tables If no tables (\"\") are specified, all tables are backed up. "
+ "Otherwise it is a\n" + " comma separated list of tables.\n"
- + " -convert For an incremental backup, convert WAL files to HFiles\n";
+ + " -s name Use the specified snapshot for full backup\n"
+ + " -convert For an incremental backup, convert WAL files to HFiles\n"
+ + " -w number of parallel workers.\n"
+ + " -b bandwith per one worker (in MB sec)" ;
+
+ private static final String PROGRESS_CMD_USAGE = "Usage: hbase backup progress <backupId>\n"
+ + " backupId backup image id;\n";
+
+ private static final String DESCRIBE_CMD_USAGE = "Usage: hbase backup describe <backupId>\n"
+ + " backupId backup image id\n";
+
+ private static final String HISTORY_CMD_USAGE = "Usage: hbase backup history [-n N]\n"
+ + " -n N show up to N last backup sessions, default - 10;\n";
+
+ private static final String DELETE_CMD_USAGE = "Usage: hbase backup delete <backupId>\n"
+ + " backupId backup image id;\n";
+
+ private static final String CANCEL_CMD_USAGE = "Usage: hbase backup cancel <backupId>\n"
+ + " backupId backup image id;\n";
+
+ private static final String SET_CMD_USAGE = "Usage: hbase backup set COMMAND [name] [tables]\n"
+ + " name Backup set name\n"
+ + " tables If no tables (\"\") are specified, all tables will belong to the set. "
+ + "Otherwise it is a\n" + " comma separated list of tables.\n"
+ + "where COMMAND is one of:\n"
+ + " add add tables to a set, crete set if needed\n"
+ + " remove remove tables from set\n"
+ + " list list all sets\n"
+ + " describe describes set\n"
+ + " delete delete backup set\n";
public static abstract class Command extends Configured {
Command(Configuration conf) {
@@ -66,25 +110,44 @@ public final class BackupCommands {
public static Command createCommand(Configuration conf, BackupCommand type, CommandLine cmdline) {
Command cmd = null;
switch (type) {
- case CREATE:
- cmd = new CreateCommand(conf, cmdline);
- break;
- case HELP:
- default:
- cmd = new HelpCommand(conf, cmdline);
- break;
+ case CREATE:
+ cmd = new CreateCommand(conf, cmdline);
+ break;
+ case DESCRIBE:
+ cmd = new DescribeCommand(conf, cmdline);
+ break;
+ case PROGRESS:
+ cmd = new ProgressCommand(conf, cmdline);
+ break;
+ case DELETE:
+ cmd = new DeleteCommand(conf, cmdline);
+ break;
+ case CANCEL:
+ cmd = new CancelCommand(conf, cmdline);
+ break;
+ case HISTORY:
+ cmd = new HistoryCommand(conf, cmdline);
+ break;
+ case SET:
+ cmd = new BackupSetCommand(conf, cmdline);
+ break;
+ case HELP:
+ default:
+ cmd = new HelpCommand(conf, cmdline);
+ break;
}
return cmd;
}
- private static class CreateCommand extends Command {
+
+ public static class CreateCommand extends Command {
CommandLine cmdline;
CreateCommand(Configuration conf, CommandLine cmdline) {
super(conf);
this.cmdline = cmdline;
}
-
+
@Override
public void execute() throws IOException {
if (cmdline == null || cmdline.getArgs() == null) {
@@ -106,17 +169,55 @@ public final class BackupCommands {
System.exit(-1);
}
- String tables = (args.length == 3) ? args[2] : null;
+ String tables = null;
+ Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
+ // Check backup set
+ if (cmdline.hasOption("set")) {
+ String setName = cmdline.getOptionValue("set");
+ tables = getTablesForSet(setName, conf);
+ if (tables == null) {
+ throw new IOException("Backup set '" + setName
+ + "' is either empty or does not exist");
+ }
+ } else {
+ tables = (args.length == 3) ? args[2] : null;
+ }
+ int bandwidth = cmdline.hasOption('b') ? Integer.parseInt(cmdline.getOptionValue('b')) : -1;
+ int workers = cmdline.hasOption('w') ? Integer.parseInt(cmdline.getOptionValue('w')) : -1;
+
try {
- BackupClient client = BackupRestoreFactory.getBackupClient(getConf());
- client.create(BackupType.valueOf(args[0].toUpperCase()),
- Lists.newArrayList(BackupUtil.parseTableNames(tables)), args[1]);
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.create(BackupType.valueOf(args[0].toUpperCase()), toList(tables), args[1],
+ workers, bandwidth);
} catch (RuntimeException e) {
System.out.println("ERROR: " + e.getMessage());
System.exit(-1);
}
}
+
+ private String getTablesForSet(String name, Configuration conf)
+ throws IOException {
+ try (final BackupSystemTable table = new BackupSystemTable(conf)) {
+ String[] tables = table.backupSetDescribe(name);
+ if (tables == null) return null;
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < tables.length; i++) {
+ sb.append(tables[i]);
+ if (i < tables.length - 1) {
+ sb.append(BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
+ }
+ }
+ return sb.toString();
+ }
+ }
+
+ private List<TableName> toList(String tables) {
+ if (tables == null || tables.isEmpty()) {
+ return null;
+ }
+ String[] splits = tables.split(",");
+ List<TableName> list = new ArrayList<TableName>();
+ for (String s : splits) {
+ list.add(TableName.valueOf(s));
+ }
+ return list;
+ }
}
private static class HelpCommand extends Command {
@@ -150,9 +251,299 @@ public final class BackupCommands {
if (BackupCommand.CREATE.name().equalsIgnoreCase(type)) {
System.out.println(CREATE_CMD_USAGE);
- } // other commands will be supported in future jira
+ } else if (BackupCommand.DESCRIBE.name().equalsIgnoreCase(type)) {
+ System.out.println(DESCRIBE_CMD_USAGE);
+ } else if (BackupCommand.HISTORY.name().equalsIgnoreCase(type)) {
+ System.out.println(HISTORY_CMD_USAGE);
+ } else if (BackupCommand.PROGRESS.name().equalsIgnoreCase(type)) {
+ System.out.println(PROGRESS_CMD_USAGE);
+ } else if (BackupCommand.DELETE.name().equalsIgnoreCase(type)) {
+ System.out.println(DELETE_CMD_USAGE);
+ } else if (BackupCommand.CANCEL.name().equalsIgnoreCase(type)) {
+ System.out.println(CANCEL_CMD_USAGE);
+ } else if (BackupCommand.SET.name().equalsIgnoreCase(type)) {
+ System.out.println(SET_CMD_USAGE);
+ } else {
+ System.out.println("Unknown command : " + type);
+ System.out.println(USAGE);
+ }
System.exit(0);
}
}
+ private static class DescribeCommand extends Command {
+ CommandLine cmdline;
+
+ DescribeCommand(Configuration conf, CommandLine cmdline) {
+ super(conf);
+ this.cmdline = cmdline;
+ }
+
+ @Override
+ public void execute() throws IOException {
+ if (cmdline == null || cmdline.getArgs() == null) {
+ System.out.println("ERROR: missing arguments");
+ System.out.println(DESCRIBE_CMD_USAGE);
+ System.exit(-1);
+ }
+ String[] args = cmdline.getArgs();
+ if (args.length != 1) {
+ System.out.println("ERROR: wrong number of arguments");
+ System.out.println(DESCRIBE_CMD_USAGE);
+ System.exit(-1);
+ }
+
+ String backupId = args[0];
+ try {
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.describeBackupImage(backupId);
+ } catch (RuntimeException e) {
+ System.out.println("ERROR: " + e.getMessage());
+ System.exit(-1);
+ }
+ }
+ }
+
+ private static class ProgressCommand extends Command {
+ CommandLine cmdline;
+
+ ProgressCommand(Configuration conf, CommandLine cmdline) {
+ super(conf);
+ this.cmdline = cmdline;
+ }
+
+ @Override
+ public void execute() throws IOException {
+ if (cmdline == null || cmdline.getArgs() == null) {
+ System.out.println("No backup id was specified, "
+ + "will retrieve the most recent (ongoing) sessions");
+ }
+ String[] args = cmdline == null ? null : cmdline.getArgs();
+ if (args != null && args.length > 1) {
+ System.out.println("ERROR: wrong number of arguments: " + args.length);
+ System.out.println(PROGRESS_CMD_USAGE);
+ System.exit(-1);
+ }
+
+ String backupId = args == null || args.length == 0 ? null : args[0];
+ try {
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.showProgress(backupId);
+ } catch (RuntimeException e) {
+ System.out.println("ERROR: " + e.getMessage());
+ System.exit(-1);
+ }
+ }
+ }
+
+ private static class DeleteCommand extends Command {
+
+ CommandLine cmdline;
+ DeleteCommand(Configuration conf, CommandLine cmdline) {
+ super(conf);
+ this.cmdline = cmdline;
+ }
+
+ @Override
+ public void execute() throws IOException {
+ if (cmdline == null || cmdline.getArgs() == null) {
+ System.out.println("No backup id(s) was specified");
+ System.out.println(PROGRESS_CMD_USAGE);
+ System.exit(-1);
+ }
+ String[] args = cmdline.getArgs();
+
+ try {
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.deleteBackups(args);
+ } catch (RuntimeException e) {
+ System.out.println("ERROR: " + e.getMessage());
+ System.exit(-1);
+ }
+ }
+ }
+
+ private static class CancelCommand extends Command {
+ CommandLine cmdline;
+
+ CancelCommand(Configuration conf, CommandLine cmdline) {
+ super(conf);
+ this.cmdline = cmdline;
+ }
+
+ @Override
+ public void execute() throws IOException {
+ if (cmdline == null || cmdline.getArgs() == null) {
+ System.out.println("No backup id(s) was specified, will use the most recent one");
+ }
+ String[] args = cmdline.getArgs();
+ String backupId = args == null || args.length == 0 ? null : args[0];
+ try {
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.cancelBackup(backupId);
+ } catch (RuntimeException e) {
+ System.out.println("ERROR: " + e.getMessage());
+ System.exit(-1);
+ }
+ }
+ }
+
+ private static class HistoryCommand extends Command {
+ CommandLine cmdline;
+
+ HistoryCommand(Configuration conf, CommandLine cmdline) {
+ super(conf);
+ this.cmdline = cmdline;
+ }
+
+ @Override
+ public void execute() throws IOException {
+
+ int n = cmdline == null || !cmdline.hasOption("n") ? 10 : parseHistoryLength();
+ try {
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.showHistory(n);
+ } catch (RuntimeException e) {
+ System.out.println("ERROR: " + e.getMessage());
+ System.exit(-1);
+ }
+ }
+
+ private int parseHistoryLength() {
+ String value = cmdline.getOptionValue("n");
+ if (value == null) throw new RuntimeException("command line format");
+ return Integer.parseInt(value);
+ }
+ }
+
+ private static class BackupSetCommand extends Command {
+ private final static String SET_ADD_CMD = "add";
+ private final static String SET_REMOVE_CMD = "remove";
+ private final static String SET_DELETE_CMD = "delete";
+ private final static String SET_DESCRIBE_CMD = "describe";
+ private final static String SET_LIST_CMD = "list";
+
+ CommandLine cmdline;
+
+ BackupSetCommand(Configuration conf, CommandLine cmdline) {
+ super(conf);
+ this.cmdline = cmdline;
+ }
+
+ @Override
+ public void execute() throws IOException {
+
+ // Command-line must have at least one element
+ if (cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length == 0) {
+ throw new IOException("command line format");
+ }
+ String[] args = cmdline.getArgs();
+ String cmdStr = args[0];
+ BackupCommand cmd = getCommand(cmdStr);
+
+ try {
+
+ switch (cmd) {
+ case SET_ADD:
+ processSetAdd(args);
+ break;
+ case SET_REMOVE:
+ processSetRemove(args);
+ break;
+ case SET_DELETE:
+ processSetDelete(args);
+ break;
+ case SET_DESCRIBE:
+ processSetDescribe(args);
+ break;
+ case SET_LIST:
+ processSetList(args);
+ break;
+ default:
+ break;
+
+ }
+ } catch (RuntimeException e) {
+ System.out.println("ERROR: " + e.getMessage());
+ System.exit(-1);
+ }
+ }
+
+ private void processSetList(String[] args) throws IOException {
+ // List all backup set names
+ // does not expect any args
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.backupSetList();
+ }
+
+ private void processSetDescribe(String[] args) throws IOException {
+ if (args == null || args.length != 2) {
+ throw new RuntimeException("Wrong args");
+ }
+ String setName = args[1];
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.backupSetDescribe(setName);
+ }
+
+ private void processSetDelete(String[] args) throws IOException {
+ if (args == null || args.length != 2) {
+ throw new RuntimeException("Wrong args");
+ }
+ String setName = args[1];
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.backupSetDelete(setName);
+ }
+
+ private void processSetRemove(String[] args) throws IOException {
+ if (args == null || args.length != 3) {
+ throw new RuntimeException("Wrong args");
+ }
+ String setName = args[1];
+ String[] tables = args[2].split(",");
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.backupSetRemove(setName, tables);
+ }
+
+ private void processSetAdd(String[] args) throws IOException {
+ if (args == null || args.length != 3) {
+ throw new RuntimeException("Wrong args");
+ }
+ String setName = args[1];
+ String[] tables = args[2].split(",");
+ Configuration conf = HBaseConfiguration.create();
+ BackupClient client = BackupRestoreFactory.getBackupClient(conf);
+ client.backupSetAdd(setName, tables);
+ }
+
+ private BackupCommand getCommand(String cmdStr) throws IOException {
+ if (cmdStr.equals(SET_ADD_CMD)) {
+ return BackupCommand.SET_ADD;
+ } else if (cmdStr.equals(SET_REMOVE_CMD)) {
+ return BackupCommand.SET_REMOVE;
+ } else if (cmdStr.equals(SET_DELETE_CMD)) {
+ return BackupCommand.SET_DELETE;
+ } else if (cmdStr.equals(SET_DESCRIBE_CMD)) {
+ return BackupCommand.SET_DESCRIBE;
+ } else if (cmdStr.equals(SET_LIST_CMD)) {
+ return BackupCommand.SET_LIST;
+ } else {
+ throw new IOException("Unknown command for 'set' :" + cmdStr);
+ }
+ }
+
+ }
+
}
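
A short sketch of the new dispatch path in BackupCommands; conf and cmdline stand in for whatever BackupDriver parsed:

    BackupCommands.Command cmd =
        BackupCommands.createCommand(conf, BackupCommand.PROGRESS, cmdline);
    cmd.execute();   // prints status/progress for the given or most recent backup id
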
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupContext.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupContext.java
index 1be0c3b..ecca4be 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupContext.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupContext.java
@@ -20,12 +20,17 @@ package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
@@ -37,49 +42,15 @@ import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.BackupContext.Builder;
import org.apache.hadoop.hbase.protobuf.generated.BackupProtos.TableBackupStatus;
+import com.google.protobuf.ByteString;
+
/**
* An object to encapsulate the information for each backup request
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BackupContext {
-
- public Map getBackupStatusMap() {
- return backupStatusMap;
- }
-
- public void setBackupStatusMap(Map backupStatusMap) {
- this.backupStatusMap = backupStatusMap;
- }
-
- public HashMap> getTableSetTimestampMap() {
- return tableSetTimestampMap;
- }
-
- public void setTableSetTimestampMap(
- HashMap> tableSetTimestampMap) {
- this.tableSetTimestampMap = tableSetTimestampMap;
- }
-
- public String getHlogTargetDir() {
- return hlogTargetDir;
- }
-
- public void setType(BackupType type) {
- this.type = type;
- }
-
- public void setTargetRootDir(String targetRootDir) {
- this.targetRootDir = targetRootDir;
- }
-
- public void setTotalBytesCopied(long totalBytesCopied) {
- this.totalBytesCopied = totalBytesCopied;
- }
-
- public void setCancelled(boolean cancelled) {
- this.state = BackupState.CANCELLED;;
- }
+ private static final Log LOG = LogFactory.getLog(BackupContext.class);
// backup id: a timestamp when we request the backup
private String backupId;
@@ -122,8 +93,16 @@ public class BackupContext {
transient private HashMap> tableSetTimestampMap;
// backup progress in %% (0-100)
-
private int progress;
+
+ // distributed job id
+ private byte[] jobId;
+
+ // Number of parallel workers. -1 - system defined
+ private int workers = -1;
+
+ // Bandwidth per worker in MB per sec. -1 - unlimited
+ private int bandwidth = -1;
public BackupContext() {
}
@@ -134,7 +113,7 @@ public class BackupContext {
this.backupId = backupId;
this.type = type;
this.targetRootDir = targetRootDir;
-
+ LOG.debug("CreateBackupContext: " + tables.length+" "+tables[0]);
this.addTables(tables);
if (type == BackupType.INCREMENTAL) {
@@ -145,6 +124,62 @@ public class BackupContext {
this.endTs = 0;
}
+ public byte[] getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(byte[] jobId) {
+ this.jobId = jobId;
+ }
+
+ public int getWorkers() {
+ return workers;
+ }
+
+ public void setWorkers(int workers) {
+ this.workers = workers;
+ }
+
+ public int getBandwidth() {
+ return bandwidth;
+ }
+
+ public void setBandwidth(int bandwidth) {
+ this.bandwidth = bandwidth;
+ }
+
+ public void setBackupStatusMap(Map<TableName, BackupStatus> backupStatusMap) {
+ this.backupStatusMap = backupStatusMap;
+ }
+
+ public HashMap<TableName, HashMap<String, Long>> getTableSetTimestampMap() {
+ return tableSetTimestampMap;
+ }
+
+ public void setTableSetTimestampMap(HashMap<TableName, HashMap<String, Long>> tableSetTimestampMap) {
+ this.tableSetTimestampMap = tableSetTimestampMap;
+ }
+
+ public String getHlogTargetDir() {
+ return hlogTargetDir;
+ }
+
+ public void setType(BackupType type) {
+ this.type = type;
+ }
+
+ public void setTargetRootDir(String targetRootDir) {
+ this.targetRootDir = targetRootDir;
+ }
+
+ public void setTotalBytesCopied(long totalBytesCopied) {
+ this.totalBytesCopied = totalBytesCopied;
+ }
+
+ public void setCancelled(boolean cancelled) {
+ this.state = BackupState.CANCELLED;
+ }
+
/**
* Set progress string
* @param msg progress message
@@ -332,6 +367,11 @@ public class BackupContext {
builder.setTargetRootDir(getTargetRootDir());
builder.setTotalBytesCopied(getTotalBytesCopied());
builder.setType(BackupProtos.BackupType.valueOf(getType().name()));
+ builder.setWorkersNumber(workers);
+ builder.setBandwidth(bandwidth);
+ if (jobId != null) {
+ builder.setJobId(ByteString.copyFrom(jobId));
+ }
byte[] data = builder.build().toByteArray();
return data;
}
@@ -368,6 +408,11 @@ public class BackupContext {
context.setTargetRootDir(proto.getTargetRootDir());
context.setTotalBytesCopied(proto.getTotalBytesCopied());
context.setType(BackupType.valueOf(proto.getType().name()));
+ context.setWorkers(proto.getWorkersNumber());
+ context.setBandwidth(proto.getBandwidth());
+ if (proto.hasJobId()) {
+ context.setJobId(proto.getJobId().toByteArray());
+ }
return context;
}
@@ -379,4 +424,50 @@ public class BackupContext {
return map;
}
+ public String getShortDescription() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("ID : " + backupId).append("\n");
+ sb.append("Tables : " + getTableListAsString()).append("\n");
+ sb.append("State : " + getState()).append("\n");
+ Date date = null;
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(getStartTs());
+ date = cal.getTime();
+ sb.append("Start time : " + date).append("\n");
+ if (state == BackupState.FAILED) {
+ sb.append("Failed message : " + getFailedMsg()).append("\n");
+ } else if (state == BackupState.RUNNING) {
+ sb.append("Phase : " + getPhase()).append("\n");
+ sb.append("Progress : " + getProgress()).append("\n");
+ } else if (state == BackupState.COMPLETE) {
+ cal = Calendar.getInstance();
+ cal.setTimeInMillis(getEndTs());
+ date = cal.getTime();
+ sb.append("Start time : " + date).append("\n");
+ }
+ return sb.toString();
+ }
+
+ public String getStatusAndProgressAsString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("id: ").append(getBackupId()).append(" state: ").append(getState())
+ .append(" progress: ").append(getProgress());
+ return sb.toString();
+ }
+
+ public String getTableListAsString() {
+ return concat(backupStatusMap.keySet(), ";");
+ }
+
+ private String concat(Set<TableName> col, String separator) {
+ if (col.size() == 0) {
+ return "";
+ }
+ StringBuilder sb = new StringBuilder();
+ for (TableName s : col) {
+ sb.append(s.toString() + separator);
+ }
+ sb.delete(sb.length() - separator.length(), sb.length());
+ return sb.toString();
+ }
}
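
Illustrative only: the new tuning and job-tracking fields on BackupContext, together with the one-line status helper added above (the job id is a made-up value):

    BackupContext ctx = new BackupContext();
    ctx.setWorkers(4);       // -1 (the default) lets the copy service decide
    ctx.setBandwidth(100);   // MB/sec per worker; -1 means unlimited
    ctx.setJobId(Bytes.toBytes("job_1457990000000_0001")); // hypothetical distributed job id
    System.out.println(ctx.getStatusAndProgressAsString());
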
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCopyService.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCopyService.java
index 1a0d5ed..c26f4c0 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCopyService.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCopyService.java
@@ -27,11 +27,28 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
+// TODO: refactor - move up
public interface BackupCopyService extends Configurable {
static enum Type {
FULL, INCREMENTAL
}
+ /**
+ * Copy backup data
+ * @param backupHandler - handler
+ * @param conf - configuration
+ * @param copyType - copy type
+ * @param options - list of options
+ * @return result (0 - success)
+ * @throws IOException
+ */
public int copy(BackupHandler backupHandler, Configuration conf, BackupCopyService.Type copyType,
String[] options) throws IOException;
+
+ /**
+ * Cancel copy job
+ * @param jobHandler - copy job handler (serialized job id)
+ * @throws IOException
+ */
+ public void cancelCopyJob(byte[] jobHandler) throws IOException;
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHandler.java
index ea9f35e..c2b7b2c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupHandler.java
@@ -233,7 +233,6 @@ public class BackupHandler implements Callable {
*/
private void snapshotForFullBackup(BackupContext backupContext) throws IOException {
LOG.info("HBase snapshot full backup for " + backupContext.getBackupId());
-
// avoid action if has been cancelled
if (backupContext.isCancelled()) {
return;
@@ -242,6 +241,7 @@ public class BackupHandler implements Callable {
try (Admin admin = conn.getAdmin()) {
// we do HBase snapshot for tables in the table list one by one currently
for (TableName table : backupContext.getTables()) {
+
// avoid action if it has been cancelled
if (backupContext.isCancelled()) {
return;
@@ -427,11 +427,26 @@ public class BackupHandler implements Callable {
// TODO this below
// backupCopier.setSubTaskPercntgInWholeTask(1f / numOfSnapshots);
int res = 0;
- String[] args = new String[4];
+ int argsLen = 4;
+ if (backupContext.getWorkers() > 0) argsLen += 2;
+ if (backupContext.getBandwidth() > 0) argsLen += 2;
+ String[] args = new String[argsLen];
args[0] = "-snapshot";
args[1] = backupContext.getSnapshotName(table);
args[2] = "-copy-to";
args[3] = backupContext.getBackupStatus(table).getTargetDir();
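+ // Append optional ExportSnapshot arguments when configured: -mappers (number of copy
+ // workers) and -bandwidth (throughput limit per worker)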
+ if (backupContext.getWorkers() > 0 && backupContext.getBandwidth() > 0) {
+ args[4] = "-mappers";
+ args[5] = Integer.toString(backupContext.getWorkers());
+ args[6] = "-bandwidth";
+ args[7] = Integer.toString(backupContext.getBandwidth());
+ } else if (backupContext.getWorkers() > 0) {
+ args[4] = "-mappers";
+ args[5] = Integer.toString(backupContext.getWorkers());
+ } else if (backupContext.getBandwidth() > 0) {
+ args[4] = "-bandwidth";
+ args[5] = Integer.toString(backupContext.getBandwidth());
+ }
LOG.debug("Copy snapshot " + args[1] + " to " + args[3]);
res = copyService.copy(this, conf, BackupCopyService.Type.FULL, args);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index a4b0a0a..d4e5a5c 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -86,9 +86,10 @@ public class BackupManager implements Closeable {
HConstants.BACKUP_ENABLE_KEY + " setting.");
}
this.conf = conf;
- this.conn = ConnectionFactory.createConnection(conf); // TODO: get Connection from elsewhere?
- this.systemTable = new BackupSystemTable(conn);
+ this.systemTable = new BackupSystemTable(this.conf);
+ this.conn = BackupSystemTable.getConnection();
Runtime.getRuntime().addShutdownHook(new ExitHandler());
+
}
/**
@@ -178,13 +179,13 @@ public class BackupManager implements Closeable {
LOG.error(e);
}
}
- if (conn != null) {
- try {
- conn.close();
- } catch (IOException e) {
- LOG.error(e);
- }
- }
+// if (conn != null) {
+// try {
+// conn.close();
+// } catch (IOException e) {
+// LOG.error(e);
+// }
+// }
}
/**
@@ -198,13 +199,14 @@ public class BackupManager implements Closeable {
* @throws BackupException exception
*/
protected BackupContext createBackupContext(String backupId, BackupType type,
- List tableList, String targetRootDir) throws BackupException {
+ List tableList, String targetRootDir, int workers, int bandwidth)
+ throws BackupException {
if (targetRootDir == null) {
throw new BackupException("Wrong backup request parameter: target backup root directory");
}
- if (type == BackupType.FULL && tableList == null) {
+ if (type == BackupType.FULL && tableList == null || tableList.size() == 0) {
// If table list is null for full backup, which means backup all tables. Then fill the table
// list with all user tables from meta. It no table available, throw the request exception.
@@ -228,8 +230,12 @@ public class BackupManager implements Closeable {
}
// there are one or more tables in the table list
- return new BackupContext(backupId, type, tableList.toArray(new TableName[tableList.size()]),
+ BackupContext context = new BackupContext(backupId, type,
+ tableList.toArray(new TableName[tableList.size()]),
targetRootDir);
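+ // Propagate copy job tuning (number of workers and bandwidth limit) to the backup context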
+ context.setBandwidth(bandwidth);
+ context.setWorkers(workers);
+ return context;
}
/**
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java
index d0ce059..7233bfa 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java
@@ -37,7 +37,8 @@ public final class BackupRestoreConstants {
public static final String BACKUPID_PREFIX = "backup_";
public static enum BackupCommand {
- CREATE, CANCEL, DELETE, DESCRIBE, HISTORY, STATUS, CONVERT, MERGE, STOP, SHOW, HELP,
+ CREATE, CANCEL, DELETE, DESCRIBE, HISTORY, STATUS, CONVERT, MERGE, STOP, SHOW, HELP, PROGRESS, SET,
+ SET_ADD, SET_REMOVE, SET_DELETE, SET_DESCRIBE, SET_LIST
}
private BackupRestoreConstants() {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 18a0f06..46a1e44 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -27,14 +27,21 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.NamespaceNotFoundException;
+import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState;
import org.apache.hadoop.hbase.backup.impl.BackupUtil.BackupCompleteData;
@@ -42,6 +49,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -67,25 +75,62 @@ public final class BackupSystemTable implements Closeable {
// Connection to HBase cluster, shared
// among all instances
- private final Connection connection;
+ private static Connection connection;
// Cluster configuration
- private final Configuration conf;
+ private static Configuration config;
- /**
- * Create a BackupSystemTable object for the given Connection. Connection is NOT owned by this
- * instance and has to be closed explicitly.
- * @param connection
- * @throws IOException
- */
- public BackupSystemTable(Connection connection) throws IOException {
- this.connection = connection;
- this.conf = connection.getConfiguration();
+ private static int refCount = 0;
+ private static WriteLock refCountLock = new ReentrantReadWriteLock().writeLock();
- createSystemTableIfNotExists();
+ public static Connection getConnection() {
+ return connection;
+ }
+
+ private static void verifySystemNamespaceExists(Admin admin) throws IOException {
+ try {
+ admin.getNamespaceDescriptor(TABLE_NAMESPACE);
+ } catch (NamespaceNotFoundException e) {
+ admin.createNamespace(NamespaceDescriptor.create(TABLE_NAMESPACE).build());
+ }
+ }
+
+ public BackupSystemTable(Configuration conf) throws IOException {
+ try{
+ refCountLock.lock();
+ if (connection == null) {
+ connection = ConnectionFactory.createConnection(conf);
+ config = conf;
+ // Verify hbase:backup exists - FOR TESTS ONLY
+ // hbase:backup is created by HMaster on start up
+ createSystemTableIfNotExists();
+ }
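+ // Count this instance as a user of the shared connection; close() decrements the
+ // reference count and closes the connection when it reaches zero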
+ refCount++;
+ } finally{
+ refCountLock.unlock();
+ }
}
- @Override
+ /**
+ * Closes the HBase connection. To avoid connection leaks we do reference counting
+ * and close the underlying connection only when the reference count drops to zero.
+ */
public void close() {
+ try{
+ BackupSystemTable.refCountLock.lock();
+ BackupSystemTable.refCount--;
+ if(connection != null && BackupSystemTable.refCount == 0) {
+ try{
+ connection.close();
+ connection = null;
+ } catch(Exception e){
+ LOG.error(e);
+ }
+ }
+ } finally{
+ BackupSystemTable.refCountLock.unlock();
+ }
}
/**
@@ -99,15 +144,19 @@ public final class BackupSystemTable implements Closeable {
private void createSystemTableIfNotExists() throws IOException {
try(Admin admin = connection.getAdmin()) {
if (admin.tableExists(tableName) == false) {
+ verifySystemNamespaceExists(admin);
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
HColumnDescriptor colDesc = new HColumnDescriptor(familyName);
colDesc.setMaxVersions(1);
int ttl =
- conf.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+ config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
colDesc.setTimeToLive(ttl);
tableDesc.addFamily(colDesc);
admin.createTable(tableDesc);
}
+
+ } catch (TableExistsException ee){
+ //just ignore that
} catch (IOException e) {
LOG.error(e);
throw e;
@@ -568,4 +617,203 @@ public final class BackupSystemTable implements Closeable {
return result;
}
}
+
+ /**
+ * BACKUP SETS
+ */
+
+ /**
+ * Get backup set list
+ * @return backup set list
+ * @throws IOException
+ */
+ public List backupSetList() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" backup set list");
+ }
+ List list = new ArrayList();
+ Table table = null;
+ ResultScanner scanner = null;
+ try {
+ table = connection.getTable(tableName);
+ Scan scan = BackupSystemTableHelper.createScanForBackupSetList();
+ scan.setMaxVersions(1);
+ scanner = table.getScanner(scan);
+ Result res = null;
+ while ((res = scanner.next()) != null) {
+ res.advance();
+ list.add(BackupSystemTableHelper.cellKeyToBackupSetName(res.current()));
+ }
+ return list;
+ } finally {
+ if (scanner != null) {
+ scanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Get backup set description (list of tables)
+ * @param name backup set name
+ * @return list of tables in a backup set
+ * @throws IOException
+ */
+ public String[] backupSetDescribe(String name) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" backup set describe: "+name);
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = BackupSystemTableHelper.createGetForBackupSet(name);
+ Result res = table.get(get);
+ if(res.isEmpty()) return null;
+ res.advance();
+ String[] tables =
+ BackupSystemTableHelper.cellValueToBackupSet(res.current());
+ return tables;
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Add backup set (list of tables)
+ * @param name - set name
+ * @param newTables - array of table names to add
+ * @throws IOException
+ */
+ public void backupSetAdd(String name, String[] newTables) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" backup set add: "+name);
+ }
+ Table table = null;
+ String[] union = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = BackupSystemTableHelper.createGetForBackupSet(name);
+ Result res = table.get(get);
+ if(res.isEmpty()) {
+ union = newTables;
+ } else {
+ res.advance();
+ String[] tables =
+ BackupSystemTableHelper.cellValueToBackupSet(res.current());
+ union = merge(tables, newTables);
+ }
+ Put put = BackupSystemTableHelper.createPutForBackupSet(name, union);
+ table.put(put);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ private String[] merge(String[] tables, String[] newTables) {
+ List list = new ArrayList();
+ // Add all from tables
+ for(String t: tables){
+ list.add(t);
+ }
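+ // Add only the new tables that are not already present (union, insertion order preserved)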
+ for(String nt: newTables){
+ if(list.contains(nt)) continue;
+ list.add(nt);
+ }
+ String[] arr = new String[list.size()];
+ list.toArray(arr);
+ return arr;
+ }
+
+ /**
+ * Remove tables from backup set (list of tables)
+ * @param name - set name
+ * @param toRemove - array of table names to remove
+ * @throws IOException
+ */
+ public void backupSetRemove(String name, String[] toRemove) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" backup set describe: " + name);
+ }
+ Table table = null;
+ String[] disjoint = null;
+ try {
+ table = connection.getTable(tableName);
+ Get get = BackupSystemTableHelper.createGetForBackupSet(name);
+ Result res = table.get(get);
+ if (res.isEmpty()) {
+ return;
+ } else {
+ res.advance();
+ String[] tables = BackupSystemTableHelper.cellValueToBackupSet(res.current());
+ disjoint = disjoin(tables, toRemove);
+ }
+ if (disjoint.length > 0) {
+ Put put = BackupSystemTableHelper.createPutForBackupSet(name, disjoint);
+ table.put(put);
+ } else {
+ // Nothing left in the set - delete it
+ backupSetDelete(name);
+ }
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ private String[] disjoin(String[] tables, String[] toRemove) {
+ List list = new ArrayList();
+ // Add all from tables
+ for (String t : tables) {
+ list.add(t);
+ }
+ for (String nt : toRemove) {
+ if (list.contains(nt)) {
+ list.remove(nt);
+ }
+ }
+ String[] arr = new String[list.size()];
+ list.toArray(arr);
+ return arr;
+ }
+
+ /**
+ * Delete backup set
+ * @param name set's name
+ * @throws IOException
+ */
+ public void backupSetDelete(String name) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" backup set delete: " + name);
+ }
+ Table table = null;
+ try {
+ table = connection.getTable(tableName);
+ Delete del = BackupSystemTableHelper.createDeleteForBackupSet(name);
+ table.delete(del);
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ /**
+ * Get backup system table descriptor
+ * @return descriptor
+ */
+ public static HTableDescriptor getSystemTableDescriptor() {
+ HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+ HColumnDescriptor colDesc = new HColumnDescriptor(familyName);
+ colDesc.setMaxVersions(1);
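+ // Fall back to a default configuration if no BackupSystemTable instance has been created yet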
+ if(config == null) config = HBaseConfiguration.create();
+ int ttl =
+ config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+ colDesc.setTimeToLive(ttl);
+ tableDesc.addFamily(colDesc);
+ return tableDesc;
+ }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java
index ac096b7..52d3311 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java
@@ -61,6 +61,7 @@ public final class BackupSystemTableHelper {
private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm.";
private final static String RS_LOG_TS_PREFIX = "rslogts.";
private final static String WALS_PREFIX = "wals.";
+ private final static String SET_KEY_PREFIX = "set.";
private final static byte[] col1 = "col1".getBytes();
private final static byte[] col2 = "col2".getBytes();
@@ -322,4 +323,93 @@ public final class BackupSystemTableHelper {
return get;
}
+
+ /**
+ * Creates Scan operation to load backup set list
+ * @return scan operation
+ */
+ static Scan createScanForBackupSetList() {
+ Scan scan = new Scan();
+ byte[] startRow = SET_KEY_PREFIX.getBytes();
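+ // Stop row: same prefix with the last byte incremented, so the scan is bounded to "set." keys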
+ byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+ stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+ scan.setStartRow(startRow);
+ scan.setStopRow(stopRow);
+ scan.addFamily(BackupSystemTable.familyName);
+ return scan;
+ }
+
+ /**
+ * Creates Get operation to load backup set content
+ * @return get operation
+ */
+ static Get createGetForBackupSet(String name) {
+ byte[] row = (SET_KEY_PREFIX + name).getBytes();
+ Get get = new Get(row);
+ get.addFamily(BackupSystemTable.familyName);
+ return get;
+ }
+
+ /**
+ * Creates Delete operation to delete backup set content
+ * @return delete operation
+ */
+ static Delete createDeleteForBackupSet(String name) {
+ byte[] row = (SET_KEY_PREFIX + name).getBytes();
+ Delete del = new Delete(row);
+ del.addFamily(BackupSystemTable.familyName);
+ return del;
+ }
+
+
+ /**
+ * Creates Put operation to update backup set content
+ * @return put operation
+ */
+ static Put createPutForBackupSet(String name, String[] tables) {
+ byte[] row = (SET_KEY_PREFIX + name).getBytes();
+ Put put = new Put(row);
+ byte[] value = convertToByteArray(tables);
+ put.addColumn(BackupSystemTable.familyName, col1, value);
+ return put;
+ }
+
+ private static byte[] convertToByteArray(String[] tables) {
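+ // A backup set is stored as a single comma-separated string in one cell value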
+ StringBuffer sb = new StringBuffer();
+ for(int i=0; i < tables.length; i++){
+ sb.append(tables[i]);
+ if(i < tables.length -1){
+ sb.append(",");
+ }
+ }
+ return sb.toString().getBytes();
+ }
+
+
+ /**
+ * Converts cell to backup set list.
+ * @param current - cell
+ * @return backup set
+ * @throws IOException
+ */
+ static String[] cellValueToBackupSet(Cell current) throws IOException {
+ byte[] data = CellUtil.cloneValue(current);
+ if( data != null && data.length > 0){
+ return new String(data).split(",");
+ } else{
+ return new String[0];
+ }
+ }
+
+ /**
+ * Converts cell key to backup set name.
+ * @param current - cell
+ * @return backup set name
+ * @throws IOException
+ */
+ static String cellKeyToBackupSetName(Cell current) throws IOException {
+ byte[] data = CellUtil.cloneRow(current);
+ return new String(data).substring(SET_KEY_PREFIX.length());
+ }
+
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupUtil.java
index 2dd0556..38eb5a3 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupUtil.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -45,12 +46,12 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -153,7 +154,6 @@ public final class BackupUtil {
descriptors.createTableDescriptorForTableDirectory(target, orig, false);
LOG.debug("Finished copying tableinfo.");
- HBaseAdmin hbadmin = null;
// TODO: optimize
List regions = null;
try(Connection conn = ConnectionFactory.createConnection(conf);
@@ -375,9 +375,35 @@ public final class BackupUtil {
new Long(o.getBackupToken().substring(o.getBackupToken().lastIndexOf("_") + 1));
return thisTS.compareTo(otherTS);
}
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("BackupId =").append(backupToken).append("\n");
+ sb.append("Type =").append(type).append("\n");
+ sb.append("Start time =").append(new Date(Long.parseLong(startTime))).append("\n");
+ sb.append("End time =").append(new Date(Long.parseLong(endTime))).append("\n");
+ sb.append("Root path =").append(backupRootPath).append("\n");
+ sb.append("Type =").append(type).append("\n");
+ sb.append("Table list =").append(tableList).append("\n");
+ sb.append("Bytes copied =").append(type).append("\n");
+ sb.append("Ancestors =").append(BackupUtil.merge(ancestors)).append("\n\n");
+ return sb.toString();
+ }
}
+ public static String merge(List list) {
+ if (list == null || list.size() == 0) return "";
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < list.size(); i++) {
+ sb.append(list.get(i));
+ if (i < list.size() - 1) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
+
/**
* Sort history list by start time in descending order.
* @param historyList history list
@@ -510,4 +536,78 @@ public final class BackupUtil {
}
return ret;
}
+
+ public static void cleanupBackupData(BackupContext context, Configuration conf)
+ throws IOException {
+ cleanupHLogDir(context, conf);
+ cleanupTargetDir(context, conf);
+ }
+
+ /**
+ * Clean up directories which are generated when DistCp copying hlogs.
+ * @throws IOException
+ */
+ private static void cleanupHLogDir(BackupContext backupContext, Configuration conf)
+ throws IOException {
+
+ String logDir = backupContext.getHLogTargetDir();
+ if (logDir == null) {
+ LOG.warn("No log directory specified for " + backupContext.getBackupId());
+ return;
+ }
+
+ Path rootPath = new Path(logDir).getParent();
+ FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
+ FileStatus[] files = FSUtils.listStatus(fs, rootPath);
+ if (files == null) {
+ return;
+ }
+ for (FileStatus file : files) {
+ LOG.debug("Delete log files: " + file.getPath().getName());
+ FSUtils.delete(fs, file.getPath(), true);
+ }
+ }
+
+ /**
+ * Clean up the data at target directory
+ */
+ private static void cleanupTargetDir(BackupContext backupContext, Configuration conf) {
+ try {
+ // clean up the data at target directory
+ LOG.debug("Trying to cleanup up target dir : " + backupContext.getBackupId());
+ String targetDir = backupContext.getTargetRootDir();
+ if (targetDir == null) {
+ LOG.warn("No target directory specified for " + backupContext.getBackupId());
+ return;
+ }
+
+ FileSystem outputFs =
+ FileSystem.get(new Path(backupContext.getTargetRootDir()).toUri(), conf);
+
+ for (TableName table : backupContext.getTables()) {
+ Path targetDirPath =
+ new Path(HBackupFileSystem.getTableBackupDir(backupContext.getTargetRootDir(),
+ backupContext.getBackupId(), table));
+ if (outputFs.delete(targetDirPath, true)) {
+ LOG.info("Cleaning up backup data at " + targetDirPath.toString() + " done.");
+ } else {
+ LOG.info("No data has been found in " + targetDirPath.toString() + ".");
+ }
+
+ Path tableDir = targetDirPath.getParent();
+ FileStatus[] backups = FSUtils.listStatus(outputFs, tableDir);
+ if (backups == null || backups.length == 0) {
+ outputFs.delete(tableDir, true);
+ LOG.debug(tableDir.toString() + " is empty, remove it.");
+ }
+ }
+
+ } catch (IOException e1) {
+ LOG.error("Cleaning up backup data of " + backupContext.getBackupId() + " at "
+ + backupContext.getTargetRootDir() + " failed due to " + e1.getMessage() + ".");
+ }
+ }
+
+
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
index df21612..aba0195 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -105,12 +105,25 @@ public class IncrementalBackupManager {
newTimestamps = backupManager.readRegionServerLastLogRollResult();
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
- logList.addAll(getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps));
+ logList = addLogFilesFromBackupSystem(logList, previousTimestampMins, newTimestamps);
backupContext.setIncrBackupFileList(logList);
return newTimestamps;
}
+ private List addLogFilesFromBackupSystem(List logList,
+ HashMap previousTimestampMins, HashMap newTimestamps)
+ throws IOException {
+ List fromBackupSystemList =
+ getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps);
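+ // Add WAL files recorded in hbase:backup that the file system scan did not already
+ // pick up, avoiding duplicates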
+ for(String fileName: fromBackupSystemList) {
+ if(!logList.contains(fileName)) {
+ logList.add(fileName);
+ }
+ }
+ return logList;
+ }
+
/**
* For each region server: get all log files newer than the last timestamps but not newer than the
* newest timestamps. FROM hbase:backup table
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
index 32d16fb..4705be8 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
@@ -20,11 +20,9 @@ package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
@@ -51,7 +49,6 @@ public final class RestoreClientImpl implements RestoreClient {
private static final Log LOG = LogFactory.getLog(RestoreClientImpl.class);
private Configuration conf;
- private Set lastRestoreImagesSet;
public RestoreClientImpl() {
}
@@ -64,7 +61,7 @@ public final class RestoreClientImpl implements RestoreClient {
/**
* Restore operation. Stage 1: validate backupManifest, and check target tables
* @param hBackupFS to access the backup image
- * @param backupRootDir The root dir for backup image
+ * @param backupRootDir The root directory for backup image
* @param backupId The backup id for image to be restored
* @param check True if only do dependency check
* @param autoRestore True if automatically restore following the dependency
@@ -107,30 +104,18 @@ public final class RestoreClientImpl implements RestoreClient {
// check the target tables
checkTargetTables(tTableArray, isOverwrite);
+ restoreStage(hBackupFS, backupManifestMap, sTableArray, tTableArray, autoRestore);
- // start restore process
- Set restoreImageSet =
- restoreStage(hBackupFS, backupManifestMap, sTableArray, tTableArray, autoRestore);
-
- LOG.info("Restore for " + Arrays.asList(sTableArray) + " are successful!");
- lastRestoreImagesSet = restoreImageSet;
+ LOG.info("Restore for " + BackupUtil.join(sTableArray) + " is successful!");
} catch (IOException e) {
LOG.error("ERROR: restore failed with error: " + e.getMessage());
throw e;
}
-
// not only for check, return false
return false;
}
- /**
- * Get last restore image set. The value is globally set for the latest finished restore.
- * @return the last restore image set
- */
- public Set getLastRestoreImagesSet() {
- return lastRestoreImagesSet;
- }
private boolean validate(HashMap backupManifestMap)
throws IOException {
@@ -145,10 +130,6 @@ public final class RestoreClientImpl implements RestoreClient {
imageSet.addAll(depList);
}
- // todo merge
- LOG.debug("merge will be implemented in future jira");
- // BackupUtil.clearMergedImages(table, imageSet, conf);
-
LOG.info("Dependent image(s) from old to new:");
for (BackupImage image : imageSet) {
String imageDir =
@@ -187,7 +168,7 @@ public final class RestoreClientImpl implements RestoreClient {
}
} else {
LOG.info("HBase table " + tableName
- + " does not exist. It will be create during backup process");
+ + " does not exist. It will be created during restore process");
}
}
}
@@ -219,10 +200,9 @@ public final class RestoreClientImpl implements RestoreClient {
* @param sTableArray The array of tables to be restored
* @param tTableArray The array of mapping tables to restore to
* @param autoRestore : yes, restore all the backup images on the dependency list
- * @return set of BackupImages restored
* @throws IOException exception
*/
- private Set restoreStage(HBackupFileSystem hBackupFS,
+ private void restoreStage(HBackupFileSystem hBackupFS,
HashMap backupManifestMap, TableName[] sTableArray,
TableName[] tTableArray, boolean autoRestore) throws IOException {
TreeSet restoreImageSet = new TreeSet();
@@ -267,9 +247,7 @@ public final class RestoreClientImpl implements RestoreClient {
}
}
}
-
}
- return restoreImageSet;
}
/**
@@ -290,7 +268,7 @@ public final class RestoreClientImpl implements RestoreClient {
Path tableBackupPath = hFS.getTableBackupPath(sTable);
- // todo: convert feature will be provided in a future jira
+ // TODO: convert feature will be provided in a future JIRA
boolean converted = false;
if (manifest.getType() == BackupType.FULL || converted) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java
index 2dd38c1..eb7b6b2 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreUtil.java
@@ -64,9 +64,9 @@ public class RestoreUtil {
}
/**
- * During incremental backup operation. Call WalPlayer to replay WAL in backup image Currently
- * tableNames and newTablesNames only contain single table, will be expanded to multiple tables in
- * the future
+ * During an incremental restore operation, call WALPlayer to replay the WALs in the backup
+ * image. Currently tableNames and newTableNames only contain a single table; this will be
+ * expanded to multiple tables in the future
* @param logDir : incremental backup folders, which contains WAL
* @param tableNames : source tableNames(table names were backuped)
* @param newTableNames : target tableNames(table names to be restored to)
@@ -76,7 +76,7 @@ public class RestoreUtil {
TableName[] tableNames, TableName[] newTableNames) throws IOException {
if (tableNames.length != newTableNames.length) {
- throw new IOException("Number of source tables adn taget Tables does not match!");
+ throw new IOException("Number of source tables and target Tables do not match!");
}
// for incremental backup image, expect the table already created either by user or previous
@@ -232,6 +232,9 @@ public class RestoreUtil {
this.conf.setInt("hbase.rpc.timeout", resultMillis);
}
+ // By default, it is 32 and loader will fail if # of files in any region exceed this
+ // limit. Bad for snapshot restore.
+ this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
LoadIncrementalHFiles loader = null;
try {
loader = new LoadIncrementalHFiles(this.conf);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
index 4f5adda..f2c985f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.backup.mapreduce;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -36,7 +38,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
@@ -291,4 +295,26 @@ public class MapReduceBackupCopyService implements BackupCopyService {
}
}
+ @Override
+ public void cancelCopyJob(byte[] jobId) throws IOException {
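+ // The job id bytes are expected to be a Writable-serialized JobID produced when the
+ // copy job was submitted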
+ JobID id = new JobID();
+ id.readFields(new DataInputStream(new ByteArrayInputStream(jobId)));
+ Cluster cluster = new Cluster(getConf());
+ try {
+ Job job = cluster.getJob(id);
+ if (job == null) {
+ LOG.error("No job found for " + id);
+ // should we throw exception
+ }
+ if (job.isComplete() || job.isRetired()) {
+ return;
+ }
+
+ job.killJob();
+ LOG.debug("Killed job " + id);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
index dae24a6..e6ede65 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -64,8 +64,7 @@ public class BackupLogCleaner extends BaseLogCleanerDelegate {
// TODO: LogCleaners do not have a way to get the Connection from Master. We should find a
// way to pass it down here, so that this connection is not re-created every time.
// It is expensive
- try(Connection connection = ConnectionFactory.createConnection(this.getConf());
- final BackupSystemTable table = new BackupSystemTable(connection)) {
+ try (final BackupSystemTable table = new BackupSystemTable(this.getConf())) {
// If we do not have recorded backup sessions
if (!table.hasBackupSessions()) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
index d524140..f08e976 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
@@ -75,7 +75,7 @@ public class LogRollBackupSubprocedure extends Subprocedure {
LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum());
Connection connection = rss.getConnection();
- try(final BackupSystemTable table = new BackupSystemTable(connection)) {
+ try(final BackupSystemTable table = new BackupSystemTable(connection.getConfiguration())) {
// sanity check, good for testing
HashMap serverTimestampMap = table.readRegionServerLastLogRollResult();
String host = rss.getServerName().getHostname();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 2ceeda5..f78bc19 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -261,7 +261,7 @@ public class WALPlayer extends Configured implements Tool {
Configuration conf = getConf();
setupTime(conf, HLogInputFormat.START_TIME_KEY);
setupTime(conf, HLogInputFormat.END_TIME_KEY);
- Path inputDir = new Path(args[0]);
+ Path[] inputDirs = getInputDirs(args[0]);
String[] tables = args[1].split(",");
String[] tableMap;
if (args.length > 2) {
@@ -275,9 +275,13 @@ public class WALPlayer extends Configured implements Tool {
}
conf.setStrings(TABLES_KEY, tables);
conf.setStrings(TABLE_MAP_KEY, tableMap);
- Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir));
+ Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
job.setJarByClass(WALPlayer.class);
- FileInputFormat.setInputPaths(job, inputDir);
+
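+ // Register every input directory (comma-separated on the command line) with the job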
+ for(Path p: inputDirs){
+ FileInputFormat.addInputPath(job, p);
+ }
+
job.setInputFormatClass(WALInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
@@ -312,6 +316,20 @@ public class WALPlayer extends Configured implements Tool {
}
/**
+ * We support multiple input sources
+ * @param pathStr - list of input folders, comma-separated
+ * @return array of path objects
+ */
+ private Path[] getInputDirs(String pathStr) {
+ String[] splits = pathStr.split(",");
+ Path[] retValue = new Path[splits.length];
+ for(int i = 0; i < splits.length; i++) {
+ retValue[i] = new Path(splits[i]);
+ }
+ return retValue;
+ }
+
+ /**
* Print usage
* @param errorMsg Error message. Can be null.
*/
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 84b7c78..c577767 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.backup;
+import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -31,9 +33,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState;
import org.apache.hadoop.hbase.backup.impl.BackupContext;
+import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.master.BackupController;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
import org.apache.hadoop.hbase.client.Connection;
@@ -71,7 +74,7 @@ public class TestBackupBase {
protected static TableName table3_restore = TableName.valueOf("table3_restore");
protected static TableName table4_restore = TableName.valueOf("table4_restore");
- protected static final int NB_ROWS_IN_BATCH = 100;
+ protected static final int NB_ROWS_IN_BATCH = 999;
protected static final byte[] qualName = Bytes.toBytes("q1");
protected static final byte[] famName = Bytes.toBytes("f");
@@ -88,15 +91,20 @@ public class TestBackupBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
- TEST_UTIL.getConfiguration().set("hbase.procedure.regionserver.classes",
+ conf1 = TEST_UTIL.getConfiguration();
+ conf1.set("hbase.procedure.regionserver.classes",
LogRollRegionServerProcedureManager.class.getName());
- TEST_UTIL.getConfiguration().set("hbase.procedure.master.classes",
+ conf1.set("hbase.procedure.master.classes",
LogRollMasterProcedureManager.class.getName());
- TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ // Set Master Observer - Backup Controller
+ conf1.set("hbase.coprocessor.master.classes",
+ BackupController.class.getName());
+ conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ // Set MultiWAL (with 2 default WAL files per RS)
+ conf1.set(WAL_PROVIDER, "multiwal");
TEST_UTIL.startMiniZKCluster();
MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
- conf1 = TEST_UTIL.getConfiguration();
conf2 = HBaseConfiguration.create(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
TEST_UTIL2 = new HBaseTestingUtility(conf2);
@@ -139,12 +147,14 @@ public class TestBackupBase {
protected static void createTables() throws Exception {
long tid = System.currentTimeMillis();
+ byte[] startKey = Bytes.toBytes("row0");
+ byte[] endKey = Bytes.toBytes("row"+ NB_ROWS_IN_BATCH);
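+ // Tables below are pre-split into 10 regions over the row0..row999 key range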
table1 = TableName.valueOf("test-" + tid);
HBaseAdmin ha = TEST_UTIL.getHBaseAdmin();
HTableDescriptor desc = new HTableDescriptor(table1);
HColumnDescriptor fam = new HColumnDescriptor(famName);
desc.addFamily(fam);
- ha.createTable(desc);
+ ha.createTable(desc, startKey, endKey, 10);
Connection conn = ConnectionFactory.createConnection(conf1);
HTable table = (HTable) conn.getTable(table1);
loadTable(table);
@@ -152,7 +162,7 @@ public class TestBackupBase {
table2 = TableName.valueOf("test-" + tid + 1);
desc = new HTableDescriptor(table2);
desc.addFamily(fam);
- ha.createTable(desc);
+ ha.createTable(desc, startKey, endKey, 10);
table = (HTable) conn.getTable(table2);
loadTable(table);
table.close();
@@ -179,9 +189,8 @@ public class TestBackupBase {
}
private BackupContext getBackupContext(String backupId) throws IOException {
- Configuration conf = conf1;//BackupClientImpl.getConf();
- try (Connection connection = ConnectionFactory.createConnection(conf);
- BackupSystemTable table = new BackupSystemTable(connection)) {
+ Configuration conf = conf1;
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
BackupContext status = table.readBackupStatus(backupId);
return status;
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java
index 21bf63c..0f30659 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBoundaryTests.java
@@ -73,8 +73,7 @@ public class TestBackupBoundaryTests extends TestBackupBase {
LOG.info("test full backup fails on a single table that does not exist");
List tables = toList("tabledne");
- String backupId = getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR);
- assertTrue(checkSucceeded(backupId));
+ getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR);
}
/**
@@ -86,20 +85,17 @@ public class TestBackupBoundaryTests extends TestBackupBase {
LOG.info("test full backup fails on multiple tables that do not exist");
List tables = toList("table1dne", "table2dne");
- String backupId = getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR);
- assertTrue(checkSucceeded(backupId));
+ getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR);
}
/**
- * Verify that full backup fails on tableset containing real and fake tables.
+ * Verify that full backup fails on table set containing real and fake tables.
* @throws Exception
*/
@Test(expected = DoNotRetryIOException.class)
public void testFullBackupMixExistAndDNE() throws Exception {
LOG.info("create full backup fails on tableset containing real and fake table");
-
List tables = toList(table1.getNameAsString(), "tabledne");
- String backupId = getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR);
- //assertTrue(checkSucceeded(backupId)); // TODO
+ getBackupClient().create(BackupType.FULL, tables, BACKUP_ROOT_DIR);
}
}
\ No newline at end of file
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java
index 899f53b..a4779bc 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java
@@ -65,8 +65,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
List tableSetFullList = Lists.newArrayList(table1, table2, table3, table4);
- try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
- BackupSystemTable systemTable = new BackupSystemTable(connection)) {
+ try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConfiguration())) {
// Verify that we have no backup sessions yet
assertFalse(systemTable.hasBackupSessions());
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
index 2dc31df..d062202 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
@@ -42,9 +42,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupContext;
import org.apache.hadoop.hbase.backup.impl.BackupHandler.BackupState;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BackupUtil.BackupCompleteData;
+import org.apache.hadoop.hbase.backup.master.BackupController;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
@@ -68,13 +68,14 @@ public class TestBackupSystemTable {
@BeforeClass
public static void setUp() throws Exception {
- cluster = UTIL.startMiniCluster();
- conn = ConnectionFactory.createConnection(UTIL.getConfiguration());
+ UTIL.getConfiguration().set("hbase.coprocessor.master.classes",
+ BackupController.class.getName());
+ cluster = UTIL.startMiniCluster();
}
@Before
public void before() throws IOException {
- table = new BackupSystemTable(conn);
+ table = new BackupSystemTable(conf);
}
@After
@@ -148,6 +149,43 @@ public class TestBackupSystemTable {
}
@Test
+ public void testBackupDelete() throws IOException {
+
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ int n = 10;
+ List list = createBackupContextList(n);
+
+ // Load data
+ for (BackupContext bc : list) {
+ // Make sure we set right status
+ bc.setState(BackupState.COMPLETE);
+ table.updateBackupStatus(bc);
+ }
+
+ // Verify exists
+ for (BackupContext bc : list) {
+ assertNotNull(table.readBackupStatus(bc.getBackupId()));
+ }
+
+ // Delete all
+ for (BackupContext bc : list) {
+ table.deleteBackupStatus(bc.getBackupId());
+ }
+
+ // Verify do not exists
+ for (BackupContext bc : list) {
+ assertNull(table.readBackupStatus(bc.getBackupId()));
+ }
+
+ cleanBackupTable();
+ }
+
+ }
+
+
+
+ @Test
public void testRegionServerLastLogRollResults() throws IOException {
String[] servers = new String[] { "server1", "server2", "server3" };
Long[] timestamps = new Long[] { 100L, 102L, 107L };
@@ -302,6 +340,145 @@ public class TestBackupSystemTable {
cleanBackupTable();
}
+
+ /**
+ * Backup set tests
+ */
+
+ @Test
+ public void testBackupSetAddNotExists() throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ String[] tables = new String[] { "table1", "table2", "table3" };
+ String setName = "name";
+ table.backupSetAdd(setName, tables);
+ String[] tnames = table.backupSetDescribe(setName);
+ assertTrue(tnames != null);
+ assertTrue(tnames.length == tables.length);
+ for (int i = 0; i < tnames.length; i++) {
+ assertTrue(tnames[i].equals(tables[i]));
+ }
+
+ cleanBackupTable();
+ }
+
+ }
+
+ @Test
+ public void testBackupSetAddExists() throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ String[] tables = new String[] { "table1", "table2", "table3" };
+ String setName = "name";
+ table.backupSetAdd(setName, tables);
+ String[] addTables = new String[] { "table4", "table5", "table6" };
+ table.backupSetAdd(setName, addTables);
+
+ String[] tnames = table.backupSetDescribe(setName);
+ assertTrue(tnames != null);
+ assertTrue(tnames.length == tables.length + addTables.length);
+ for (int i = 0; i < tnames.length; i++) {
+ assertTrue(tnames[i].equals("table" + (i + 1)));
+ }
+ cleanBackupTable();
+ }
+ }
+
+ @Test
+ public void testBackupSetAddExistsIntersects() throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ String[] tables = new String[] { "table1", "table2", "table3" };
+ String setName = "name";
+ table.backupSetAdd(setName, tables);
+ String[] addTables = new String[] { "table3", "table4", "table5", "table6" };
+ table.backupSetAdd(setName, addTables);
+
+ String[] tnames = table.backupSetDescribe(setName);
+ assertTrue(tnames != null);
+ assertTrue(tnames.length == tables.length + addTables.length - 1);
+ for (int i = 0; i < tnames.length; i++) {
+ assertTrue(tnames[i].equals("table" + (i + 1)));
+ }
+ cleanBackupTable();
+ }
+ }
+
+ @Test
+ public void testBackupSetRemoveSomeNotExists() throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ String[] tables = new String[] { "table1", "table2", "table3", "table4" };
+ String setName = "name";
+ table.backupSetAdd(setName, tables);
+ String[] removeTables = new String[] { "table4", "table5", "table6" };
+ table.backupSetRemove(setName, removeTables);
+
+ String[] tnames = table.backupSetDescribe(setName);
+ assertTrue(tnames != null);
+ assertTrue(tnames.length == tables.length - 1);
+ for (int i = 0; i < tnames.length; i++) {
+ assertTrue(tnames[i].equals("table" + (i + 1)));
+ }
+ cleanBackupTable();
+ }
+ }
+
+ @Test
+ public void testBackupSetRemove() throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ String[] tables = new String[] { "table1", "table2", "table3", "table4" };
+ String setName = "name";
+ table.backupSetAdd(setName, tables);
+ String[] removeTables = new String[] { "table4", "table3" };
+ table.backupSetRemove(setName, removeTables);
+
+ String[] tnames = table.backupSetDescribe(setName);
+ assertTrue(tnames != null);
+ assertTrue(tnames.length == tables.length - 2);
+ for (int i = 0; i < tnames.length; i++) {
+ assertTrue(tnames[i].equals("table" + (i + 1)));
+ }
+ cleanBackupTable();
+ }
+ }
+
+ @Test
+ public void testBackupSetDelete() throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ String[] tables = new String[] { "table1", "table2", "table3", "table4" };
+ String setName = "name";
+ table.backupSetAdd(setName, tables);
+ table.backupSetDelete(setName);
+
+ String[] tnames = table.backupSetDescribe(setName);
+ assertTrue(tnames == null);
+ cleanBackupTable();
+ }
+ }
+
+ @Test
+ public void testBackupSetList() throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(conf)) {
+
+ String[] tables = new String[] { "table1", "table2", "table3", "table4" };
+ String setName1 = "name1";
+ String setName2 = "name2";
+ table.backupSetAdd(setName1, tables);
+ table.backupSetAdd(setName2, tables);
+
+ List list = table.backupSetList();
+
+ assertTrue(list.size() == 2);
+ assertTrue(list.get(0).equals(setName1));
+ assertTrue(list.get(1).equals(setName2));
+
+ cleanBackupTable();
+ }
+ }
+
private boolean compare(BackupContext ctx, BackupCompleteData data) {
return ctx.getBackupId().equals(data.getBackupToken())