diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
new file mode 100644
index 0000000..5ac884b
--- /dev/null
+++ hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupRequest;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BackupAdmin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * An integration test to detect regressions in HBASE-7912 (HBase backup/restore).
+ * Creates a table with many regions, loads data, performs a series of full and
+ * incremental backups, then restores and verifies the data.
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a>
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestBackupRestore {
+
+ private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName();
+
+ protected static final Log LOG = LogFactory.getLog(IntegrationTestBackupRestore.class);
+  protected static final TableName TABLE_NAME = TableName.valueOf(CLASS_NAME);
+  // Restore into a differently named table; restoring without overwrite into
+  // the source table itself would fail because that table already exists.
+  protected static final TableName TABLE_NAME_RESTORE =
+      TableName.valueOf(CLASS_NAME + "_restore");
+ protected static final String COLUMN_NAME = "f";
+ protected static final String REGION_COUNT_KEY = String.format("hbase.%s.regions", CLASS_NAME);
+ protected static final String REGIONSERVER_COUNT_KEY = String.format("hbase.%s.regionServers",
+ CLASS_NAME);
+ protected static final String TIMEOUT_MINUTES_KEY = String.format("hbase.%s.timeoutMinutes",
+ CLASS_NAME);
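+  // All three are tunable from the command line, e.g.
+  // -Dhbase.IntegrationTestBackupRestore.regions=20.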
+
+ protected static final int DEFAULT_REGION_COUNT = 10;
+ protected static final int DEFAULT_REGIONSERVER_COUNT = 2;
+
+ protected static final int DEFAULT_TIMEOUT_MINUTES = 6;
+ protected static final IntegrationTestingUtility util = new IntegrationTestingUtility();
+
+ protected static final int REGION_COUNT = util.getConfiguration().getInt(REGION_COUNT_KEY,
+ DEFAULT_REGION_COUNT);
+ protected static final int REGION_SERVER_COUNT = util.getConfiguration().getInt(
+ REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
+ protected static final int TIMEOUT_MINUTES = util.getConfiguration().getInt(TIMEOUT_MINUTES_KEY,
+ DEFAULT_TIMEOUT_MINUTES);
+
+ private static final int NB_ROWS_IN_BATCH = 10000;
+  // Not final: resolved against fs.defaultFS when the test runs.
+  private static String BACKUP_ROOT_DIR = "backupIT";
+ private Random random = new Random();
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info(String.format("Initializing cluster with %d region servers.", REGION_SERVER_COUNT));
+ util.initializeCluster(REGION_SERVER_COUNT);
+ LOG.info("Cluster initialized");
+
+    try (Admin admin = util.getConnection().getAdmin()) {
+      if (admin.tableExists(TABLE_NAME)) {
+        LOG.info(String.format("Deleting existing table %s.", TABLE_NAME));
+        if (admin.isTableEnabled(TABLE_NAME)) {
+          admin.disableTable(TABLE_NAME);
+        }
+        admin.deleteTable(TABLE_NAME);
+        LOG.info(String.format("Existing table %s deleted.", TABLE_NAME));
+      }
+    }
+ waitForSystemTable();
+ LOG.info("Cluster ready");
+ }
+
+  public void waitForSystemTable() throws Exception {
+    // Backup metadata lives in the backup system table; it must be up before
+    // any backup or restore request is issued.
+    waitForTable(BackupSystemTable.getTableName());
+  }
+
+  public void waitForTable(TableName table) throws Exception {
+    try (Admin admin = util.getConnection().getAdmin()) {
+      while (!admin.tableExists(table) || !admin.isTableAvailable(table)) {
+        Thread.sleep(1000);
+      }
+    }
+    LOG.debug("Table " + table + " exists and is available");
+  }
+
+ @After
+ public void tearDown() throws IOException {
+ LOG.info("Cleaning up after test.");
+ Admin admin = util.getConnection().getAdmin();
+ if (admin.tableExists(TABLE_NAME)) {
+ if (admin.isTableEnabled(TABLE_NAME)) admin.disableTable(TABLE_NAME);
+ admin.deleteTable(TABLE_NAME);
+ }
+ LOG.info("Restoring cluster.");
+ util.restoreCluster();
+ LOG.info("Cluster restored.");
+ }
+
+ @Test
+ public void testBackupRestore() throws Exception {
+ BACKUP_ROOT_DIR = util.getConfiguration().get("fs.defaultFS") + "/backupIT";
+
+ long startTime, endTime;
+ HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
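+    // UniformSplit assumes row keys are uniformly distributed over the full
+    // byte-key space, which matches the random keys from getRandomArray().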
+ SplitAlgorithm algo = new RegionSplitter.UniformSplit();
+ byte[][] splits = algo.split(REGION_COUNT);
+
+ LOG.info(String.format("Creating table %s with %d splits.", TABLE_NAME, REGION_COUNT));
+    startTime = System.currentTimeMillis();
+    try (Admin admin = util.getConnection().getAdmin()) {
+      admin.createTable(desc, splits);
+    }
+    waitForTable(TABLE_NAME);
+    endTime = System.currentTimeMillis();
+    LOG.info(String.format("Pre-split table created successfully in %dms.",
+        (endTime - startTime)));
+    // Let failures propagate: catching and merely logging the exception here
+    // would make the test pass even when backup/restore is broken.
+    runTest();
+  }
+
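+  /** Returns a {@code size}-byte random array, used as a practically unique row key. */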
+ private byte[] getRandomArray(int size) {
+ byte[] arr = new byte[size];
+ random.nextBytes(arr);
+ return arr;
+ }
+
+ private void runTest() throws IOException {
+ Connection conn = util.getConnection();
+    // #0 - insert some data into the table
+    HTable t1 = (HTable) conn.getTable(TABLE_NAME);
+    Put p1;
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p1 = new Put(getRandomArray(100));
+      // Use Bytes.toBytes rather than String.getBytes() to avoid depending on
+      // the platform default charset.
+      p1.addColumn(Bytes.toBytes(COLUMN_NAME), Bytes.toBytes("q"), Bytes.toBytes("val" + i));
+      t1.put(p1);
+    }
+ // #1 - create full backup for all tables
+ LOG.info("create full backup image for all tables");
+
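+    // A full backup is a complete copy of the table data under BACKUP_ROOT_DIR;
+    // it serves as the baseline image for the incremental backup taken later.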
+    List<TableName> tables = Lists.newArrayList(TABLE_NAME);
+    HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
+
+    BackupRequest request = new BackupRequest();
+    request.setBackupType(BackupType.FULL).setTableList(tables).setTargetRootDir(BACKUP_ROOT_DIR);
+    String backupIdFull = admin.getBackupAdmin().backupTables(request);
+
+ assertTrue(checkSucceeded(backupIdFull));
+
+    // #2 - insert more data into the table
+    t1 = (HTable) conn.getTable(TABLE_NAME);
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      p1 = new Put(getRandomArray(100));
+      p1.addColumn(Bytes.toBytes(COLUMN_NAME), Bytes.toBytes("q"), Bytes.toBytes("val" + i));
+      t1.put(p1);
+    }
+
+ Assert.assertThat(util.countRows(t1), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+ t1.close();
+
+ // #3 - incremental backup for table
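+    // In the HBASE-7912 design an incremental backup captures only the WAL
+    // edits written since the previous backup of the table.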
+ tables = Lists.newArrayList(TABLE_NAME);
+ request = new BackupRequest();
+ request.setBackupType(BackupType.INCREMENTAL).setTableList(tables)
+ .setTargetRootDir(BACKUP_ROOT_DIR);
+ String backupIdIncMultiple = admin.getBackupAdmin().backupTables(request);
+ assertTrue(checkSucceeded(backupIdIncMultiple));
+
+ // #4 - restore full backup for all tables, without overwrite
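+    // Restore into TABLE_NAME_RESTORE rather than the original table, so the
+    // restored row count can be verified independently of the live table.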
+ TableName[] tablesRestoreFull = new TableName[] { TABLE_NAME };
+
+ TableName[] tablesMapFull = new TableName[] { TABLE_NAME_RESTORE };
+
+ BackupAdmin client = util.getAdmin().getBackupAdmin();
+ client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false, tablesRestoreFull,
+ tablesMapFull, false));
+
+ // #5.1 - check tables for full restore
+ Admin hAdmin = util.getConnection().getAdmin();
+ assertTrue(hAdmin.tableExists(TABLE_NAME_RESTORE));
+ hAdmin.close();
+
+ // #5.2 - checking row count of tables for full restore
+ HTable hTable = (HTable) conn.getTable(TABLE_NAME_RESTORE);
+ Assert.assertThat(util.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH));
+ hTable.close();
+
+ // #6 - restore incremental backup for multiple tables, with overwrite
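+    // Overwrite is required here: TABLE_NAME_RESTORE already exists from the
+    // full restore in step #4 and is replaced with the incremental image.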
+ TableName[] tablesRestoreIncMultiple = new TableName[] { TABLE_NAME };
+ TableName[] tablesMapIncMultiple = new TableName[] { TABLE_NAME_RESTORE };
+ client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false,
+ tablesRestoreIncMultiple, tablesMapIncMultiple, true));
+
+ hTable = (HTable) conn.getTable(TABLE_NAME_RESTORE);
+ Assert.assertThat(util.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+ hTable.close();
+    admin.close();
+    // Do not close conn: util.getConnection() returns the shared connection
+    // owned by the testing utility, which tearDown() still needs.
+
+ }
+
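+  /**
+   * Returns true if the backup with the given id reached the COMPLETE state
+   * in the backup system table.
+   */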
+ protected boolean checkSucceeded(String backupId) throws IOException {
+ BackupInfo status = getBackupContext(backupId);
+    if (status == null) {
+      return false;
+    }
+ return status.getState() == BackupState.COMPLETE;
+ }
+
+ private BackupInfo getBackupContext(String backupId) throws IOException {
+ try (BackupSystemTable table = new BackupSystemTable(util.getConnection())) {
+      return table.readBackupInfo(backupId);
+ }
+ }
+
+  /**
+   * Builds a restore request for the given backup image and source/target table mapping.
+   */
+ public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check,
+ TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
+ RestoreRequest request = new RestoreRequest();
+ request.setBackupRootDir(backupRootDir).setBackupId(backupId).setCheck(check)
+ .setFromTables(fromTables).setToTables(toTables).setOverwrite(isOverwrite);
+ return request;
+ }
+
+}