Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1513284) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -175,6 +175,16 @@ return this.replicationPeers.getAllPeerClusterKeys(); } + /** + * Get the state of the specified peer cluster + * @param id String format of the Short that identifies the peer, an IllegalArgumentException + * is thrown if it doesn't exist + * @return true if replication is enabled to that peer, false if it isn't + */ + public boolean getPeerState(String id) throws IOException { + return this.replicationPeers.getStatusOfPeerFromBackingStore(id); + } + @Override public void close() throws IOException { if (this.connection != null) { Index: hbase-server/src/main/ruby/hbase/replication_admin.rb =================================================================== --- hbase-server/src/main/ruby/hbase/replication_admin.rb (revision 1513284) +++ hbase-server/src/main/ruby/hbase/replication_admin.rb (working copy) @@ -58,7 +58,7 @@ #---------------------------------------------------------------------------------------------- # Get peer cluster state def get_peer_state(id) - @replication_admin.getPeerState(id) + @replication_admin.getPeerState(id) ? "ENABLED" : "DISABLED" end #---------------------------------------------------------------------------------------------- Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (revision 1513284) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (working copy) @@ -27,6 +27,8 @@ import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; /** * Unit testing of ReplicationAdmin @@ -86,13 +88,34 @@ admin.addPeer(ID_SECOND, KEY_SECOND); } catch (IllegalStateException iae) { fail(); - // OK! } assertEquals(2, admin.getPeersCount()); // Remove the first peer we added admin.removePeer(ID_ONE); assertEquals(1, admin.getPeersCount()); + admin.removePeer(ID_SECOND); + assertEquals(0, admin.getPeersCount()); } + /** + * basic checks that when we add a peer that it is enabled, and that we can disable + * @throws Exception + */ + @Test + public void testEnableDisable() throws Exception { + admin.addPeer(ID_ONE, KEY_ONE); + assertEquals(1, admin.getPeersCount()); + assertTrue(admin.getPeerState(ID_ONE)); + admin.disablePeer(ID_ONE); + + assertFalse(admin.getPeerState(ID_ONE)); + try { + admin.getPeerState(ID_SECOND); + } catch (IllegalArgumentException iae) { + // OK! + } + admin.removePeer(ID_ONE); + } + } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (revision 1513284) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (working copy) @@ -138,6 +138,21 @@ } @Override + public boolean getStatusOfPeerFromBackingStore(String id) throws IOException { + if (!this.getAllPeerIds().contains(id)) { + throw new IllegalArgumentException("peer " + id + " doesn't exist"); + } + String peerStateZNode = getPeerStateNode(id); + try { + return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode)); + } catch (KeeperException e) { + throw new IOException(e); + } catch (DeserializationException e) { + throw new IOException(e); + } + } + + @Override public boolean connectToPeer(String peerId) throws IOException, KeeperException { if (peerClusters == null) { return false; Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (revision 1513284) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (working copy) @@ -200,11 +200,12 @@ } /** - * @param bytes + * Parse the raw data from ZK to get a peer's state + * @param bytes raw ZK data * @return True if the passed in bytes are those of a pb serialized ENABLED state. * @throws DeserializationException */ - private static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { + public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes); return ZooKeeperProtos.ReplicationState.State.ENABLED == state; } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (revision 1513284) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (working copy) @@ -76,12 +76,24 @@ /** * Get the replication status for the specified connected remote slave cluster. + * The value might be read from cache, so it is recommended to + * use {@link #getStatusOfPeerFromBackingStore(String)} + * if reading the state after enabling or disabling it. * @param peerId a short that identifies the cluster * @return true if replication is enabled, false otherwise. */ boolean getStatusOfConnectedPeer(String peerId); /** + * Get the replication status for the specified remote slave cluster, which doesn't + * have to be connected. The state is read directly from the backing store. + * @param peerId a short that identifies the cluster + * @return true if replication is enabled, false otherwise. + * @throws IOException Throws if there's an error contacting the store + */ + boolean getStatusOfPeerFromBackingStore(String peerId) throws IOException; + + /** * Get a set of all connected remote slave clusters. * @return set of peer ids */ Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (revision 1513284) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (working copy) @@ -231,6 +231,10 @@ } protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { + // we can first check if the value was changed in the store, if it wasn't then fail right away + if (status != rp.getStatusOfPeerFromBackingStore(peerId)) { + fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); + } while (true) { if (status == rp.getStatusOfConnectedPeer(peerId)) { return;