From c95f2efa3965673048cf572c620847a344a044c3 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Fri, 12 Jan 2018 16:18:42 +0800 Subject: [PATCH] HBASE-19783 Change replication peer cluster key/endpoint from a not-null value to null is not allowed --- .../hbase/replication/ReplicationPeersZKImpl.java | 27 ++++++----- .../client/replication/TestReplicationAdmin.java | 54 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 14 deletions(-) diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 289d2aa..970f2f1 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -28,6 +28,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CompoundConfiguration; @@ -345,22 +346,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re throw new ReplicationException("Could not find peer Id " + id + " in connected peers"); } ReplicationPeerConfig existingConfig = peer.getPeerConfig(); - if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && - !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){ - throw new ReplicationException("Changing the cluster key on an existing peer is not allowed." - + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '" - + newConfig.getClusterKey() + - "'"); - } - String existingEndpointImpl = existingConfig.getReplicationEndpointImpl(); - if (newConfig.getReplicationEndpointImpl() != null && - !newConfig.getReplicationEndpointImpl().isEmpty() && - !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){ + if (!StringUtils.equals(newConfig.getClusterKey(), existingConfig.getClusterKey())) { + throw new ReplicationException( + "Changing the cluster key on an existing peer is not allowed." + " Existing key '" + + existingConfig.getClusterKey() + "' does not match new key '" + + newConfig.getClusterKey() + "'"); + } + if (!StringUtils.equals(newConfig.getReplicationEndpointImpl(), + existingConfig.getReplicationEndpointImpl())) { throw new ReplicationException("Changing the replication endpoint implementation class " + - "on an existing peer is not allowed. Existing class '" - + existingConfig.getReplicationEndpointImpl() - + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'"); + "on an existing peer is not allowed. Existing class '" + + existingConfig.getReplicationEndpointImpl() + "' does not match new class '" + + newConfig.getReplicationEndpointImpl() + "'"); } + // Update existingConfig's peer config and peer data with the new values, but don't touch config // or data that weren't explicitly changed ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index fb29e9e..aac57dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; +import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -764,4 +766,56 @@ public class TestReplicationAdmin { assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); admin.removePeer(ID_ONE); } + + @Test + public void testPeerClusterKey() throws Exception { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + try { + builder.setClusterKey(KEY_SECOND); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change cluster key on an existing peer is not allowed"); + } catch (Exception e) { + // OK + } + } + + @Test + public void testPeerReplicationEndpointImpl() throws Exception { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()); + hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); + + try { + builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change replication endpoint implementation class on an existing peer is not allowed"); + } catch (Exception e) { + // OK + } + + try { + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_ONE); + hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); + fail("Change replication endpoint implementation class on an existing peer is not allowed"); + } catch (Exception e) { + // OK + } + + builder = ReplicationPeerConfig.newBuilder(); + builder.setClusterKey(KEY_SECOND); + hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); + + try { + builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()); + hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); + fail("Change replication endpoint implementation class on an existing peer is not allowed"); + } catch (Exception e) { + // OK + } + } } -- 1.9.1