Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (revision 1236057) +++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (working copy) @@ -26,7 +26,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -443,8 +450,57 @@ } /** + * Test disable/enable replication, trying to insert, make sure nothing's + * replicated, enable it, the insert should be replicated + * + * @throws Exception + */ + @Test(timeout = 300000) + public void testDisableEnable() throws Exception { + + // Test disabling replication + admin.disablePeer("2"); + + Put put = new Put(Bytes.toBytes("disable enable")); + put.add(famName, row, row); + htable1.put(put); + + Get get = new Get(Bytes.toBytes("disable enable")); + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + break; + } + Result res = htable2.get(get); + if (res.size() >= 1) { + fail("Replication wasn't disabled"); + } else { + LOG.info("Row not replicated, let's wait a bit more..."); + Thread.sleep(SLEEP_TIME); + } + } + + // Test enable replication + admin.enablePeer("2"); + + for (int i = 0; i < NB_RETRIES; i++) { + if (i == NB_RETRIES - 1) { + fail("Waited too much time for put replication"); + } + Result res = htable2.get(get); + if (res.size() == 0) { + LOG.info("Row not available"); + Thread.sleep(SLEEP_TIME); + } else { + assertArrayEquals(res.value(), row); + break; + } + } + } + + /** * Integration test for TestReplicationAdmin, removes and re-add a peer * cluster + * * @throws Exception */ @Test(timeout=300000) Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1236057) +++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.Map; -import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; @@ -114,16 +113,16 @@ * Restart the replication stream to the specified peer. * @param id a short that identifies the cluster */ - public void enablePeer(String id) { - throw new NotImplementedException("Not implemented"); + public void enablePeer(String id) throws IOException { + this.replicationZk.enablePeer(id); } /** * Stop the replication stream to the specified peer. * @param id a short that identifies the cluster */ - public void disablePeer(String id) { - throw new NotImplementedException("Not implemented"); + public void disablePeer(String id) throws IOException { + this.replicationZk.disablePeer(id); } /** Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1236057) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -299,6 +299,13 @@ boolean gotIOE = false; currentNbEntries = 0; try { + if (!zkHelper.peerEnabled(peerId)) { + if (sleepForRetries("peer " + peerId + " is disalbed", + sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } if(readAllEntriesToReplicateOrNextFile()) { continue; } Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1236057) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -79,6 +79,8 @@ LogFactory.getLog(ReplicationZookeeper.class); // Name of znode we use to lock when failover private final static String RS_LOCK_ZNODE = "lock"; + // Value of the znode which indicates that the peer is disabled + private static final byte[] PEER_DISABLED = Bytes.toBytes("disabled"); // Our handle on zookeeper private final ZooKeeperWatcher zookeeper; // Map of peer clusters keyed by their id @@ -93,6 +95,8 @@ private String rsServerNameZnode; // Name node if the replicationState znode private String replicationStateNodeName; + // Name of a node which indicate whether a peer is enabled or not + private String peerStateNodeName; private final Configuration conf; // Is this cluster replicating at the moment? private AtomicBoolean replicating; @@ -147,6 +151,8 @@ conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + this.peerStateNodeName = conf.get( + "zookeeper.znode.replication.peers.state", "state"); this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state"); String rsZNodeName = @@ -396,6 +402,73 @@ } /** + * Enable replication to the peer + * + * @param id peer's identifier + * @throws IllegalArgumentException + * Thrown when the peer doesn't exist + */ + public void enablePeer(String id) throws IOException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("peer " + id + " is not registered"); + } + if (peerEnabled(id)) { + LOG.warn("peer " + id + " is already enabled"); + return; + } + ZKUtil.deleteNode(this.zookeeper, getPeerStateZNode(id)); + LOG.info("peer " + id + " is enabled"); + } catch (KeeperException e) { + throw new IOException("Unable to enable peer " + id, e); + } + } + + /** + * Disable replication to the peer + * + * @param id peer's identifier + * @throws IllegalArgumentException + * Thrown when the peer doesn't exist + */ + public void disablePeer(String id) throws IOException { + try { + if (!peerExists(id)) { + throw new IllegalArgumentException("peer " + id + " is not registered"); + } + if (!peerEnabled(id)) { + LOG.warn("peer " + id + " is already disabled"); + return; + } + ZKUtil.createAndWatch(this.zookeeper, getPeerStateZNode(id), + PEER_DISABLED); + LOG.info("peer " + id + " is disabled"); + } catch (KeeperException e) { + throw new IOException("Unable to enable peer " + id, e); + } + } + + /** + * Get state of the peer. + * + * @param id peer's identifier + * @return return true if the peer is enabled. + * @throws IOException + */ + public boolean peerEnabled(String id) throws IOException { + try { + return ZKUtil.checkExists(this.zookeeper, getPeerStateZNode(id)) == -1; + } catch (KeeperException e) { + throw new IOException("Unable to check state of peer " + id, e); + } + } + + private String getPeerStateZNode(String id){ + return ZKUtil.joinZNode(this.peersZNode, + ZKUtil.joinZNode(id, this.peerStateNodeName)); + } + + /** * This reads the state znode for replication and sets the atomic boolean */ private void readReplicationStateZnode() {