Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java	(revision 1238918)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java	(working copy)
@@ -26,7 +26,14 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -443,8 +450,57 @@
   }
 
   /**
+   * Test disable/enable replication, trying to insert, make sure nothing's
+   * replicated, enable it, the insert should be replicated
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testDisableEnable() throws Exception {
+
+    // Test disabling replication
+    admin.disablePeer("2");
+
+    Put put = new Put(Bytes.toBytes("disable enable"));
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(Bytes.toBytes("disable enable"));
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        break;
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Replication wasn't disabled");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test enable replication
+    admin.enablePeer("2");
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(row, res.value());
+        break;
+      }
+    }
+  }
+
+  /**
    * Integration test for TestReplicationAdmin, removes and re-add a peer
    * cluster
+   *
    * @throws Exception
    */
   @Test(timeout=300000)
Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java	(revision 1238918)
+++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java	(working copy)
@@ -23,7 +23,6 @@
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -114,16 +113,16 @@
    * Restart the replication stream to the specified peer.
    * @param id a short that identifies the cluster
    */
-  public void enablePeer(String id) {
-    throw new NotImplementedException("Not implemented");
+  public void enablePeer(String id) throws IOException {
+    this.replicationZk.enablePeer(id);
   }
 
   /**
    * Stop the replication stream to the specified peer.
    * @param id a short that identifies the cluster
    */
-  public void disablePeer(String id) {
-    throw new NotImplementedException("Not implemented");
+  public void disablePeer(String id) throws IOException {
+    this.replicationZk.disablePeer(id);
   }
 
   /**
@@ -143,6 +142,16 @@
   }
 
   /**
+   * Get state of the peer
+   *
+   * @param id peer's identifier
+   * @return current state of the peer
+   */
+  public String getPeerState(String id) {
+    return this.replicationZk.getPeerState(id).name();
+  }
+
+  /**
    * Get the current status of the kill switch, if the cluster is replicating
    * or not.
    * @return true if the cluster is replicated, otherwise false
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 1238918)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ClusterId;
@@ -299,6 +300,13 @@
       boolean gotIOE = false;
       currentNbEntries = 0;
       try {
+        if (PeerState.DISABLED.equals(zkHelper.getPeerState(peerId))) {
+          if (sleepForRetries("peer " + peerId + " is disabled",
+              sleepMultiplier)) {
+            sleepMultiplier++;
+          }
+          continue;
+        }
         if(readAllEntriesToReplicateOrNextFile()) {
           continue;
         }
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(revision 1238918)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(working copy)
@@ -79,6 +79,12 @@
     LogFactory.getLog(ReplicationZookeeper.class);
   // Name of znode we use to lock when failover
   private final static String RS_LOCK_ZNODE = "lock";
+
+  // Values of znode which represents state of a peer
+  public static enum PeerState {
+    ENABLED, DISABLED
+  };
+
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
   // Map of peer clusters keyed by their id
@@ -93,6 +99,8 @@
   private String rsServerNameZnode;
   // Name node if the replicationState znode
   private String replicationStateNodeName;
+  // Name of a node which indicates whether a peer is enabled or not
+  private String peerStateNodeName;
   private final Configuration conf;
   // Is this cluster replicating at the moment?
   private AtomicBoolean replicating;
@@ -147,6 +155,8 @@
         conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName =
         conf.get("zookeeper.znode.replication.peers", "peers");
+    this.peerStateNodeName = conf.get(
+        "zookeeper.znode.replication.peers.state", "state");
     this.replicationStateNodeName =
         conf.get("zookeeper.znode.replication.state", "state");
     String rsZNodeName =
@@ -363,7 +373,8 @@
       if (!peerExists(id)) {
         throw new IllegalArgumentException("Cannot remove inexisting peer");
       }
-      ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
+          ZKUtil.joinZNode(this.peersZNode, id));
     } catch (KeeperException e) {
       throw new IOException("Unable to remove a peer", e);
     }
@@ -385,6 +396,8 @@
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper,
           ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+      ZKUtil.createAndWatch(this.zookeeper, getPeerStateZNode(id),
+          Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
     } catch (KeeperException e) {
       throw new IOException("Unable to add peer", e);
     }
@@ -396,6 +409,66 @@
   }
 
   /**
+   * Enable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public void enablePeer(String id) throws IOException {
+    changePeerState(id, PeerState.ENABLED);
+    LOG.info("peer " + id + " is enabled");
+  }
+
+  /**
+   * Disable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public void disablePeer(String id) throws IOException {
+    changePeerState(id, PeerState.DISABLED);
+    LOG.info("peer " + id + " is disabled");
+  }
+
+  private void changePeerState(String id, PeerState state) throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("peer " + id + " is not registered");
+      }
+      ZKUtil.setData(this.zookeeper, getPeerStateZNode(id),
+          Bytes.toBytes(state.name()));
+      LOG.info("state of peer " + id + " changed to " + state.name());
+    } catch (KeeperException e) {
+      throw new IOException("Unable to change state of peer " + id, e);
+    }
+  }
+
+  /**
+   * Get state of the peer
+   *
+   * @param id peer's identifier
+   * @return state of the peer: ENABLED if it has no state znode, null on error
+   */
+  public PeerState getPeerState(String id) {
+    PeerState peerState = null;
+    try {
+      byte[] data = ZKUtil.getData(this.zookeeper, getPeerStateZNode(id));
+      peerState = (data == null) ? PeerState.ENABLED
+          : PeerState.valueOf(Bytes.toString(data));
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the state of the peer " + id, e);
+    }
+    return peerState;
+  }
+
+  private String getPeerStateZNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode,
+        ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
+  /**
    * This reads the state znode for replication and sets the atomic boolean
    */
   private void readReplicationStateZnode() {
Index: src/main/ruby/hbase/replication_admin.rb
===================================================================
--- src/main/ruby/hbase/replication_admin.rb	(revision 1238918)
+++ src/main/ruby/hbase/replication_admin.rb	(working copy)
@@ -50,6 +50,12 @@
     end
 
     #----------------------------------------------------------------------------------------------
+    # Get peer cluster state
+    def get_peer_state(id)
+      @replication_admin.getPeerState(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
     # Restart the replication stream to the specified peer
     def enable_peer(id)
       @replication_admin.enablePeer(id)
Index: src/main/ruby/shell/commands/list_peers.rb
===================================================================
--- src/main/ruby/shell/commands/list_peers.rb	(revision 1238918)
+++ src/main/ruby/shell/commands/list_peers.rb	(working copy)
@@ -33,10 +33,11 @@
         now = Time.now
         peers = replication_admin.list_peers
 
-        formatter.header(["PEER ID", "CLUSTER KEY"])
+        formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"])
 
         peers.entrySet().each do |e|
-          formatter.row([ e.key, e.value ])
+          state = replication_admin.get_peer_state(e.key)
+          formatter.row([ e.key, e.value, state ])
         end
 
         formatter.footer(now)