From 2e47717e610e53a33a05fbe5648629ed6009981c Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 11 Jan 2018 15:31:15 +0800 Subject: [PATCH] HBASE-19078 Add peer cluster root directory config for synchronous replication --- .../replication/ReplicationPeerConfigUtil.java | 6 ++ .../hbase/replication/ReplicationPeerConfig.java | 20 ++++++- .../replication/ReplicationPeerConfigBuilder.java | 7 +++ .../src/main/protobuf/Replication.proto | 1 + .../master/replication/ReplicationPeerManager.java | 39 ++++++++---- .../client/replication/TestReplicationAdmin.java | 69 ++++++++++++++++++++++ .../src/main/ruby/hbase/replication_admin.rb | 5 ++ hbase-shell/src/main/ruby/hbase_constants.rb | 1 + .../src/main/ruby/shell/commands/add_peer.rb | 21 ++++++- .../src/test/ruby/hbase/replication_admin_test.rb | 16 +++++ 10 files changed, 172 insertions(+), 13 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index a234a9b..22f9051 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -315,6 +315,9 @@ public final class ReplicationPeerConfigUtil { excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); } + if (peer.hasRemoteClusterRootDir()) { + builder.setRemoteClusterRootDir(peer.getRemoteClusterRootDir()); + } return builder.build(); } @@ -371,6 +374,9 @@ public final class ReplicationPeerConfigUtil { } } + if (peerConfig.getRemoteClusterRootDir() != null) { + builder.setRemoteClusterRootDir(peerConfig.getRemoteClusterRootDir()); + } return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index bf8d030..9652e9e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -46,6 +46,8 @@ public class ReplicationPeerConfig { private Map> excludeTableCFsMap = null; private Set excludeNamespaces = null; private long bandwidth = 0; + // Used by synchronous replication + private String remoteClusterRootDir; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -64,6 +66,7 @@ public class ReplicationPeerConfig { builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces) : null; this.bandwidth = builder.bandwidth; + this.remoteClusterRootDir = builder.remoteClusterRootDir; } private Map> @@ -210,6 +213,10 @@ public class ReplicationPeerConfig { return this; } + public String getRemoteClusterRootDir() { + return this.remoteClusterRootDir; + } + public static ReplicationPeerConfigBuilder newBuilder() { return new ReplicationPeerConfigBuilderImpl(); } @@ -223,7 +230,7 @@ public class ReplicationPeerConfig { .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) - .setBandwidth(peerConfig.getBandwidth()); + .setBandwidth(peerConfig.getBandwidth()).setRemoteClusterRootDir(peerConfig.getRemoteClusterRootDir()); return builder; } @@ -250,6 +257,8 @@ public class ReplicationPeerConfig { private long bandwidth = 0; + private String remoteClusterRootDir = null; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey; @@ -313,6 +322,12 @@ public class ReplicationPeerConfig { } @Override + public ReplicationPeerConfigBuilder setRemoteClusterRootDir(String rootDir) { + this.remoteClusterRootDir = rootDir; + return this; + } + + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data // from ZK which makes it much more difficult. @@ -341,6 +356,9 @@ public class ReplicationPeerConfig { } } builder.append("bandwidth=").append(bandwidth); + if (this.remoteClusterRootDir != null) { + builder.append(",clusterRootDir=").append(remoteClusterRootDir); + } return builder.toString(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 0b2f2e2..97ae131 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -138,6 +138,13 @@ public interface ReplicationPeerConfigBuilder { ReplicationPeerConfigBuilder setExcludeNamespaces(Set namespaces); /** + * Set the remote peer cluster's root directory. Used by synchronous replication. + * @param rootDir the remote peer cluster's root directory + * @return {@code this} + */ + ReplicationPeerConfigBuilder setRemoteClusterRootDir(String rootDir); + + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. */ diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 9f7b4c2..ae8dc01 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -48,6 +48,7 @@ message ReplicationPeer { optional bool replicate_all = 8; repeated TableCF exclude_table_cfs = 9; repeated bytes exclude_namespaces = 10; + optional string remoteClusterRootDir = 11; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 696b2d7..2896ca2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -132,20 +132,37 @@ public class ReplicationPeerManager { checkPeerConfig(peerConfig); ReplicationPeerDescription desc = checkPeerExists(peerId); ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); - if (!StringUtils.isBlank(peerConfig.getClusterKey()) && - !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) { + if (!StringUtils.isBlank(peerConfig.getClusterKey()) + && !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) { throw new DoNotRetryIOException( - "Changing the cluster key on an existing peer is not allowed. Existing key '" + - oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + - peerConfig.getClusterKey() + "'"); + "Changing the cluster key on an existing peer is not allowed. Existing key '" + + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + + peerConfig.getClusterKey() + "'"); } - if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && - !peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { - throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + - "on an existing peer is not allowed. Existing class '" + - oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + - " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && !peerConfig + .getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { + throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + + "on an existing peer is not allowed. Existing class '" + + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + } + + if (!StringUtils.isBlank(peerConfig.getRemoteClusterRootDir()) + && !peerConfig.getRemoteClusterRootDir().equals(oldPeerConfig.getRemoteClusterRootDir())) { + throw new DoNotRetryIOException( + "Changing the remote cluster root dir on an existing peer is not allowed. " + + "Existing root dir '" + oldPeerConfig.getRemoteClusterRootDir() + "' for peer " + + peerId + " does not match new root dir '" + peerConfig.getRemoteClusterRootDir() + + "'"); + } + + if (oldPeerConfig.getRemoteClusterRootDir() != null) { + if (!ReplicationUtils.isKeyConfigEqual(oldPeerConfig, peerConfig)) { + throw new DoNotRetryIOException( + "Changing the replicated namespace/table config on a synchronous replication " + + "peer(peerId: " + peerId + ") is not allowed."); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index a6091e1..0ec79b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -846,4 +846,73 @@ public class TestReplicationAdmin { assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); admin.removePeer(ID_ONE); } + + @Test + public void testPeerClusterRootDir() throws Exception { + String rootDir = "hdfs://srv1:9999/hbase"; + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertNull(rpc.getRemoteClusterRootDir()); + + try { + builder.setRemoteClusterRootDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change remote peer cluster root dir is not allowed"); + } catch (Exception e) { + // OK + } + + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + builder.setRemoteClusterRootDir(rootDir); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + assertEquals(rootDir, rpc.getRemoteClusterRootDir()); + + try { + builder.setRemoteClusterRootDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change remote peer cluster root dir is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + builder.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Set namespaces = new HashSet<>(); + namespaces.add("ns1"); + builder.setExcludeNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Map> tableCfs = new HashMap<>(); + tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>()); + builder.setExcludeTableCFsMap(tableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index b9d4a0c..8d6eab7 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -64,6 +64,7 @@ module Hbase table_cfs = args.fetch(TABLE_CFS, nil) namespaces = args.fetch(NAMESPACES, nil) peer_state = args.fetch(STATE, nil) + remote_root_dir = args.fetch(REMOTE_ROOT_DIR, nil) # Create and populate a ReplicationPeerConfig builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig @@ -74,6 +75,10 @@ module Hbase builder.set_replication_endpoint_impl(endpoint_classname) end + unless remote_root_dir.nil? + builder.setRemoteClusterRootDir(remote_root_dir) + end + unless config.nil? builder.putAllConfiguration(config) end diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb index 28484cb..7e9e40a 100644 --- a/hbase-shell/src/main/ruby/hbase_constants.rb +++ b/hbase-shell/src/main/ruby/hbase_constants.rb @@ -77,6 +77,7 @@ module HBaseConstants VALUE = 'VALUE'.freeze ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'.freeze CLUSTER_KEY = 'CLUSTER_KEY'.freeze + REMOTE_ROOT_DIR = 'REMOTE_ROOT_DIR'.freeze TABLE_CFS = 'TABLE_CFS'.freeze NAMESPACES = 'NAMESPACES'.freeze STATE = 'STATE'.freeze diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index eb2da83..9d15dc6 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -35,7 +35,7 @@ to the peer cluster. An optional parameter for table column families identifies which tables and/or column families will be replicated to the peer cluster. -Notice: Set a namespace in the peer config means that all tables in this namespace +Note: Set a namespace in the peer config means that all tables in this namespace will be replicated to the peer cluster. So if you already have set a namespace in peer config, then you can't set this namespace's tables in the peer config again. @@ -74,6 +74,25 @@ the key TABLE_CFS. Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified. If ENDPOINT_CLASSNAME is specified, CLUSTER_KEY is optional and should only be specified if a particular custom endpoint requires it. +The default replication peer is asynchronous. You can also add a synchronous replication peer +with REMOTE_ROOT_DIR parameter. Meanwhile, synchronous replication peer also support the optional +config for asynchronous replication peer. + +Examples: + + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + STATE => "ENABLED", REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + STATE => "DISABLED", REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase", NAMESPACES => ["ns1", "ns2"] + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase", TABLE_CFS => { "table1" => [] } + +Note: The REMOTE_ROOT_DIR is not allowed to change. + EOF end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 0f84396..e7eb95c 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -97,6 +97,22 @@ module Hbase command(:remove_peer, @peer_id) end + define_test "add_peer: remote cluster root dir" do + cluster_key = "server1.cie.com:2181:/hbase" + remote_root_dir = "hdfs://srv1:9999/hbase" + args = { CLUSTER_KEY => cluster_key, REMOTE_ROOT_DIR => remote_root_dir } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + assert_equal(remote_root_dir, peer.getPeerConfig.getRemoteClusterRootDir) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + define_test "add_peer: single zk cluster key with enabled/disabled state" do cluster_key = "server1.cie.com:2181:/hbase" -- 1.9.1