From 4ddccc5fd8380f90ca8037dccefde49880e2bfc5 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Thu, 11 Jan 2018 15:31:15 +0800 Subject: [PATCH] HBASE-19078 Add peer cluster root directory config for synchronous replication --- .../replication/ReplicationPeerConfigUtil.java | 6 ++ .../hbase/replication/ReplicationPeerConfig.java | 20 ++++++- .../replication/ReplicationPeerConfigBuilder.java | 8 +++ .../src/main/protobuf/Replication.proto | 1 + .../master/replication/ReplicationPeerManager.java | 38 ++++++++---- .../client/replication/TestReplicationAdmin.java | 69 ++++++++++++++++++++++ 6 files changed, 130 insertions(+), 12 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index a234a9b..e6d71bc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -315,6 +315,9 @@ public final class ReplicationPeerConfigUtil { excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); } + if (peer.hasClusterRootDir()) { + builder.setClusterRootDir(peer.getClusterRootDir()); + } return builder.build(); } @@ -371,6 +374,9 @@ public final class ReplicationPeerConfigUtil { } } + if (peerConfig.getClusterRootDir() != null) { + builder.setClusterRootDir(peerConfig.getClusterRootDir()); + } return builder.build(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index bf8d030..f3eb707 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -46,6 +46,8 @@ public class ReplicationPeerConfig { private Map> excludeTableCFsMap = null; private Set excludeNamespaces = null; private long bandwidth = 0; + // Used by synchronous replication + private String clusterRootDir; private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { this.clusterKey = builder.clusterKey; @@ -64,6 +66,7 @@ public class ReplicationPeerConfig { builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces) : null; this.bandwidth = builder.bandwidth; + this.clusterRootDir = builder.clusterRootDir; } private Map> @@ -210,6 +213,10 @@ public class ReplicationPeerConfig { return this; } + public String getClusterRootDir() { + return this.clusterRootDir; + } + public static ReplicationPeerConfigBuilder newBuilder() { return new ReplicationPeerConfigBuilderImpl(); } @@ -223,7 +230,7 @@ public class ReplicationPeerConfig { .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) - .setBandwidth(peerConfig.getBandwidth()); + .setBandwidth(peerConfig.getBandwidth()).setClusterRootDir(peerConfig.getClusterRootDir()); return builder; } @@ -250,6 +257,8 @@ public class ReplicationPeerConfig { private long bandwidth = 0; + private String clusterRootDir = null; + @Override public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { this.clusterKey = clusterKey; @@ -313,6 +322,12 @@ public class ReplicationPeerConfig { } @Override + public ReplicationPeerConfigBuilder setClusterRootDir(String rootDir) { + this.clusterRootDir = rootDir; + return this; + } + + @Override public ReplicationPeerConfig build() { // It would be nice to validate the configuration, but we have to work with "old" data // from ZK which makes it much more difficult. @@ -341,6 +356,9 @@ public class ReplicationPeerConfig { } } builder.append("bandwidth=").append(bandwidth); + if (this.clusterRootDir != null) { + builder.append(",clusterRootDir=").append(clusterRootDir); + } return builder.toString(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java index 0b2f2e2..dec344e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -137,6 +137,14 @@ public interface ReplicationPeerConfigBuilder { */ ReplicationPeerConfigBuilder setExcludeNamespaces(Set namespaces); + + /** + * Set the peer cluster's root directory. Used by synchronous replication. + * @param rootDir + * @return {@code this} + */ + ReplicationPeerConfigBuilder setClusterRootDir(String rootDir); + /** * Builds the configuration object from the current state of {@code this}. * @return A {@link ReplicationPeerConfig} instance. diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 9f7b4c2..5a4c7d7 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -48,6 +48,7 @@ message ReplicationPeer { optional bool replicate_all = 8; repeated TableCF exclude_table_cfs = 9; repeated bytes exclude_namespaces = 10; + optional string clusterRootDir = 11; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 696b2d7..82348b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -132,20 +132,36 @@ public class ReplicationPeerManager { checkPeerConfig(peerConfig); ReplicationPeerDescription desc = checkPeerExists(peerId); ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); - if (!StringUtils.isBlank(peerConfig.getClusterKey()) && - !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) { + if (!StringUtils.isBlank(peerConfig.getClusterKey()) + && !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) { throw new DoNotRetryIOException( - "Changing the cluster key on an existing peer is not allowed. Existing key '" + - oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + - peerConfig.getClusterKey() + "'"); + "Changing the cluster key on an existing peer is not allowed. Existing key '" + + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + + peerConfig.getClusterKey() + "'"); } - if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && - !peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { - throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + - "on an existing peer is not allowed. Existing class '" + - oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + - " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) && !peerConfig + .getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) { + throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + + "on an existing peer is not allowed. Existing class '" + + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); + } + + if (!StringUtils.isBlank(peerConfig.getClusterRootDir()) + && !peerConfig.getClusterRootDir().equals(oldPeerConfig.getClusterRootDir())) { + throw new DoNotRetryIOException( + "Changing the cluster root dir on an existing peer is not allowed. Existing root dir '" + + oldPeerConfig.getClusterRootDir() + "' for peer " + peerId + + " does not match new root dir '" + peerConfig.getClusterRootDir() + "'"); + } + + if (oldPeerConfig.getClusterRootDir() != null) { + if (!ReplicationUtils.isKeyConfigEqual(oldPeerConfig, peerConfig)) { + throw new DoNotRetryIOException( + "Changing the replicated namespace/table config on an existing synchronous peer(peerId: " + + peerId + ") is not allowed."); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index a6091e1..4935090 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -846,4 +846,73 @@ public class TestReplicationAdmin { assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); admin.removePeer(ID_ONE); } + + @Test + public void testPeerClusterRootDir() throws Exception { + String rootDir = "hdfs://srv1:9999/hbase"; + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); + assertNull(rpc.getClusterRootDir()); + + try { + builder.setClusterRootDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change peer cluster root dir is not allowed"); + } catch (Exception e) { + // OK + } + + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + builder.setClusterRootDir(rootDir); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + + rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); + assertEquals(rootDir, rpc.getClusterRootDir()); + + try { + builder.setClusterRootDir("hdfs://srv2:8888/hbase"); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change peer cluster root dir is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + builder.setReplicateAllUserTables(false); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Set namespaces = new HashSet<>(); + namespaces.add("ns1"); + builder.setExcludeNamespaces(namespaces); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(rpc); + Map> tableCfs = new HashMap<>(); + tableCfs.put(TableName.valueOf(name.getMethodName()), new ArrayList<>()); + builder.setExcludeTableCFsMap(tableCfs); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail( + "Change replicated namespace/table config on an existing synchronous peer is not allowed"); + } catch (Exception e) { + // OK + } + } } -- 1.9.1