From a86d5e0556c054e9a39a981924dca4306574058d Mon Sep 17 00:00:00 2001
From: Vladimir Rodionov
Date: Tue, 20 Mar 2018 16:00:16 -0700
Subject: [PATCH] HBASE-19441: Implement retry logic around starting exclusive
backup operation
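
A backup client used to fail immediately when another session held the
exclusive lock on the backup system table. startBackupSession() now
retries once per second until the lock is acquired or a configurable
timeout expires (hbase.backup.exclusive.op.timeout.seconds, default
3600 seconds), logging a progress warning at most once per minute. A
new ExclusiveOperationException marks the retriable case, and
TestBackupManager covers the new behavior.

Illustrative client-side configuration (a sketch: the connection setup
shown here is assumed, not part of this patch):

    Configuration conf = HBaseConfiguration.create();
    // Wait at most two minutes for a concurrent backup to release the lock
    conf.setInt(BackupManager.BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY, 120);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        BackupManager manager = new BackupManager(conn, conf)) {
      manager.startBackupSession(); // retries on ExclusiveOperationException
    }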
---
.../hadoop/hbase/backup/impl/BackupManager.java | 86 ++++++++----
.../hbase/backup/impl/BackupSystemTable.java | 145 ++++++++++-----------
.../backup/impl/ExclusiveOperationException.java | 33 +++++
.../hadoop/hbase/backup/TestBackupManager.java | 137 +++++++++++++++++++
4 files changed, 296 insertions(+), 105 deletions(-)
create mode 100644 hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java
create mode 100644 hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index f09d6d06ac..8bebc91b8e 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.backup.impl;
import java.io.Closeable;
@@ -47,18 +47,22 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
/**
- * Handles backup requests, creates backup info records in backup system table to
- * keep track of backup sessions, dispatches backup request.
+ * Handles backup requests, creates backup info records in backup system table to keep track of
+ * backup sessions, dispatches backup request.
*/
@InterfaceAudience.Private
public class BackupManager implements Closeable {
+ // Time, in seconds, to wait to acquire the exclusive lock before a backup fails
+ public final static String BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY =
+ "hbase.backup.exclusive.op.timeout.seconds";
+ // Default lock acquisition timeout, in seconds (one hour)
+ private final static int DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT = 3600;
private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);
protected Configuration conf = null;
@@ -112,8 +116,8 @@ public class BackupManager implements Closeable {
if (classes == null) {
conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, masterProcedureClass);
} else if (!classes.contains(masterProcedureClass)) {
- conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, classes + ","
- + masterProcedureClass);
+ conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY,
+ classes + "," + masterProcedureClass);
}
if (LOG.isDebugEnabled()) {
@@ -138,16 +142,16 @@ public class BackupManager implements Closeable {
if (classes == null) {
conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionProcedureClass);
} else if (!classes.contains(regionProcedureClass)) {
- conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + ","
- + regionProcedureClass);
+ conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
+ classes + "," + regionProcedureClass);
}
String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
String regionObserverClass = BackupObserver.class.getName();
- conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
- regionObserverClass);
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ (coproc == null ? "" : coproc + ",") + regionObserverClass);
if (LOG.isDebugEnabled()) {
- LOG.debug("Added region procedure manager: " + regionProcedureClass +
- ". Added region observer: " + regionObserverClass);
+ LOG.debug("Added region procedure manager: " + regionProcedureClass
+ + ". Added region observer: " + regionObserverClass);
}
}
@@ -223,9 +227,8 @@ public class BackupManager implements Closeable {
}
// there are one or more tables in the table list
- backupInfo =
- new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
- targetRootDir);
+ backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
+ targetRootDir);
backupInfo.setBandwidth(bandwidth);
backupInfo.setWorkers(workers);
return backupInfo;
@@ -254,7 +257,7 @@ public class BackupManager implements Closeable {
String ongoingBackupId = this.getOngoingBackupId();
if (ongoingBackupId != null) {
LOG.info("There is an ongoing backup " + ongoingBackupId
- + ". Cannot launch a new backup until no ongoing backup remains.");
+ + ". Cannot launch a new backup until no ongoing backup remains.");
throw new BackupException("There is an ongoing backup.");
}
}
@@ -269,7 +272,7 @@ public class BackupManager implements Closeable {
* @return The ancestors for the current backup
* @throws IOException exception
*/
- public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException {
+ public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException {
LOG.debug("Getting the direct ancestors of the current backup " + backupInfo.getBackupId());
ArrayList<BackupImage> ancestors = new ArrayList<>();
@@ -286,10 +289,9 @@ public class BackupManager implements Closeable {
BackupImage.Builder builder = BackupImage.newBuilder();
- BackupImage image =
- builder.withBackupId(backup.getBackupId()).withType(backup.getType())
- .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames())
- .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();
+ BackupImage image = builder.withBackupId(backup.getBackupId()).withType(backup.getType())
+ .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames())
+ .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();
// add the full backup image as an ancestor until the last incremental backup
if (backup.getType().equals(BackupType.FULL)) {
@@ -319,9 +321,9 @@ public class BackupManager implements Closeable {
BackupImage lastIncrImage = lastIncrImgManifest.getBackupImage();
ancestors.add(lastIncrImage);
- LOG.debug("Last dependent incremental backup image: " + "{BackupID="
- + lastIncrImage.getBackupId() + "," + "BackupDir=" + lastIncrImage.getRootDir()
- + "}");
+ LOG.debug(
+ "Last dependent incremental backup image: " + "{BackupID=" + lastIncrImage.getBackupId()
+ + "," + "BackupDir=" + lastIncrImage.getRootDir() + "}");
}
}
}
@@ -369,7 +371,36 @@ public class BackupManager implements Closeable {
* @throws IOException if active session already exists
*/
public void startBackupSession() throws IOException {
- systemTable.startBackupExclusiveOperation();
+ long startTime = System.currentTimeMillis();
+ long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY,
+ DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L;
+ long lastWarningOutputTime = 0;
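+ // Poll for the exclusive lock: retry every second until it is acquired or the
+ // timeout elapses, emitting a progress warning at most once per minute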
+ while (System.currentTimeMillis() - startTime < timeout) {
+ try {
+ systemTable.startBackupExclusiveOperation();
+ return;
+ } catch (IOException e) {
+ if (e instanceof ExclusiveOperationException) {
+ // sleep, then repeat
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ if (lastWarningOutputTime == 0
+ || (System.currentTimeMillis() - lastWarningOutputTime) > 60000) {
+ lastWarningOutputTime = System.currentTimeMillis();
+ LOG.warn("Waiting to acquire backup exclusive lock for "
+ + (lastWarningOutputTime - startTime) / 1000 + "s");
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+ throw new IOException(
+ "Failed to acquire backup system table exclusive lock after " + timeout / 1000 + "s");
}
/**
@@ -410,7 +441,7 @@ public class BackupManager implements Closeable {
}
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+ * 6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file
+ * name
+ *
+ *
*/
@InterfaceAudience.Private
public final class BackupSystemTable implements Closeable {
+
private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);
static class WALItem {
@@ -128,10 +130,9 @@ public final class BackupSystemTable implements Closeable {
private TableName tableName;
/**
- * Backup System table name for bulk loaded files.
- * We keep all bulk loaded file references in a separate table
- * because we have to isolate general backup operations: create, merge etc
- * from activity of RegionObserver, which controls process of a bulk loading
+ * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a
+ * separate table because we have to isolate general backup operations (create, merge, etc.) from
+ * the activity of RegionObserver, which controls the process of bulk loading
* {@link org.apache.hadoop.hbase.backup.BackupObserver}
*/
private TableName bulkLoadTableName;
@@ -198,13 +199,11 @@ public final class BackupSystemTable implements Closeable {
verifyNamespaceExists(admin);
Configuration conf = connection.getConfiguration();
if (!admin.tableExists(tableName)) {
- TableDescriptor backupHTD =
- BackupSystemTable.getSystemTableDescriptor(conf);
+ TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
admin.createTable(backupHTD);
}
if (!admin.tableExists(bulkLoadTableName)) {
- TableDescriptor blHTD =
- BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
+ TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
admin.createTable(blHTD);
}
waitForSystemTable(admin, tableName);
@@ -237,11 +236,11 @@ public final class BackupSystemTable implements Closeable {
} catch (InterruptedException e) {
}
if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
- throw new IOException("Failed to create backup system table "+
- tableName +" after " + TIMEOUT + "ms");
+ throw new IOException(
+ "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms");
}
}
- LOG.debug("Backup table "+tableName+" exists and available");
+ LOG.debug("Backup table " + tableName + " exists and available");
}
@Override
@@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable {
public void updateBackupInfo(BackupInfo info) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("update backup status in backup system table for: " + info.getBackupId()
- + " set status=" + info.getState());
+ + " set status=" + info.getState());
}
try (Table table = connection.getTable(tableName)) {
Put put = createPutForBackupInfo(info);
@@ -344,7 +343,6 @@ public final class BackupSystemTable implements Closeable {
}
}
-
/**
* Deletes backup status from the backup system table
* @param backupId backup id
@@ -370,7 +368,7 @@ public final class BackupSystemTable implements Closeable {
Map<byte[], List<Path>> finalPaths) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
- + " entries");
+ + " entries");
}
try (Table table = connection.getTable(bulkLoadTableName)) {
List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
@@ -389,8 +387,8 @@ public final class BackupSystemTable implements Closeable {
public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
final List<Pair<Path, Path>> pairs) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size()
- + " entries");
+ LOG.debug(
+ "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
}
try (Table table = connection.getTable(bulkLoadTableName)) {
List<Put> puts =
@@ -425,7 +423,8 @@ public final class BackupSystemTable implements Closeable {
* whether the hfile was recorded by preCommitStoreFile hook (true)
*/
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>