Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1513284) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -175,6 +175,19 @@ return this.replicationPeers.getAllPeerClusterKeys(); } + /** + * Get the state of the specified peer cluster + * @param id String format of the Short that identifies the peer, an IllegalArgumentException + * is thrown is if doesn't exist + * @return true if replication is enabled to that peer, false if it isn't + */ + public boolean getPeerState(String id) throws IOException, KeeperException { + if (! this.replicationPeers.getConnectedPeers().contains(id)) { + this.replicationPeers.connectToPeer(id); + } + return this.replicationPeers.getStatusOfConnectedPeer(id); + } + @Override public void close() throws IOException { if (this.connection != null) { Index: hbase-server/src/main/ruby/hbase/replication_admin.rb =================================================================== --- hbase-server/src/main/ruby/hbase/replication_admin.rb (revision 1513284) +++ hbase-server/src/main/ruby/hbase/replication_admin.rb (working copy) @@ -58,7 +58,7 @@ #---------------------------------------------------------------------------------------------- # Get peer cluster state def get_peer_state(id) - @replication_admin.getPeerState(id) + @replication_admin.getPeerState(id) ? "ENABLED" : "DISABLED" end #---------------------------------------------------------------------------------------------- Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (revision 1513284) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (working copy) @@ -27,6 +27,8 @@ import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; /** * Unit testing of ReplicationAdmin @@ -86,13 +88,34 @@ admin.addPeer(ID_SECOND, KEY_SECOND); } catch (IllegalStateException iae) { fail(); - // OK! } assertEquals(2, admin.getPeersCount()); // Remove the first peer we added admin.removePeer(ID_ONE); assertEquals(1, admin.getPeersCount()); + admin.removePeer(ID_SECOND); + assertEquals(0, admin.getPeersCount()); } + /** + * basic checks that when we add a peer that it is enabled, and that we can disable + * @throws Exception + */ + @Test + public void testEnableDisable() throws Exception { + admin.addPeer(ID_ONE, KEY_ONE); + assertEquals(1, admin.getPeersCount()); + assertTrue(admin.getPeerState(ID_ONE)); + admin.disablePeer(ID_ONE); + + assertFalse(admin.getPeerState(ID_ONE)); + try { + admin.getPeerState(ID_SECOND); + } catch (IllegalArgumentException iae) { + // OK! + } + admin.removePeer(ID_ONE); + } + } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (revision 1513284) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (working copy) @@ -120,15 +120,34 @@ @Override public void enablePeer(String id) throws IOException { changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED); + this.waitForStateChange(id, true); LOG.info("peer " + id + " is enabled"); } @Override public void disablePeer(String id) throws IOException { changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED); + this.waitForStateChange(id, false); LOG.info("peer " + id + " is disabled"); } + /** + * Right now enable/disablePeer() cannot guarantee that the change is immediately + * visible to ourselves, so we have to wait it out... + */ + private void waitForStateChange(String id, boolean newState) throws IOException { + for (int i = 0; i < 10; i++) { + try { + if (this.getStatusOfConnectedPeer(id) != newState) { + Thread.sleep(50); + } else { + break; + } + } catch (InterruptedException e) { + } + } + } + @Override public boolean getStatusOfConnectedPeer(String id) { if (!this.peerClusters.containsKey(id)) {