From 2d75ee7d4537aadfaeb3c677235a0ef8b894d2a6 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sat, 18 Nov 2017 08:48:40 +0800 Subject: [PATCH] HBASE-19293 Support add a disabled state replication peer directly --- .../java/org/apache/hadoop/hbase/client/Admin.java | 89 +++++++++------------- .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 13 +++- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 6 +- .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 10 +-- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 6 +- .../hbase/shaded/protobuf/RequestConverter.java | 7 +- .../src/main/protobuf/Replication.proto | 1 + .../hadoop/hbase/replication/ReplicationPeers.java | 13 +++- .../hbase/replication/ReplicationPeersZKImpl.java | 19 +++-- .../org/apache/hadoop/hbase/master/HMaster.java | 6 +- .../hadoop/hbase/master/MasterRpcServices.java | 5 +- .../apache/hadoop/hbase/master/MasterServices.java | 3 +- .../master/replication/ReplicationManager.java | 6 +- .../client/replication/TestReplicationAdmin.java | 16 ++++ .../hbase/master/MockNoopMasterServices.java | 4 +- .../src/main/ruby/hbase/replication_admin.rb | 8 +- hbase-shell/src/main/ruby/hbase_constants.rb | 1 + .../src/main/ruby/shell/commands/add_peer.rb | 4 + .../src/test/ruby/hbase/replication_admin_test.rb | 23 +++++- src/main/asciidoc/_chapters/ops_mgt.adoc | 1 + 20 files changed, 147 insertions(+), 94 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index fd02a48..6f1190e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.List; @@ -2466,111 +2465,97 @@ public interface Admin extends Abortable, Closeable { * Add a new replication peer for replicating data to slave cluster. * @param peerId a short name that identifies the peer * @param peerConfig configuration for the replication slave cluster - * @throws IOException + * @throws IOException if a remote or network exception occurs */ default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) throws IOException { + addReplicationPeer(peerId, peerConfig, true); } /** + * Add a new replication peer for replicating data to slave cluster. + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication slave cluster + * @param enabled peer state, true if ENABLED and false if DISABLED + * @throws IOException if a remote or network exception occurs + */ + void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws IOException; + + /** * Remove a peer and stop the replication. * @param peerId a short name that identifies the peer - * @throws IOException + * @throws IOException if a remote or network exception occurs */ - default void removeReplicationPeer(String peerId) throws IOException { - } + void removeReplicationPeer(String peerId) throws IOException; /** * Restart the replication stream to the specified peer. * @param peerId a short name that identifies the peer - * @throws IOException + * @throws IOException if a remote or network exception occurs */ - default void enableReplicationPeer(String peerId) throws IOException { - } + void enableReplicationPeer(String peerId) throws IOException; /** * Stop the replication stream to the specified peer. * @param peerId a short name that identifies the peer - * @throws IOException + * @throws IOException if a remote or network exception occurs */ - default void disableReplicationPeer(String peerId) throws IOException { - } + void disableReplicationPeer(String peerId) throws IOException; /** * Returns the configured ReplicationPeerConfig for the specified peer. * @param peerId a short name that identifies the peer * @return ReplicationPeerConfig for the peer - * @throws IOException + * @throws IOException if a remote or network exception occurs */ - default ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws IOException { - return new ReplicationPeerConfig(); - } + ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws IOException; /** * Update the peerConfig for the specified peer. * @param peerId a short name that identifies the peer * @param peerConfig new config for the peer - * @throws IOException + * @throws IOException if a remote or network exception occurs */ - default void updateReplicationPeerConfig(String peerId, - ReplicationPeerConfig peerConfig) throws IOException { - } + void updateReplicationPeerConfig(String peerId, + ReplicationPeerConfig peerConfig) throws IOException; /** * Append the replicable table column family config from the specified peer. * @param id a short that identifies the cluster * @param tableCfs A map from tableName to column family names - * @throws ReplicationException - * @throws IOException + * @throws ReplicationException if tableCfs has conflict with existing config + * @throws IOException if a remote or network exception occurs */ - default void appendReplicationPeerTableCFs(String id, - Map> tableCfs) throws ReplicationException, - IOException { - } + void appendReplicationPeerTableCFs(String id, + Map> tableCfs) + throws ReplicationException, IOException; /** * Remove some table-cfs from config of the specified peer. * @param id a short name that identifies the cluster * @param tableCfs A map from tableName to column family names - * @throws ReplicationException - * @throws IOException - */ - default void removeReplicationPeerTableCFs(String id, - Map> tableCfs) throws ReplicationException, - IOException { - } - - /** - * Return a list of replication peers. - * @return a list of replication peers description - * @throws IOException + * @throws ReplicationException if tableCfs has conflict with existing config + * @throws IOException if a remote or network exception occurs */ - default List listReplicationPeers() throws IOException { - return new ArrayList<>(); - } + void removeReplicationPeerTableCFs(String id, + Map> tableCfs) + throws ReplicationException, IOException; /** * Return a list of replication peers. - * @param regex The regular expression to match peer id * @return a list of replication peers description - * @throws IOException - * @deprecated since 2.0 version and will be removed in 3.0 version. Use - * {@link #listReplicationPeers(Pattern)} instead. + * @throws IOException if a remote or network exception occurs */ - @Deprecated - default List listReplicationPeers(String regex) throws IOException { - return new ArrayList<>(); - } + List listReplicationPeers() throws IOException; /** * Return a list of replication peers. * @param pattern The compiled regular expression to match peer id * @return a list of replication peers description - * @throws IOException + * @throws IOException if a remote or network exception occurs */ - default List listReplicationPeers(Pattern pattern) throws IOException { - return new ArrayList<>(); - } + List listReplicationPeers(Pattern pattern) throws IOException; /** * Mark region server(s) as decommissioned to prevent additional regions from getting diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 0e0a673..abd8e5a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -499,8 +499,19 @@ public interface AsyncAdmin { * @param peerId a short name that identifies the peer * @param peerConfig configuration for the replication slave cluster */ + default CompletableFuture addReplicationPeer(String peerId, + ReplicationPeerConfig peerConfig) { + return addReplicationPeer(peerId, peerConfig, true); + } + + /** + * Add a new replication peer for replicating data to slave cluster + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication slave cluster + * @param enabled peer state, true if ENABLED and false if DISABLED + */ CompletableFuture addReplicationPeer(String peerId, - ReplicationPeerConfig peerConfig); + ReplicationPeerConfig peerConfig, boolean enabled); /** * Remove a peer and stop the replication diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 3fe7951..fb16fce 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -375,9 +375,9 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture - addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) { - return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig)); + public CompletableFuture addReplicationPeer(String peerId, + ReplicationPeerConfig peerConfig, boolean enabled) { + return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig, enabled)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 7669eb2..05157dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -103,7 +103,6 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; - import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -3850,13 +3849,13 @@ public class HBaseAdmin implements Admin { } @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws IOException { executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { @Override protected Void rpcCall() throws Exception { master.addReplicationPeer(getRpcController(), - RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig)); + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled)); return null; } }); @@ -3954,11 +3953,6 @@ public class HBaseAdmin implements Admin { } @Override - public List listReplicationPeers(String regex) throws IOException { - return listReplicationPeers(Pattern.compile(regex)); - } - - @Override public List listReplicationPeers(Pattern pattern) throws IOException { return executeCallable(new MasterCallable>(getConnection(), diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index fe1d685..f56e7ca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -1501,14 +1501,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture addReplicationPeer(String peerId, - ReplicationPeerConfig peerConfig) { + ReplicationPeerConfig peerConfig, boolean enabled) { return this . newMasterCaller() .action( (controller, stub) -> this . call(controller, stub, - RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req, - done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s, + c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 9eff114..4fdc87d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -1618,10 +1618,15 @@ public final class RequestConverter { } public static ReplicationProtos.AddReplicationPeerRequest buildAddReplicationPeerRequest( - String peerId, ReplicationPeerConfig peerConfig) { + String peerId, ReplicationPeerConfig peerConfig, boolean enabled) { AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder(); builder.setPeerId(peerId); builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); + ReplicationProtos.ReplicationState.Builder stateBuilder = + ReplicationProtos.ReplicationState.newBuilder(); + stateBuilder.setState(enabled ? ReplicationProtos.ReplicationState.State.ENABLED + : ReplicationProtos.ReplicationState.State.DISABLED); + builder.setPeerState(stateBuilder.build()); return builder.build(); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 7e78144..88efa00 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -77,6 +77,7 @@ message ReplicationHLogPosition { message AddReplicationPeerRequest { required string peer_id = 1; required ReplicationPeer peer_config = 2; + required ReplicationState peer_state = 3; } message AddReplicationPeerResponse { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 3e2e1ad..10936bf 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -51,7 +51,18 @@ public interface ReplicationPeers { * @param peerId a short that identifies the cluster * @param peerConfig configuration for the replication slave cluster */ - void registerPeer(String peerId, ReplicationPeerConfig peerConfig) + default void registerPeer(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + registerPeer(peerId, peerConfig, true); + } + + /** + * Add a new remote slave cluster for replication. + * @param peerId a short that identifies the cluster + * @param peerConfig configuration for the replication slave cluster + * @param enabled peer state, true if ENABLED and false if DISABLED + */ + void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException; /** diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 9c4e3fe..b7564f4 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -105,7 +105,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void registerPeer(String id, ReplicationPeerConfig peerConfig) + public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { try { if (peerExists(id)) { @@ -130,19 +130,18 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ZKUtil.createWithParents(this.zookeeper, this.peersZNode); List listOfOps = new ArrayList<>(2); - ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id), - ReplicationPeerConfigUtil.toByteArray(peerConfig)); - // b/w PeerWatcher and ReplicationZookeeper#add method to create the - // peer-state znode. This happens while adding a peer - // The peer state data is set as "ENABLED" by default. - ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES); + ZKUtilOp op1 = + ZKUtilOp.createAndFailSilent(getPeerNode(id), + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + ZKUtilOp op2 = + ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES + : DISABLED_ZNODE_BYTES); listOfOps.add(op1); listOfOps.add(op2); ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - // A peer is enabled by default } catch (KeeperException e) { - throw new ReplicationException("Could not add peer with id=" + id - + ", peerConfif=>" + peerConfig, e); + throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>" + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 1967296..97982b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -3322,14 +3322,14 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException { if (cpHost != null) { cpHost.preAddReplicationPeer(peerId, peerConfig); } LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" - + peerConfig); - this.replicationManager.addReplicationPeer(peerId, peerConfig); + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED")); + this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled); if (cpHost != null) { cpHost.postAddReplicationPeer(peerId, peerConfig); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 2e3df2d..2fd60af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; - import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; @@ -267,6 +266,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; @@ -1809,7 +1809,8 @@ public class MasterRpcServices extends RSRpcServices AddReplicationPeerRequest request) throws ServiceException { try { master.addReplicationPeer(request.getPeerId(), - ReplicationPeerConfigUtil.convert(request.getPeerConfig())); + ReplicationPeerConfigUtil.convert(request.getPeerConfig()), request.getPeerState() + .getState().equals(ReplicationState.State.ENABLED)); return AddReplicationPeerResponse.newBuilder().build(); } catch (ReplicationException | IOException e) { throw new ServiceException(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 57f7df5..6b3c212 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -428,8 +428,9 @@ public interface MasterServices extends Server { * Add a new replication peer for replicating data to slave cluster * @param peerId a short name that identifies the peer * @param peerConfig configuration for the replication slave cluster + * @param enabled peer state, true if ENABLED and false if DISABLED */ - void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java index 7cfaefd..3615992 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; @@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.yetus.audience.InterfaceAudience; /** * Manages and performs all replication admin operations. @@ -69,12 +69,12 @@ public class ReplicationManager { } } - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException, IOException { checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), peerConfig.getTableCFsMap()); checkConfiguredWALEntryFilters(peerConfig); - replicationPeers.registerPeer(peerId, peerConfig); + replicationPeers.registerPeer(peerId, peerConfig, enabled); replicationPeers.peerConnected(peerId); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index c98a02c..036706a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -141,6 +142,21 @@ public class TestReplicationAdmin { assertEquals(0, admin.getPeersCount()); } + @Test + public void testAddPeerWithState() throws Exception { + ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); + rpc1.setClusterKey(KEY_ONE); + hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true); + assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled()); + hbaseAdmin.removeReplicationPeer(ID_ONE); + + ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); + rpc2.setClusterKey(KEY_SECOND); + hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false); + assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled()); + hbaseAdmin.removeReplicationPeer(ID_SECOND); + } + /** * Tests that the peer configuration used by ReplicationAdmin contains all * the peer's properties. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index dadec1f..a752e9b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -393,7 +393,7 @@ public class MockNoopMasterServices implements MasterServices, Server { } @Override - public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) + public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { } @@ -460,4 +460,4 @@ public class MockNoopMasterServices implements MasterServices, Server { public FileSystem getFileSystem() { return null; } -} +} \ No newline at end of file diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index ceb728e..3f64356 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -65,6 +65,7 @@ module Hbase data = args.fetch(DATA, nil) table_cfs = args.fetch(TABLE_CFS, nil) namespaces = args.fetch(NAMESPACES, nil) + peer_state = args.fetch(STATE, nil) # Create and populate a ReplicationPeerConfig replication_peer_config = ReplicationPeerConfig.new @@ -102,7 +103,12 @@ module Hbase end replication_peer_config.set_table_cfs_map(map) end - @admin.addReplicationPeer(id, replication_peer_config) + + enabled = true + unless peer_state.nil? + enabled = false if peer_state == 'DISABLED' + end + @admin.addReplicationPeer(id, replication_peer_config, enabled) else raise(ArgumentError, 'args must be a Hash') end diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb index ebaae78..28484cb 100644 --- a/hbase-shell/src/main/ruby/hbase_constants.rb +++ b/hbase-shell/src/main/ruby/hbase_constants.rb @@ -79,6 +79,7 @@ module HBaseConstants CLUSTER_KEY = 'CLUSTER_KEY'.freeze TABLE_CFS = 'TABLE_CFS'.freeze NAMESPACES = 'NAMESPACES'.freeze + STATE = 'STATE'.freeze CONFIG = 'CONFIG'.freeze DATA = 'DATA'.freeze SERVER_NAME = 'SERVER_NAME'.freeze diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb index edaa386..eb2da83 100644 --- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb +++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb @@ -28,6 +28,8 @@ must be specified to identify the peer. For a HBase cluster peer, a cluster key must be provided and is composed like this: hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent This gives a full path for HBase to connect to another HBase cluster. +An optional parameter for state identifies the replication peer's state is enabled or disabled. +And the default state is enabled. An optional parameter for namespaces identifies which namespace's tables will be replicated to the peer cluster. An optional parameter for table column families identifies which tables and/or column families @@ -40,6 +42,8 @@ then you can't set this namespace's tables in the peer config again. Examples: hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "ENABLED" + hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "DISABLED" hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod", diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index 0d92287..75f3c04 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -93,7 +93,7 @@ module Hbase command(:remove_peer, @peer_id) end - define_test "add_peer: single zk cluster key - peer config" do + define_test "add_peer: single zk cluster key with enabled/disabled state" do cluster_key = "server1.cie.com:2181:/hbase" args = { CLUSTER_KEY => cluster_key } @@ -101,9 +101,26 @@ module Hbase assert_equal(1, command(:list_peers).length) assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) - assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) + assert_equal(true, command(:list_peers).get(0).isEnabled) + + command(:remove_peer, @peer_id) + + enable_args = { CLUSTER_KEY => cluster_key, STATE => 'ENABLED' } + command(:add_peer, @peer_id, enable_args) + + assert_equal(1, command(:list_peers).length) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(true, command(:list_peers).get(0).isEnabled) + + command(:remove_peer, @peer_id) + + disable_args = { CLUSTER_KEY => cluster_key, STATE => 'DISABLED' } + command(:add_peer, @peer_id, disable_args) + + assert_equal(1, command(:list_peers).length) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(false, command(:list_peers).get(0).isEnabled) - # cleanup for future tests command(:remove_peer, @peer_id) end diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc index b6babd6..2bb2510 100644 --- a/src/main/asciidoc/_chapters/ops_mgt.adoc +++ b/src/main/asciidoc/_chapters/ops_mgt.adoc @@ -1415,6 +1415,7 @@ add_peer :: Adds a replication relationship between two clusters. + * ID -- a unique string, which must not contain a hyphen. * CLUSTER_KEY: composed using the following template, with appropriate place-holders: `hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent` + * STATE(optional): ENABLED or DISABLED, default value is ENABLED list_peers:: list all replication relationships known by this cluster enable_peer :: Enable a previously-disabled replication relationship -- 1.9.1