From f085f1714f93831aaa51339f87d93af60f0decc3 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 11 Jan 2018 15:31:15 +0800 Subject: [PATCH] HBASE-19078 Add peer cluster root directory config for synchronous replication --- .../replication/ReplicationPeerConfigUtil.java | 6 ++ .../hbase/replication/ReplicationPeerConfig.java | 21 +++++- .../replication/ReplicationPeerConfigBuilder.java | 7 ++ .../src/main/protobuf/Replication.proto | 1 + .../master/replication/ReplicationPeerManager.java | 35 +++++++--- .../client/replication/TestReplicationAdmin.java | 77 ++++++++++++++++++++++ .../src/main/ruby/hbase/replication_admin.rb | 17 ++--- hbase-shell/src/main/ruby/hbase_constants.rb | 1 + .../src/main/ruby/shell/commands/add_peer.rb | 21 +++++- .../src/main/ruby/shell/commands/list_peers.rb | 19 +++++- .../src/test/ruby/hbase/replication_admin_test.rb | 16 +++++ 11 files changed, 199 insertions(+), 22 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index a234a9b..22f9051 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -315,6 +315,9 @@ public final class ReplicationPeerConfigUtil { excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); } + if (peer.hasRemoteClusterRootDir()) { + builder.setRemoteClusterRootDir(peer.getRemoteClusterRootDir()); + } return builder.build(); } @@ -371,6 +374,9 @@ public final class ReplicationPeerConfigUtil { } } + if (peerConfig.getRemoteClusterRootDir() != null) { + builder.setRemoteClusterRootDir(peerConfig.getRemoteClusterRootDir()); + } return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index bf8d030..f6025e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -46,6 +46,8 @@ public class ReplicationPeerConfig { private Map> excludeTableCFsMap = null; private Set excludeNamespaces = null; private long bandwidth = 0; + // Used by synchronous replication + private String remoteClusterRootDir; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -64,6 +66,7 @@ public class ReplicationPeerConfig { builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces) : null; this.bandwidth = builder.bandwidth; + this.remoteClusterRootDir = builder.remoteClusterRootDir; } private Map> @@ -210,6 +213,10 @@ public class ReplicationPeerConfig { return this; } + public String getRemoteClusterRootDir() { + return this.remoteClusterRootDir; + } + public static ReplicationPeerConfigBuilder newBuilder() { return new ReplicationPeerConfigBuilderImpl(); } @@ -223,7 +230,8 @@ public class ReplicationPeerConfig { .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) - .setBandwidth(peerConfig.getBandwidth()); + .setBandwidth(peerConfig.getBandwidth()) + .setRemoteClusterRootDir(peerConfig.getRemoteClusterRootDir()); return builder; } @@ -250,6 +258,8 @@ public class ReplicationPeerConfig { private long bandwidth = 0; + private String remoteClusterRootDir = null; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey; @@ -313,6 +323,12 @@ public class ReplicationPeerConfig { } @Override + public ReplicationPeerConfigBuilder setRemoteClusterRootDir(String rootDir) { + this.remoteClusterRootDir = rootDir; + return this; + } + + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data // from ZK which makes it much more difficult. @@ -341,6 +357,9 @@ public class ReplicationPeerConfig { } } builder.append("bandwidth=").append(bandwidth); + if (this.remoteClusterRootDir != null) { + builder.append(",clusterRootDir=").append(remoteClusterRootDir); + } return builder.toString(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 0b2f2e2..97ae131 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -138,6 +138,13 @@ public interface ReplicationPeerConfigBuilder { ReplicationPeerConfigBuilder setExcludeNamespaces(Set namespaces); /** + * Set the remote peer cluster's root directory. Used by synchronous replication. + * @param rootDir the remote peer cluster's root directory + * @return {@code this} + */ + ReplicationPeerConfigBuilder setRemoteClusterRootDir(String rootDir); + + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. */ diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 9f7b4c2..ae8dc01 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -48,6 +48,7 @@ message ReplicationPeer { optional bool replicate_all = 8; repeated TableCF exclude_table_cfs = 9; repeated bytes exclude_namespaces = 10; + optional string remoteClusterRootDir = 11; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 696b2d7..1c9e869 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -133,19 +133,36 @@ public class ReplicationPeerManager { ReplicationPeerDescription desc = checkPeerExists(peerId); ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); if (!StringUtils.isBlank(peerConfig.getClusterKey()) && - !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) { + !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) { throw new DoNotRetryIOException( "Changing the cluster key on an existing peer is not allowed. Existing key '" + - oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + - peerConfig.getClusterKey() + "'"); + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + + peerConfig.getClusterKey() + "'"); } - if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && - !peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { - throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + - "on an existing peer is not allowed. Existing class '" + - oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + - " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && !peerConfig + .getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { + throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + + "on an existing peer is not allowed. Existing class '" + + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + } + + if (!StringUtils.equals(peerConfig.getRemoteClusterRootDir(), + oldPeerConfig.getRemoteClusterRootDir())) { + throw new DoNotRetryIOException( + "Changing the remote cluster root dir on an existing peer is not allowed. " + + "Existing root dir '" + oldPeerConfig.getRemoteClusterRootDir() + "' for peer " + + peerId + " does not match new root dir '" + peerConfig.getRemoteClusterRootDir() + + "'"); + } + + if (oldPeerConfig.getRemoteClusterRootDir() != null) { + if (!ReplicationUtils.isKeyConfigEqual(oldPeerConfig, peerConfig)) { + throw new DoNotRetryIOException( + "Changing the replicated namespace/table config on a synchronous replication " + + "peer(peerId: " + peerId + ") is not allowed."); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index a6091e1..2db3cbb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -846,4 +846,81 @@ public class TestReplicationAdmin { assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); admin.removePeer(ID_ONE); } + + @Test + public void testPeerClusterRootDir() throws Exception { + String rootDir = "hdfs://srv1:9999/hbase"; + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertNull(rpc.getRemoteClusterRootDir()); + + try { + builder.setRemoteClusterRootDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change remote peer cluster root dir is not allowed"); + } catch (Exception e) { + // OK + } + + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + builder.setRemoteClusterRootDir(rootDir); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + assertEquals(rootDir, rpc.getRemoteClusterRootDir()); + + try { + builder.setRemoteClusterRootDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change remote peer cluster root dir is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder.setRemoteClusterRootDir(null); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change remote peer cluster root dir is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + builder.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Set namespaces = new HashSet<>(); + namespaces.add("ns1"); + builder.setExcludeNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Map> tableCfs = new HashMap<>(); + tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>()); + builder.setExcludeTableCFsMap(tableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index b9d4a0c..0729000 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -64,16 +64,20 @@ module Hbase table_cfs = args.fetch(TABLE_CFS, nil) namespaces = args.fetch(NAMESPACES, nil) peer_state = args.fetch(STATE, nil) + remote_root_dir = args.fetch(REMOTE_ROOT_DIR, nil) # Create and populate a ReplicationPeerConfig - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder() + builder = ReplicationPeerConfig.newBuilder() builder.set_cluster_key(cluster_key) unless endpoint_classname.nil? builder.set_replication_endpoint_impl(endpoint_classname) end + unless remote_root_dir.nil? + builder.setRemoteClusterRootDir(remote_root_dir) + end + unless config.nil? builder.putAllConfiguration(config) end @@ -228,8 +232,7 @@ module Hbase namespaces.each do |n| ns_set.add(n) end - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder(rpc) + builder = ReplicationPeerConfig.newBuilder(rpc) builder.setNamespaces(ns_set) @admin.updateReplicationPeerConfig(id, builder.build) end @@ -248,8 +251,7 @@ module Hbase ns_set.remove(n) end end - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder(rpc) + builder = ReplicationPeerConfig.newBuilder(rpc) builder.setNamespaces(ns_set) @admin.updateReplicationPeerConfig(id, builder.build) end @@ -361,8 +363,7 @@ module Hbase # Create and populate a ReplicationPeerConfig replication_peer_config = get_peer_config(id) - builder = org.apache.hadoop.hbase.replication.ReplicationPeerConfig - .newBuilder(replication_peer_config) + builder = ReplicationPeerConfig.newBuilder(replication_peer_config) unless config.nil? builder.putAllConfiguration(config) end diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb index 28484cb..7e9e40a 100644 --- a/hbase-shell/src/main/ruby/hbase_constants.rb +++ b/hbase-shell/src/main/ruby/hbase_constants.rb @@ -77,6 +77,7 @@ module HBaseConstants VALUE = 'VALUE'.freeze ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'.freeze CLUSTER_KEY = 'CLUSTER_KEY'.freeze + REMOTE_ROOT_DIR = 'REMOTE_ROOT_DIR'.freeze TABLE_CFS = 'TABLE_CFS'.freeze NAMESPACES = 'NAMESPACES'.freeze STATE = 'STATE'.freeze diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index eb2da83..9d15dc6 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -35,7 +35,7 @@ to the peer cluster. An optional parameter for table column families identifies which tables and/or column families will be replicated to the peer cluster. -Notice: Set a namespace in the peer config means that all tables in this namespace +Note: Set a namespace in the peer config means that all tables in this namespace will be replicated to the peer cluster. So if you already have set a namespace in peer config, then you can't set this namespace's tables in the peer config again. @@ -74,6 +74,25 @@ the key TABLE_CFS. Note: Either CLUSTER_KEY or ENDPOINT_CLASSNAME must be specified. If ENDPOINT_CLASSNAME is specified, CLUSTER_KEY is optional and should only be specified if a particular custom endpoint requires it. +The default replication peer is asynchronous. You can also add a synchronous replication peer +with REMOTE_ROOT_DIR parameter. Meanwhile, synchronous replication peer also support the optional +config for asynchronous replication peer. + +Examples: + + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + STATE => "ENABLED", REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + STATE => "DISABLED", REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase", NAMESPACES => ["ns1", "ns2"] + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", + REMOTE_ROOT_DIR => "hdfs://srv1:9999/hbase", TABLE_CFS => { "table1" => [] } + +Note: The REMOTE_ROOT_DIR is not allowed to change. + EOF end diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index 522d23d..1acfc68 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -39,7 +39,8 @@ EOF peers = replication_admin.list_peers formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME - STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH]) + REMOTE_ROOT_DIR STATE REPLICATE_ALL + NAMESPACES TABLE_CFS BANDWIDTH]) peers.each do |peer| id = peer.getPeerId @@ -52,8 +53,20 @@ EOF namespaces = replication_admin.show_peer_namespaces(config) tableCFs = replication_admin.show_peer_tableCFs_by_config(config) end - formatter.row([id, config.getClusterKey, - config.getReplicationEndpointImpl, state, + cluster_key = 'nil' + unless config.getClusterKey.nil? + cluster_key = config.getClusterKey + end + endpoint_classname = 'nil' + unless config.getReplicationEndpointImpl.nil? + endpoint_classname = config.getReplicationEndpointImpl + end + remote_root_dir = 'nil' + unless config.getRemoteClusterRootDir.nil? + remote_root_dir = config.getRemoteClusterRootDir + end + formatter.row([id, cluster_key, endpoint_classname, + remote_root_dir, state, config.replicateAllUserTables, namespaces, tableCFs, config.getBandwidth]) end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 0f84396..e7eb95c 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -97,6 +97,22 @@ module Hbase command(:remove_peer, @peer_id) end + define_test "add_peer: remote cluster root dir" do + cluster_key = "server1.cie.com:2181:/hbase" + remote_root_dir = "hdfs://srv1:9999/hbase" + args = { CLUSTER_KEY => cluster_key, REMOTE_ROOT_DIR => remote_root_dir } + command(:add_peer, @peer_id, args) + + assert_equal(1, command(:list_peers).length) + peer = command(:list_peers).get(0) + assert_equal(@peer_id, peer.getPeerId) + assert_equal(cluster_key, peer.getPeerConfig.getClusterKey) + assert_equal(remote_root_dir, peer.getPeerConfig.getRemoteClusterRootDir) + + # cleanup for future tests + command(:remove_peer, @peer_id) + end + define_test "add_peer: single zk cluster key with enabled/disabled state" do cluster_key = "server1.cie.com:2181:/hbase" -- 1.9.1