From 0808f1dc580885da4475d3eef524984bccc5e535 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Fri, 17 Nov 2017 18:27:36 +0800 Subject: [PATCH] HBASE-19293 Support add a disabled state replication peer directly --- .../java/org/apache/hadoop/hbase/client/Admin.java | 11 +++++++++++ .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 13 +++++++++++- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 6 +++--- .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 5 ++--- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 6 +++--- .../hbase/shaded/protobuf/RequestConverter.java | 7 ++++++- .../src/main/protobuf/Replication.proto | 1 + .../hadoop/hbase/replication/ReplicationPeers.java | 13 +++++++++++- .../hbase/replication/ReplicationPeersZKImpl.java | 19 +++++++++--------- .../org/apache/hadoop/hbase/master/HMaster.java | 6 +++--- .../hadoop/hbase/master/MasterRpcServices.java | 5 +++-- .../apache/hadoop/hbase/master/MasterServices.java | 3 ++- .../master/replication/ReplicationManager.java | 8 +++----- .../client/replication/TestReplicationAdmin.java | 18 +++++++++++++++-- .../hbase/master/MockNoopMasterServices.java | 4 ++-- .../src/main/ruby/hbase/replication_admin.rb | 10 +++++++++- hbase-shell/src/main/ruby/hbase_constants.rb | 1 + .../src/main/ruby/shell/commands/add_peer.rb | 2 ++ .../src/test/ruby/hbase/replication_admin_test.rb | 23 +++++++++++++++++++--- 19 files changed, 120 insertions(+), 41 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index fd02a48..138ab33 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2470,9 +2470,20 @@ public interface Admin extends Abortable, Closeable { */ default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) throws IOException { + addReplicationPeer(peerId, peerConfig, true); } /** + * Add a new replication peer for replicating data to slave cluster. + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication slave cluster + * @param enabled peer state, true if ENABLED and false if DISABLED + * @throws IOException + */ + void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws IOException; + + /** * Remove a peer and stop the replication. * @param peerId a short name that identifies the peer * @throws IOException diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 0e0a673..abd8e5a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -499,8 +499,19 @@ public interface AsyncAdmin { * @param peerId a short name that identifies the peer * @param peerConfig configuration for the replication slave cluster */ + default CompletableFuture addReplicationPeer(String peerId, + ReplicationPeerConfig peerConfig) { + return addReplicationPeer(peerId, peerConfig, true); + } + + /** + * Add a new replication peer for replicating data to slave cluster + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication slave cluster + * @param enabled peer state, true if ENABLED and false if DISABLED + */ CompletableFuture addReplicationPeer(String peerId, - ReplicationPeerConfig peerConfig); + ReplicationPeerConfig peerConfig, boolean enabled); /** * Remove a peer and stop the replication diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 3fe7951..fb16fce 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -375,9 +375,9 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture - addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) { - return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig)); + public CompletableFuture addReplicationPeer(String peerId, + ReplicationPeerConfig peerConfig, boolean enabled) { + return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig, enabled)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 7669eb2..289f7d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -103,7 +103,6 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; - import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -3850,13 +3849,13 @@ public class HBaseAdmin implements Admin { } @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws IOException { executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { @Override protected Void rpcCall() throws Exception { master.addReplicationPeer(getRpcController(), - RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig)); + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled)); return null; } }); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index fe1d685..f56e7ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -1501,14 +1501,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture addReplicationPeer(String peerId, - ReplicationPeerConfig peerConfig) { + ReplicationPeerConfig peerConfig, boolean enabled) { return this . newMasterCaller() .action( (controller, stub) -> this . call(controller, stub, - RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req, - done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s, + c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 9eff114..4fdc87d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -1618,10 +1618,15 @@ public final class RequestConverter { } public static ReplicationProtos.AddReplicationPeerRequest buildAddReplicationPeerRequest( - String peerId, ReplicationPeerConfig peerConfig) { + String peerId, ReplicationPeerConfig peerConfig, boolean enabled) { AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder(); builder.setPeerId(peerId); builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); + ReplicationProtos.ReplicationState.Builder stateBuilder = + ReplicationProtos.ReplicationState.newBuilder(); + stateBuilder.setState(enabled ? ReplicationProtos.ReplicationState.State.ENABLED + : ReplicationProtos.ReplicationState.State.DISABLED); + builder.setPeerState(stateBuilder.build()); return builder.build(); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 7e78144..88efa00 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -77,6 +77,7 @@ message ReplicationHLogPosition { message AddReplicationPeerRequest { required string peer_id = 1; required ReplicationPeer peer_config = 2; + required ReplicationState peer_state = 3; } message AddReplicationPeerResponse { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 3e2e1ad..10936bf 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -51,7 +51,18 @@ public interface ReplicationPeers { * @param peerId a short that identifies the cluster * @param peerConfig configuration for the replication slave cluster */ - void registerPeer(String peerId, ReplicationPeerConfig peerConfig) + default void registerPeer(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + registerPeer(peerId, peerConfig, true); + } + + /** + * Add a new remote slave cluster for replication. + * @param peerId a short that identifies the cluster + * @param peerConfig configuration for the replication slave cluster + * @param enabled peer state, true if ENABLED and false if DISABLED + */ + void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException; /** diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index cc84c1d..e5925ce 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -105,7 +105,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void registerPeer(String id, ReplicationPeerConfig peerConfig) + public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { try { if (peerExists(id)) { @@ -130,19 +130,18 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ZKUtil.createWithParents(this.zookeeper, this.peersZNode); List listOfOps = new ArrayList<>(2); - ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id), - ReplicationPeerConfigUtil.toByteArray(peerConfig)); - // b/w PeerWatcher and ReplicationZookeeper#add method to create the - // peer-state znode. This happens while adding a peer - // The peer state data is set as "ENABLED" by default. - ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES); + ZKUtilOp op1 = + ZKUtilOp.createAndFailSilent(getPeerNode(id), + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + ZKUtilOp op2 = + ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES + : DISABLED_ZNODE_BYTES); listOfOps.add(op1); listOfOps.add(op2); ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - // A peer is enabled by default } catch (KeeperException e) { - throw new ReplicationException("Could not add peer with id=" + id - + ", peerConfif=>" + peerConfig, e); + throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>" + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cad77e5..f09000f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -3326,14 +3326,14 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException { if (cpHost != null) { cpHost.preAddReplicationPeer(peerId, peerConfig); } LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" - + peerConfig); - this.replicationManager.addReplicationPeer(peerId, peerConfig); + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")); + this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled); if (cpHost != null) { cpHost.postAddReplicationPeer(peerId, peerConfig); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 2e3df2d..2fd60af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; - import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; @@ -267,6 +266,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; @@ -1809,7 +1809,8 @@ public class MasterRpcServices extends RSRpcServices AddReplicationPeerRequest request) throws ServiceException { try { master.addReplicationPeer(request.getPeerId(), - ReplicationPeerConfigUtil.convert(request.getPeerConfig())); + ReplicationPeerConfigUtil.convert(request.getPeerConfig()), request.getPeerState() + .getState().equals(ReplicationState.State.ENABLED)); return AddReplicationPeerResponse.newBuilder().build(); } catch (ReplicationException | IOException e) { throw new ServiceException(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 57f7df5..6b3c212 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -428,8 +428,9 @@ public interface MasterServices extends Server { * Add a new replication peer for replicating data to slave cluster * @param peerId a short name that identifies the peer * @param peerConfig configuration for the replication slave cluster + * @param enabled peer state, true if ENABLED and false if DISABLED */ - void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java index df94ffe..3224dd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; @@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.yetus.audience.InterfaceAudience; /** * Manages and performs all replication admin operations. @@ -69,12 +69,12 @@ public class ReplicationManager { } } - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException { checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); checkConfiguredWALEntryFilters(peerConfig); - replicationPeers.registerPeer(peerId, peerConfig); + replicationPeers.registerPeer(peerId, peerConfig, enabled); replicationPeers.peerConnected(peerId); } @@ -150,8 +150,6 @@ public class ReplicationManager { "Table-cfs config conflict with namespaces config in peer"); } } - - } private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 62951ef..647b3b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,10 +35,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -143,6 +142,21 @@ public class TestReplicationAdmin { assertEquals(0, admin.getPeersCount()); } + @Test + public void testAddPeerWithState() throws Exception { + ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); + rpc1.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true); + assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled()); + hbaseAdmin.removeReplicationPeer(ID_ONE); + + ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); + rpc2.setClusterKey(KEY_SECOND); + hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false); + assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled()); + hbaseAdmin.removeReplicationPeer(ID_SECOND); + } + /** * Tests that the peer configuration used by ReplicationAdmin contains all * the peer's properties. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 85d2b0b..d5e843a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -393,7 +393,7 @@ public class MockNoopMasterServices implements MasterServices, Server { } @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { } @@ -460,4 +460,4 @@ public class MockNoopMasterServices implements MasterServices, Server { public FileSystem getFileSystem() { return null; } -} +} \ No newline at end of file diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index ceb728e..1ca7e9d 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -65,6 +65,7 @@ module Hbase data = args.fetch(DATA, nil) table_cfs = args.fetch(TABLE_CFS, nil) namespaces = args.fetch(NAMESPACES, nil) + peer_state = args.fetch(STATE, nil) # Create and populate a ReplicationPeerConfig replication_peer_config = ReplicationPeerConfig.new @@ -102,7 +103,14 @@ module Hbase end replication_peer_config.set_table_cfs_map(map) end - @admin.addReplicationPeer(id, replication_peer_config) + + enabled = true + unless peer_state.nil? + if peer_state == "DISABLED" + enabled = false + end + end + @admin.addReplicationPeer(id, replication_peer_config, enabled) else raise(ArgumentError, 'args must be a Hash') end diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb index ebaae78..28484cb 100644 --- a/hbase-shell/src/main/ruby/hbase_constants.rb +++ b/hbase-shell/src/main/ruby/hbase_constants.rb @@ -79,6 +79,7 @@ module HBaseConstants CLUSTER_KEY = 'CLUSTER_KEY'.freeze TABLE_CFS = 'TABLE_CFS'.freeze NAMESPACES = 'NAMESPACES'.freeze + STATE = 'STATE'.freeze CONFIG = 'CONFIG'.freeze DATA = 'DATA'.freeze SERVER_NAME = 'SERVER_NAME'.freeze diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index edaa386..6b38166 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -40,6 +40,8 @@ then you can't set this namespace's tables in the peer config again. Examples: hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "ENABLED" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "DISABLED" hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 0d92287..75f3c04 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -93,7 +93,7 @@ module Hbase command(:remove_peer, @peer_id) end - define_test "add_peer: single zk cluster key - peer config" do + define_test "add_peer: single zk cluster key with enabled/disabled state" do cluster_key = "server1.cie.com:2181:/hbase" args = { CLUSTER_KEY => cluster_key } @@ -101,9 +101,26 @@ module Hbase assert_equal(1, command(:list_peers).length) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) + assert_equal(true, command(:list_peers).get(0).isEnabled) + + command(:remove_peer, @peer_id) + + enable_args = { CLUSTER_KEY => cluster_key, STATE => 'ENABLED' } + command(:add_peer, @peer_id, enable_args) + + assert_equal(1, command(:list_peers).length) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(true, command(:list_peers).get(0).isEnabled) + + command(:remove_peer, @peer_id) + + disable_args = { CLUSTER_KEY => cluster_key, STATE => 'DISABLED' } + command(:add_peer, @peer_id, disable_args) + + assert_equal(1, command(:list_peers).length) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(false, command(:list_peers).get(0).isEnabled) - # cleanup for future tests command(:remove_peer, @peer_id) end -- 1.9.1