From 5885d52aff9dc4ed6a8949f61eb2d01d873cb42d Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Fri, 8 Feb 2019 09:56:52 +0800
Subject: [PATCH] HBASE-21857 Do not need to check clusterKey if replicationEndpoint is provided when adding a peer

---
 .../ReplicationPeerConfigUtil.java            |  6 +-
 .../src/main/protobuf/Replication.proto       |  2 +-
 .../replication/ReplicationPeerManager.java   | 24 +++++-
 .../client/TestAsyncReplicationAdminApi.java  | 75 ++++++++++++++++---
 4 files changed, 91 insertions(+), 16 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index 25d30f4923..fefeea6e17 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -329,9 +329,9 @@ public final class ReplicationPeerConfigUtil {
   public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
     ReplicationProtos.ReplicationPeer.Builder builder =
         ReplicationProtos.ReplicationPeer.newBuilder();
-    if (peerConfig.getClusterKey() != null) {
-      builder.setClusterkey(peerConfig.getClusterKey());
-    }
+    // we used to set cluster key as required so here we must always set it, until we can make sure
+    // that no one uses the old proto file.
+    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
     if (peerConfig.getReplicationEndpointImpl() != null) {
       builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
     }
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 61ba131564..6619c9694a 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -38,7 +38,7 @@ message TableCF {
 message ReplicationPeer {
   // clusterkey is the concatenation of the slave cluster's
   // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-  required string clusterkey = 1;
+  optional string clusterkey = 1;
   optional string replicationEndpointImpl = 2;
   repeated BytesBytesPair data = 3;
   repeated NameStringPair configuration = 4;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 80338e2740..fa26f415aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -334,7 +336,27 @@ public class ReplicationPeerManager {
   }
 
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
-    checkClusterKey(peerConfig.getClusterKey());
+    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+    boolean checkClusterKey = true;
+    if (!StringUtils.isBlank(replicationEndpointImpl)) {
+      // try creating an instance
+      ReplicationEndpoint endpoint;
+      try {
+        endpoint = Class.forName(replicationEndpointImpl)
+          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        throw new DoNotRetryIOException(
+          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
+          e);
+      }
+      // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
+      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
+        checkClusterKey = false;
+      }
+    }
+    if (checkClusterKey) {
+      checkClusterKey(peerConfig.getClusterKey());
+    }
 
     if (peerConfig.replicateAllUserTables()) {
       // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index b5a50c0b4d..431141cf76 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -23,7 +23,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,6 +34,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -42,8 +45,11 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -56,12 +62,12 @@ import org.junit.runners.Parameterized;
  * Class to test asynchronous replication admin operations.
 */
 @RunWith(Parameterized.class)
-@Category({LargeTests.class, ClientTests.class})
+@Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
+    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
 
   private final String ID_ONE = "1";
   private final String KEY_ONE = "127.0.0.1:2181:/hbase";
@@ -89,7 +95,7 @@
     } catch (Exception e) {
     }
     ReplicationQueueStorage queueStorage = ReplicationStorageFactory
-        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
+      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
     for (ServerName serverName : queueStorage.getListOfReplicators()) {
       for (String queue : queueStorage.getAllQueues(serverName)) {
         queueStorage.removeQueue(serverName, queue);
@@ -186,8 +192,8 @@
     // append table t1 to replication
     tableCFs.put(tableName1, null);
     admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
-    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
-        .getTableCFsMap();
+    Map<TableName, List<String>> result =
+      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
     assertEquals(1, result.size());
     assertEquals(true, result.containsKey(tableName1));
     assertNull(result.get(tableName1));
@@ -301,12 +307,13 @@
       tableCFs.clear();
       tableCFs.put(tableName3, null);
       admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
-      fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3");
+      fail("Test case should fail as removing table-cfs from a peer whose" +
+        " table-cfs didn't contain t3");
     } catch (CompletionException e) {
       assertTrue(e.getCause() instanceof ReplicationException);
     }
-    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
-        .getTableCFsMap();
+    Map<TableName, List<String>> result =
+      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
     assertEquals(2, result.size());
     assertTrue("Should contain t1", result.containsKey(tableName1));
     assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -414,7 +421,8 @@
     rpc.setTableCFsMap(tableCfs);
     try {
       admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
-      fail("Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
+      fail(
+        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
     } catch (CompletionException e) {
       // OK
     }
@@ -430,7 +438,8 @@
     rpc.setNamespaces(namespaces);
     try {
       admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
-      fail("Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
+      fail(
+        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
     } catch (CompletionException e) {
       // OK
     }
@@ -453,4 +462,48 @@
 
     admin.removeReplicationPeer(ID_ONE).join();
   }
+
+  @Test
+  public void testInvalidClusterKey() throws InterruptedException {
+    try {
+      admin.addReplicationPeer(ID_ONE,
+        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+    }
+  }
+
+  @Test
+  public void testInvalidReplicationEndpoint() throws InterruptedException {
+    try {
+      admin.addReplicationPeer(ID_ONE,
+        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
+    }
+  }
+
+  @Test
+  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
+    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
+    admin
+      .addReplicationPeer(ID_ONE,
+        ReplicationPeerConfig.newBuilder()
+          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
+      .get();
+
+    // but we still need to check cluster key if we specify the default ReplicationEndpoint
+    try {
+      admin
+        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
+          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
+        .get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+    }
+  }
 }
-- 
2.17.1
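
Reviewer note, not part of the patch: a minimal client-side sketch of what the relaxed checkPeerConfig allows, mirroring the new tests above. It assumes the AsyncAdmin API already used by TestAsyncReplicationAdminApi; the class name AddPeerSketch and the peer ids "peer_custom" and "peer_default" are illustrative only.

import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;

public class AddPeerSketch {

  // With a custom ReplicationEndpoint, the master no longer insists on a cluster key,
  // since checkPeerConfig now only validates it for HBaseInterClusterReplicationEndpoint.
  static void addCustomEndpointPeer(AsyncAdmin admin) throws Exception {
    admin.addReplicationPeer("peer_custom",
      ReplicationPeerConfig.newBuilder()
        .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName())
        .build())
      .get();
  }

  // Without an endpoint implementation (or with HBaseInterClusterReplicationEndpoint),
  // a valid cluster key is still mandatory.
  static void addDefaultPeer(AsyncAdmin admin) throws Exception {
    admin.addReplicationPeer("peer_default",
      ReplicationPeerConfig.newBuilder()
        .setClusterKey("127.0.0.1:2181:/hbase")
        .build())
      .get();
  }
}

If the configuration does not pass validation, the returned future completes exceptionally with a DoNotRetryIOException as the cause, as exercised by testInvalidClusterKey and testInvalidReplicationEndpoint.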