diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 8df7bd89a..1a91134a0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1247,6 +1247,10 @@ public final class HConstants { public static final String REPLICATION_SOURCE_MAXTHREADS_KEY = "hbase.replication.source.maxthreads"; + /** Whether replication should continue, dropping the affected edits, when a table has been deleted on the peer cluster */ + public static final String REPLICATION_CONTINUE_ON_DELETED_TABLE_KEY = + "hbase.replication.continue.on.deleted.tables"; + public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10; /** Config for pluggable consensus provider */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index cf85ffd7b..d7757be34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HConnection; @@ -102,6 +103,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private Path hfileArchiveDir; private boolean replicationBulkLoadDataEnabled; private Abortable abortable; + private boolean 
continueOnDeletedTables; @Override public void init(Context context) throws IOException { @@ -139,6 +141,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // conservative for now. this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE)); + this.continueOnDeletedTables = + this.conf.getBoolean(HConstants.REPLICATION_CONTINUE_ON_DELETED_TABLE_KEY, false); this.replicationBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, @@ -225,6 +229,21 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return entryLists; } + // Filter a set of batches by TableName + private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) { + List<List<Entry>> entryLists = new ArrayList<>(); + for (List<Entry> entries : oldEntryList) { + ArrayList<Entry> thisList = new ArrayList<>(entries.size()); + entryLists.add(thisList); + for (Entry e : entries) { + if (!e.getKey().getTablename().equals(table)) { + thisList.add(e); + } + } + } + return entryLists; + } + private void reconnectToPeerCluster() { HConnection connection = null; try { @@ -325,7 +344,30 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi ioe = ((RemoteException) ioe).unwrapRemoteException(); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); if (ioe instanceof TableNotFoundException) { - if (sleepForRetries("A table is missing in the peer cluster. 
" + if (continueOnDeletedTables) { + try { + // this is dangerous, but cannot change how TNFE is serialized + // at least check whether the table name is legal + TableName table = TableName.valueOf( + TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(ioe.getMessage()))); + if (!conn.getAdmin().tableExists(table)) { + // Would potentially be better to retry in one of the outer loops + // and add a table filter there; but that would break the encapsulation, + // so we're doing the filtering here. + filterBatches(batches, table); + } + } catch (IOException ignore) { + if (sleepForRetries("Could not check missing table. " + + "Replication cannot proceed without losing data.", sleepMultiplier)) { + sleepMultiplier++; + } + } catch (IllegalArgumentException ignore) { + if (sleepForRetries("Invalid table name. " + + "Replication cannot proceed without losing data.", sleepMultiplier)) { + sleepMultiplier++; + } + } + } else if (sleepForRetries("A table is missing in the peer cluster. " + "Replication cannot proceed without losing data.", sleepMultiplier)) { sleepMultiplier++; }