From 80f49cc7ee907f779c7c8cb1cb5e68cabe19ea4f Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Thu, 2 Nov 2017 10:59:13 -0700
Subject: [PATCH] HBASE-12091 Optionally ignore edits for dropped tables for
 replication (Lars Hofhansl)

---
 .../RetriesExhaustedWithDetailsException.java      |   9 ++
 .../apache/hadoop/hbase/protobuf/ProtobufUtil.java |  19 +++
 .../java/org/apache/hadoop/hbase/HConstants.java   |   4 +
 .../hbase/protobuf/ReplicationProtbufUtil.java     |   2 +-
 .../HBaseInterClusterReplicationEndpoint.java      |  60 +++++++-
 .../replication/TestReplicationDroppedTables.java  | 170 +++++++++++++++++++++
 6 files changed, 258 insertions(+), 6 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
index ff414be977..cb006756f8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -49,6 +50,14 @@ extends RetriesExhaustedException {
   List<Row> actions;
   List<String> hostnameAndPort;
 
+  public RetriesExhaustedWithDetailsException(final String msg) {
+    super(msg);
+  }
+
+  public RetriesExhaustedWithDetailsException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
   public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
       List<Row> actions,
       List<String> hostnameAndPort) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index a45629f4b3..1e76900bd3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -254,6 +254,25 @@ public final class ProtobufUtil {
   }
 
   /**
+   * Return the IOException thrown by the remote server, wrapped in the
+   * ServiceException as its cause. RemoteExceptions are left untouched.
+   *
+   * @param e the ServiceException that wraps an IOException thrown by the server
+   * @return the IOException wrapped in the ServiceException, or an HBaseIOException otherwise
+   */
+  public static IOException getServiceException(Exception e) {
+    Throwable t = e;
+    if (e instanceof ServiceException ||
+        e instanceof org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException) {
+      t = e.getCause();
+    }
+    if (ExceptionUtil.isInterrupt(t)) {
+      return ExceptionUtil.asInterrupt(t);
+    }
+    return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
+  }
+
+  /**
    * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
    * just {@link ServiceException}. Prefer this method to
    * {@link #getRemoteException(ServiceException)} because trying to
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index e1368a28ae..668e3bb49c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1237,6 +1237,10 @@ public final class HConstants {
   public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
       "hbase.replication.source.maxthreads";
 
+  /** Drop edits for tables that have been deleted from the replication source and target */
+  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
+      "hbase.replication.drop.on.deleted.table";
+
   public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
 
   /** Configuration key for SplitLog manager timeout */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index dbf7b5e386..af9690a659 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -69,7 +69,7 @@ public class ReplicationProtbufUtil {
     try {
       admin.replicateWALEntry(controller, p.getFirst());
     } catch (org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException e) {
-      throw ProtobufUtil.handleRemoteException(e);
+      throw ProtobufUtil.getServiceException(e);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index a210aaeb87..bd3379024d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -36,6 +36,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
@@ -45,10 +47,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
@@ -102,6 +105,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
   private Abortable abortable;
+  private boolean dropOnDeletedTables;
 
   @Override
   public void init(Context context) throws IOException {
@@ -139,6 +143,8 @@ public class HBaseInterClusterReplicationEndpoi
     // conservative for now.
     this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
       RpcServer.DEFAULT_MAX_REQUEST_SIZE));
+    this.dropOnDeletedTables =
+        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -225,6 +231,37 @@ public class HBaseInterClusterReplicationEndpoi
     return entryLists;
   }
 
+  private TableName parseTable(String msg) {
+    // ... Table '' was not found ...
+    Pattern p = Pattern.compile("Table \\'([\\S]*)\\' was not found");
+    Matcher m = p.matcher(msg);
+    if (m.find()) {
+      String table = m.group(1);
+      try {
+        // double check that table is a valid table name
+        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
+        return TableName.valueOf(table);
+      } catch (IllegalArgumentException ignore) {
+      }
+    }
+    return null;
+  }
+
+  // Filter a set of batches by TableName
+  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
+    List<List<Entry>> entryLists = new ArrayList<>();
+    for (List<Entry> entries : oldEntryList) {
+      ArrayList<Entry> thisList = new ArrayList<>(entries.size());
+      entryLists.add(thisList);
+      for (Entry e : entries) {
+        if (!e.getKey().getTablename().equals(table)) {
+          thisList.add(e);
+        }
+      }
+    }
+    return entryLists;
+  }
+
   private void reconnectToPeerCluster() {
     ClusterConnection connection = null;
     try {
@@ -324,10 +361,23 @@ public class HBaseInterClusterReplicationEndpoi
         if (ioe instanceof RemoteException) {
           ioe = ((RemoteException) ioe).unwrapRemoteException();
           LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
-          if (ioe instanceof TableNotFoundException) {
-            if (sleepForRetries("A table is missing in the peer cluster. "
-                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
-              sleepMultiplier++;
+          if (ioe instanceof RetriesExhaustedWithDetailsException) {
+            if (dropOnDeletedTables) {
+              try {
+                // this is dangerous, but cannot change how TNFE is serialized
+                // at least check whether the table name is legal
+                TableName table = parseTable(ioe.getMessage());
+                if (table != null && !conn.getAdmin().tableExists(table)) {
+                  // Would potentially be better to retry in one of the outer loops
+                  // and add a table filter there; but that would break the encapsulation,
+                  // so we're doing the filtering here.
+                  LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '" + table + "'");
+                  batches = filterBatches(batches, table);
+                  continue;
+                }
+              } catch (IOException ignore) {
+              }
+              // fall through and sleep below
             }
           } else {
             LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
new file mode 100644
index 0000000000..33032956d7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Category(LargeTests.class)
+public class TestReplicationDroppedTables extends TestReplicationBase {
+  private static final Log LOG = LogFactory.getLog(TestReplicationDroppedTables.class);
+
+  @Test(timeout = 600000)
+  public void testEditsStuckBehindDroppedTable() throws Exception {
+    // Sanity check
+    // Make sure by default edits for dropped tables stall the replication queue, even when the
+    // table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(false, "test_dropped");
+
+    // With the config enabled, make sure edits for a dropped table are skipped and replication
+    // proceeds, once the table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(true, "test_dropped");
+
+    // also try with a namespace
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createNamespace(NamespaceDescriptor.create("NS").build());
+    }
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createNamespace(NamespaceDescriptor.create("NS").build());
+    }
+    testEditsBehindDroppedTable(true, "NS:test_dropped");
+  }
+
+  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+
+    // make sure we have a single region server only, so that all
+    // edits for all tables go there
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster(1, 1);
+
+    TableName tablename = TableName.valueOf(tName);
+    byte[] familyname = Bytes.toBytes("fam");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tablename);
+    HColumnDescriptor fam = new HColumnDescriptor(familyname);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createTable(table);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(table);
+    }
+    utility1.waitUntilAllRegionsAssigned(tablename);
+    utility2.waitUntilAllRegionsAssigned(tablename);
+
+    Table lHtable1 = utility1.getConnection().getTable(tablename);
+
+    // now suspend replication
+    admin.disablePeer("2");
+
+    // put some data (lead with 0 so the edit gets sorted before the other table's edits
+    // in the replication batch)
+    // write a bunch of edits, making sure we fill a batch
+    List<Put> puts = new ArrayList<>(1000);
+    for (int i = 0; i < 1000; i++) {
+      byte[] rowkey = Bytes.toBytes(i + " put on table to be dropped");
+      Put put = new Put(rowkey);
+      put.addColumn(familyname, row, row);
+      puts.add(put);
+    }
+    lHtable1.put(puts);
+    lHtable1.close();
+
+    byte[] rowkey = Bytes.toBytes("normal put");
+    Put put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.disableTable(tablename);
+      admin1.deleteTable(tablename);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.disableTable(tablename);
+      admin2.deleteTable(tablename);
+    }
+
+    // wait for the sleep interval of the master cluster to become long
+    Thread.sleep(SLEEP_TIME * NB_RETRIES);
+
+    admin.enablePeer("2");
+    if (allowProceeding) {
+      // in this case we expect the key to make it to the peer cluster
+      Get get = new Get(rowkey);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for put replication");
+        }
+        Result res = htable2.get(get);
+        if (res.size() == 0) {
+          LOG.info("Row not available");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          assertArrayEquals(res.getRow(), rowkey);
+          break;
+        }
+      }
+    } else {
+      // verify that the second key is blocked
+      Get get = new Get(rowkey);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        Result res = htable2.get(get);
+        if (res.size() >= 1) {
+          fail("Edit should have been stuck behind dropped tables");
+        } else {
+          LOG.info("Row not replicated, let's wait a bit more...");
+          Thread.sleep(SLEEP_TIME);
+        }
+      }
+    }
+    // just to be safe
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+  }
+}
-- 
2.13.4
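
A minimal standalone sketch (not part of the patch) of the message-parsing heuristic that
HBaseInterClusterReplicationEndpoint#parseTable relies on when
hbase.replication.drop.on.deleted.table is enabled. The regex is copied from the patch; the
sample message text and the class name ParseTableSketch are made up for illustration only.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParseTableSketch {
  public static void main(String[] args) {
    // Same pattern the endpoint uses to pull a table name out of the remote exception message.
    Pattern p = Pattern.compile("Table \\'([\\S]*)\\' was not found");
    // Hypothetical message resembling what a sink could report for a missing table.
    String msg = "org.apache.hadoop.hbase.TableNotFoundException: Table 'NS:test_dropped' was not found";
    Matcher m = p.matcher(msg);
    if (m.find()) {
      // The endpoint additionally verifies the name is legal and that the table does not
      // exist at the source before filtering its edits out of the replication batches.
      System.out.println("Would filter edits for table: " + m.group(1));
    } else {
      System.out.println("No table name recognized; the source keeps retrying.");
    }
  }
}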