diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
index 21ab156..e78f810 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -51,6 +52,14 @@ extends RetriesExhaustedException {
   List<Row> actions;
   List<String> hostnameAndPort;
 
+  public RetriesExhaustedWithDetailsException(final String msg) {
+    super(msg);
+  }
+
+  public RetriesExhaustedWithDetailsException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
   public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
                                               List<Row> actions,
                                               List<String> hostnameAndPort) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5945e5e..84d4a67 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -331,6 +331,24 @@ public final class ProtobufUtil {
   }
 
   /**
+   * Return the IOException thrown by the remote server, wrapped in a
+   * ServiceException as its cause. RemoteExceptions are left untouched.
+   *
+   * @param e ServiceException that wraps an IOException thrown by the server
+   * @return the exception wrapped in the ServiceException, as an IOException
+   */
+  public static IOException getServiceException(ServiceException e) {
+    Throwable t = e;
+    if (e instanceof ServiceException) {
+      t = e.getCause();
+    }
+    if (ExceptionUtil.isInterrupt(t)) {
+      return ExceptionUtil.asInterrupt(t);
+    }
+    return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
+  }
+
+  /**
    * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
    * just {@link ServiceException}. Prefer this method to
   * {@link #getRemoteException(ServiceException)} because trying to
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 8df7bd8..9dc1acb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1247,6 +1247,10 @@ public final class HConstants {
   public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
       "hbase.replication.source.maxthreads";
 
+  /** Drop edits for tables that have been deleted from the replication source and target */
+  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
+      "hbase.replication.drop.on.deleted.table";
+
   public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
 
   /** Config for pluggable consensus provider */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index d13a79c..6243511 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -71,7 +71,7 @@ public class ReplicationProtbufUtil {
     try {
       admin.replicateWALEntry(controller, p.getFirst());
     } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+      throw ProtobufUtil.getServiceException(se);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index cf85ffd..da3a1e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -102,6 +103,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
   private Abortable abortable;
+  private boolean dropOnDeletedTables;
 
   @Override
   public void init(Context context) throws IOException {
@@ -139,6 +141,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     // conservative for now.
     this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
       RpcServer.DEFAULT_MAX_REQUEST_SIZE));
+    this.dropOnDeletedTables =
+        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -225,6 +229,21 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return entryLists;
   }
 
+  // Filter a set of batches by TableName
+  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
+    List<List<Entry>> entryLists = new ArrayList<>();
+    for (List<Entry> entries : oldEntryList) {
+      ArrayList<Entry> thisList = new ArrayList<>(entries.size());
+      entryLists.add(thisList);
+      for (Entry e : entries) {
+        if (!e.getKey().getTablename().equals(table)) {
+          thisList.add(e);
+        }
+      }
+    }
+    return entryLists;
+  }
+
   private void reconnectToPeerCluster() {
     HConnection connection = null;
     try {
@@ -325,7 +344,31 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       ioe = ((RemoteException) ioe).unwrapRemoteException();
       LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
       if (ioe instanceof TableNotFoundException) {
-        if (sleepForRetries("A table is missing in the peer cluster. "
+        if (dropOnDeletedTables) {
+          try {
+            // this is dangerous, but we cannot change how a TableNotFoundException is serialized;
+            // at least check whether the table name is legal
+            TableName table = TableName.valueOf(
+                TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(ioe.getMessage())));
+            if (!conn.getAdmin().tableExists(table)) {
+              // Would potentially be better to retry in one of the outer loops
+              // and add a table filter there; but that would break the encapsulation,
+              // so we're doing the filtering here.
+              LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for " + table);
+              batches = filterBatches(batches, table);
+            }
+          } catch (IOException ignore) {
+            if (sleepForRetries("Could not check missing table. "
+                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
+              sleepMultiplier++;
+            }
+          } catch (IllegalArgumentException ignore) {
+            if (sleepForRetries("Invalid table name. "
+                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
+              sleepMultiplier++;
+            }
+          }
+        } else if (sleepForRetries("A table is missing in the peer cluster. "
             + "Replication cannot proceed without losing data.", sleepMultiplier)) {
           sleepMultiplier++;
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
index e69de29..a65cc44 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Category(LargeTests.class)
+public class TestReplicationDroppedTables extends TestReplicationBase {
+  private static final Log LOG = LogFactory.getLog(TestReplicationDroppedTables.class);
+
+  @Test(timeout = 600000)
+  public void testEditsStuckBehindDroppedTable() throws Exception {
+    // With hbase.replication.drop.on.deleted.table enabled, edits for dropped tables are
+    // filtered out once the table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(true);
+
+    // By default edits for dropped tables stall the replication queue, even when the
+    // table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(false);
+  }
+
+  private void testEditsBehindDroppedTable(boolean allowProceeding) throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+
+    // make sure we have a single region server only, so that all
+    // edits for all tables go there
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster(1, 1);
+
+    TableName tablename = TableName.valueOf("test_dropped");
+    byte[] familyname = Bytes.toBytes("fam");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tablename);
+    HColumnDescriptor fam = new HColumnDescriptor(familyname);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createTable(table);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(table);
+    }
+    utility1.waitUntilAllRegionsAssigned(tablename);
+    utility2.waitUntilAllRegionsAssigned(tablename);
+
+    Table lHtable1 = utility1.getConnection().getTable(tablename);
+
+    // now suspend replication
+    admin.disablePeer(PEER_ID);
+
+    // put some data (lead with 0 so the edit gets sorted before the other table's edits
+    // in the replication batch)
+    // write a bunch of edits, making sure we fill a batch
+    List<Put> puts = new ArrayList<>(1000);
+    for (int i = 0; i < 1000; i++) {
+      byte[] rowkey = Bytes.toBytes(i + " put on table to be dropped");
+      Put put = new Put(rowkey);
+      put.addColumn(familyname, row, row);
+      puts.add(put);
+    }
+    lHtable1.put(puts);
+    lHtable1.close();
+
+    byte[] rowkey = Bytes.toBytes("normal put");
+    Put put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.disableTable(tablename);
+      admin1.deleteTable(tablename);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.disableTable(tablename);
+      admin2.deleteTable(tablename);
+    }
+
+    // wait for the sleep interval of the master cluster to become long
+    Thread.sleep(SLEEP_TIME * NB_RETRIES);
+
+    admin.enablePeer(PEER_ID);
+    if (allowProceeding) {
+      // in this case we expect the key to make it over
+      Get get = new Get(rowkey);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i == NB_RETRIES - 1) {
+          fail("Waited too much time for put replication");
+        }
+        Result res = htable2.get(get);
+        if (res.size() == 0) {
+          LOG.info("Row not available");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          assertArrayEquals(res.value(), row);
+          break;
+        }
+      }
+    } else {
+      // verify that the second key is blocked
+      Get get = new Get(rowkey);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        Result res = htable2.get(get);
+        if (res.size() >= 1) {
+          fail("Edit should have been stuck behind dropped tables");
+        } else {
+          LOG.info("Row not replicated, let's wait a bit more...");
+          Thread.sleep(SLEEP_TIME);
+        }
+      }
+    }
+    // just to be safe
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+  }
+}
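
For reference, a minimal sketch (outside the patch itself) of how a source cluster could opt in to the new behavior. It only relies on the configuration key added to HConstants in this change, which HBaseInterClusterReplicationEndpoint reads on the source cluster's region servers; the class name below is purely illustrative.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;

  public class EnableDropOnDeletedTable {
    public static void main(String[] args) {
      // Start from the cluster configuration (hbase-site.xml on the classpath).
      Configuration conf = HBaseConfiguration.create();
      // Allow the replication source to skip edits for tables that no longer exist
      // locally or at the sink; the default is false, i.e. replication stalls.
      conf.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
      // Equivalent hbase-site.xml entry on the source region servers:
      //   <property>
      //     <name>hbase.replication.drop.on.deleted.table</name>
      //     <value>true</value>
      //   </property>
    }
  }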