diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java index ff414be977..cb006756f8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Collection; @@ -49,6 +50,14 @@ extends RetriesExhaustedException { List actions; List hostnameAndPort; + public RetriesExhaustedWithDetailsException(final String msg) { + super(msg); + } + + public RetriesExhaustedWithDetailsException(final String msg, final IOException e) { + super(msg, e); + } + public RetriesExhaustedWithDetailsException(List exceptions, List actions, List hostnameAndPort) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 8950311ebe..3aa4e1c819 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -253,6 +253,21 @@ public final class ProtobufUtil { } /** + * Return the Exception thrown by the remote server wrapped in + * ServiceException as cause. RemoteExceptions are left untouched. + * + * @param e ServiceException that wraps the IO exception thrown by the server + * @return Exception wrapped in ServiceException. + */ + public static IOException getServiceException(org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) { + Throwable t = e.getCause(); + if (ExceptionUtil.isInterrupt(t)) { + return ExceptionUtil.asInterrupt(t); + } + return t instanceof IOException ? 
(IOException) t : new HBaseIOException(t); + } + + /** * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than * just {@link ServiceException}. Prefer this method to * {@link #getRemoteException(ServiceException)} because trying to diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index cc9fc57c05..6d3074ee43 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1200,6 +1200,10 @@ public final class HConstants { public static final String REPLICATION_SOURCE_MAXTHREADS_KEY = "hbase.replication.source.maxthreads"; + /** Drop edits for tables that have been deleted from the replication source and target */ + public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY = + "hbase.replication.drop.on.deleted.table"; + public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10; /** Configuration key for SplitLog manager timeout */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index dbf7b5e386..af9690a659 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -69,7 +69,7 @@ public class ReplicationProtbufUtil { try { admin.replicateWALEntry(controller, p.getFirst()); } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) { - throw ProtobufUtil.handleRemoteException(e); + throw ProtobufUtil.getServiceException(e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index 
ffdba34713..401da4c2cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -53,6 +53,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class Context { + private final Configuration localConf; private final Configuration conf; private final FileSystem fs; private final TableDescriptors tableDescriptors; @@ -64,6 +65,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { @InterfaceAudience.Private public Context( + final Configuration localConf, final Configuration conf, final FileSystem fs, final String peerId, @@ -72,6 +74,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { final MetricsSource metrics, final TableDescriptors tableDescriptors, final Abortable abortable) { + this.localConf = localConf; this.conf = conf; this.fs = fs; this.clusterId = clusterId; @@ -84,6 +87,9 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { public Configuration getConfiguration() { return conf; } + public Configuration getLocalConfiguration() { + return localConf; + } public FileSystem getFilesystem() { return fs; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index a210aaeb87..c1ed64413d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -36,6 +36,8 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import 
java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -45,9 +47,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; @@ -79,6 +83,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2; private ClusterConnection conn; + private Configuration localConf; private Configuration conf; // How long should we sleep for each retry private long sleepForRetries; @@ -102,11 +107,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private Path hfileArchiveDir; private boolean replicationBulkLoadDataEnabled; private Abortable abortable; + private boolean dropOnDeletedTables; @Override public void init(Context context) throws IOException { super.init(context); this.conf = HBaseConfiguration.create(ctx.getConfiguration()); + this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration()); decorateConf(); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", @@ -139,6 +146,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // conservative for now. 
this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE, RpcServer.DEFAULT_MAX_REQUEST_SIZE)); + this.dropOnDeletedTables = + this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false); this.replicationBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, @@ -225,6 +234,37 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return entryLists; } + private TableName parseTable(String msg) { + // ... TableNotFoundException: ''/n... + Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'"); + Matcher m = p.matcher(msg); + if (m.find()) { + String table = m.group(1); + try { + // double check that table is a valid table name + TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table))); + return TableName.valueOf(table); + } catch (IllegalArgumentException ignore) { + } + } + return null; + } + + // Filter a set of batches by TableName + private List> filterBatches(final List> oldEntryList, TableName table) { + List> entryLists = new ArrayList<>(); + for (List entries : oldEntryList) { + ArrayList thisList = new ArrayList(entries.size()); + entryLists.add(thisList); + for (Entry e : entries) { + if (!e.getKey().getTablename().equals(table)) { + thisList.add(e); + } + } + } + return entryLists; + } + private void reconnectToPeerCluster() { ClusterConnection connection = null; try { @@ -325,10 +365,27 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi ioe = ((RemoteException) ioe).unwrapRemoteException(); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); if (ioe instanceof TableNotFoundException) { - if (sleepForRetries("A table is missing in the peer cluster. 
" - + "Replication cannot proceed without losing data.", sleepMultiplier)) { - sleepMultiplier++; + if (dropOnDeletedTables) { + // this is a bit fragile, but cannot change how TNFE is serialized + // at least check whether the table name is legal + TableName table = parseTable(ioe.getMessage()); + if (table != null) { + try (Connection localConn = + ConnectionFactory.createConnection(ctx.getLocalConfiguration())) { + if (!localConn.getAdmin().tableExists(table)) { + // Would potentially be better to retry in one of the outer loops + // and add a table filter there; but that would break the encapsulation, + // so we're doing the filtering here. + LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '"+table+"'"); + batches = filterBatches(batches, table); + continue; + } + } catch (IOException iox) { + LOG.warn("Exception checking for local table: ", iox); + } + } } + // fall through and sleep below } else { LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe); replicationSinkMgr.chooseSinks(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 05b1f218b1..2f9f9c5c9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -42,12 +42,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import 
org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; @@ -372,6 +374,13 @@ public class ReplicationSink { for (List rows : allRows) { table.batch(rows, null); } + } catch (RetriesExhaustedWithDetailsException rewde) { + for (Throwable ex : rewde.getCauses()) { + if (ex instanceof TableNotFoundException) { + throw new TableNotFoundException("'"+tableName+"'"); + } + } + throw rewde; } catch (InterruptedException ix) { throw (InterruptedIOException) new InterruptedIOException().initCause(ix); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 609274f326..45d7d94c23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -507,7 +507,7 @@ public class ReplicationSourceManager implements ReplicationListener { replicationEndpoint, walFileLengthProvider, metrics); // init replication endpoint - replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), + replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server)); return src;