From c23173809f72961f91e0ac566b494d86dda33ce9 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Tue, 26 Dec 2017 17:38:47 +0800 Subject: [PATCH] HBASE-19630 Add peer cluster key check when add new replication peer --- .../master/replication/ReplicationPeerManager.java | 69 ++++++++++------------ .../client/replication/TestReplicationAdmin.java | 23 ++++++++ 2 files changed, 55 insertions(+), 37 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 291980e..9dd1a4a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.master.replication; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -151,21 +153,6 @@ public final class ReplicationPeerManager { } } - private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) { - ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig(); - copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration()); - copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData()); - copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap()); - copiedPeerConfig.setNamespaces(peerConfig.getNamespaces()); - copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()); - copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces()); - copiedPeerConfig.setBandwidth(peerConfig.getBandwidth()); - copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables()); - copiedPeerConfig.setClusterKey(peerConfig.getClusterKey()); - copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()); - return copiedPeerConfig; - } - public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { if (peers.containsKey(peerId)) { @@ -237,36 +224,36 @@ public final class ReplicationPeerManager { return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); } - /** - * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. - * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer - * cluster. - *

- * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster. - * Then allow to config namespaces or table-cfs which will be replicated to peer cluster. - */ - private static void checkPeerConfig(ReplicationPeerConfig peerConfig) - throws DoNotRetryIOException { + private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { + checkClusterKey(peerConfig.getClusterKey()); + if (peerConfig.replicateAllUserTables()) { - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) || - (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + - "when you want replicate all cluster"); + // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. + // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer + // cluster. + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) + || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " + + "when you want replicate all cluster"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), peerConfig.getExcludeTableCFsMap()); } else { - if ((peerConfig.getExcludeNamespaces() != null && - !peerConfig.getExcludeNamespaces().isEmpty()) || - (peerConfig.getExcludeTableCFsMap() != null && - !peerConfig.getExcludeTableCFsMap().isEmpty())) { + // If replicate_all flag is false, it means all user tables can't be replicated to peer + // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer + // cluster. + if ((peerConfig.getExcludeNamespaces() != null + && !peerConfig.getExcludeNamespaces().isEmpty()) + || (peerConfig.getExcludeTableCFsMap() != null + && !peerConfig.getExcludeTableCFsMap().isEmpty())) { throw new DoNotRetryIOException( - "Need clean exclude-namespaces or exclude-table-cfs config firstly" + - " when replicate_all flag is false"); + "Need clean exclude-namespaces or exclude-table-cfs config firstly" + + " when replicate_all flag is false"); } checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); } + checkConfiguredWALEntryFilters(peerConfig); } @@ -289,7 +276,7 @@ public final class ReplicationPeerManager { * exclude namespace. * */ - private static void checkNamespacesAndTableCfsConfigConflict(Set namespaces, + private void checkNamespacesAndTableCfsConfigConflict(Set namespaces, Map> tableCfs) throws DoNotRetryIOException { if (namespaces == null || namespaces.isEmpty()) { return; @@ -306,7 +293,7 @@ public final class ReplicationPeerManager { } } - private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) + private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { String filterCSV = peerConfig.getConfiguration() .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); @@ -323,6 +310,14 @@ public final class ReplicationPeerManager { } } + private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { + try { + ZKConfig.validateClusterKey(clusterKey); + } catch (IOException e) { + throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); + } + } + public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) throws ReplicationException { ReplicationPeerStorage peerStorage = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 328f6d0..6fe96df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; @@ -111,6 +112,28 @@ public class TestReplicationAdmin { } } + @Test + public void testAddInvalidPeer() { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + try { + String invalidPeerId = "1-2"; + hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build()); + fail("Should fail as the peer id: " + invalidPeerId + " is invalid"); + } catch (Exception e) { + // OK + } + + try { + String invalidClusterKey = "2181:/hbase"; + builder.setClusterKey(invalidClusterKey); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid"); + } catch (Exception e) { + // OK + } + } + /** * Simple testing of adding and removing peers, basically shows that * all interactions with ZK work -- 1.9.1