NOTE(review): this patch gives every replication peer an ENABLED/DISABLED
state kept in a "peer-state" child znode of the peer. ReplicationSource
skips reading/shipping entries and calls sleepForRetries (bumping
sleepMultiplier) while zkHelper.getPeerEnabled(peerId) is false; the shell's
list_peers prints the state. The line breaks of the first file sections were
lost in this copy, so those sections must be re-wrapped before patch(1) can
apply them.
NOTE(review): testDisableEnable puts right after disablePeer("2"), but region
servers learn the new state asynchronously through a ZooKeeper watch
(PeerStateTracker), so the row may still be shipped; possible flakiness,
confirm.
Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1242253) +++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -30,13 +30,23 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -100,6 +110,9 @@ ZKUtil.setData(zkw, "/hbase/replication/peers/1", Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); + ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); + ZKUtil.setData(zkw, 
"/hbase/replication/peers/1/peer-state", + Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name())); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true")); Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1242253) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -299,6 +299,13 @@ boolean gotIOE = false; currentNbEntries = 0; try { + if (!zkHelper.getPeerEnabled(peerId)) { + if (sleepForRetries("peer " + peerId + " is disabled", + sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } if(readAllEntriesToReplicateOrNextFile()) { continue; } Index: src/main/ruby/hbase/replication_admin.rb =================================================================== --- src/main/ruby/hbase/replication_admin.rb (revision 1242253) +++ src/main/ruby/hbase/replication_admin.rb (working copy) @@ -50,6 +50,12 @@ end #---------------------------------------------------------------------------------------------- + # Get peer cluster state + def get_peer_state(id) + @replication_admin.getPeerState(id) + end + + #---------------------------------------------------------------------------------------------- # Restart the replication stream to the specified peer def enable_peer(id) @replication_admin.enablePeer(id) Index: src/main/ruby/shell/commands/list_peers.rb =================================================================== --- src/main/ruby/shell/commands/list_peers.rb (revision 1242253) +++ src/main/ruby/shell/commands/list_peers.rb (working copy) @@ -33,10 +33,11 @@ now = Time.now peers = replication_admin.list_peers - formatter.header(["PEER ID", "CLUSTER KEY"]) + formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"])
peers.entrySet().each do |e| - formatter.row([ e.key, e.value ]) + state = replication_admin.get_peer_state(e.key) + formatter.row([ e.key, e.value, state ]) end formatter.footer(now) Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (revision 1242253) +++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (working copy) @@ -26,7 +26,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -443,8 +450,53 @@ } /** + * Test disable/enable replication, trying to insert, make sure nothing's + * replicated, enable it, the insert should be replicated + * + * @throws Exception + */ + @Test(timeout = 300000) + public void testDisableEnable() throws Exception { + + // Test disabling replication + admin.disablePeer("2"); + + byte[] rowkey = Bytes.toBytes("disable enable"); + Put put = new Put(rowkey); + put.add(famName, row, row); + htable1.put(put); + + Get get = new Get(rowkey); + for (int i = 0; i < NB_RETRIES; i++) { + Result res = htable2.get(get); + if (res.size() >= 1) { + fail("Replication wasn't disabled"); + } else { + LOG.info("Row not replicated, let's wait a bit more..."); + Thread.sleep(SLEEP_TIME); + } + } + + // Test enable replication
+ admin.enablePeer("2"); + + for (int i = 0; i < NB_RETRIES; i++) { + Result res = htable2.get(get); + if (res.size() == 0) { + LOG.info("Row not available"); + Thread.sleep(SLEEP_TIME); + } else { + assertArrayEquals(row, res.value()); + return; + } + } + fail("Waited too much time for put replication"); + } + + /** * Integration test for TestReplicationAdmin, removes and re-add a peer * cluster + * * @throws Exception */ @Test(timeout=300000) Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1242253) +++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.Map; -import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; @@ -114,16 +113,16 @@ * Restart the replication stream to the specified peer. * @param id a short that identifies the cluster */ - public void enablePeer(String id) { - throw new NotImplementedException("Not implemented"); + public void enablePeer(String id) throws IOException { + this.replicationZk.enablePeer(id); } /** * Stop the replication stream to the specified peer.
* @param id a short that identifies the cluster */ - public void disablePeer(String id) { - throw new NotImplementedException("Not implemented"); + public void disablePeer(String id) throws IOException { + this.replicationZk.disablePeer(id); } /** @@ -143,6 +142,21 @@ } /** + * Get state of the peer + * + * @param id peer's identifier + * @return name of the peer's current state, e.g. ENABLED or DISABLED + * @throws IOException if the peer state cannot be read from ZooKeeper + */ + public String getPeerState(String id) throws IOException { + try { + return this.replicationZk.getPeerState(id).name(); + } catch (KeeperException e) { + throw new IOException("Couldn't get the state of the peer " + id, e); + } + } + + /** * Get the current status of the kill switch, if the cluster is replicating * or not. * @return true if the cluster is replicated, otherwise false Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1242253) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -48,18 +48,20 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException; /** - * This class serves as a helper for all things related to zookeeper - * in replication. + * This class serves as a helper for all things related to zookeeper in + * replication. *

- * The layout looks something like this under zookeeper.znode.parent - for the master cluster: + * The layout looks something like this under zookeeper.znode.parent for the + * master cluster: *

+ * *

  * replication/
  *  state      {contains true or false}
  *  clusterId  {contains a byte}
  *  peers/
  *    1/   {contains a full cluster address}
+ *      peer-state  {contains ENABLED or DISABLED}
  *    2/
  *    ...
  *  rs/ {lists all RS that replicate}
@@ -79,6 +81,12 @@
     LogFactory.getLog(ReplicationZookeeper.class);
   // Name of znode we use to lock when failover
   private final static String RS_LOCK_ZNODE = "lock";
+
+  /** Values that may be stored in a peer's state znode. */
+  public enum PeerState {
+    ENABLED, DISABLED
+  }
+
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
   // Map of peer clusters keyed by their id
@@ -93,6 +101,8 @@
   private String rsServerNameZnode;
   // Name node if the replicationState znode
   private String replicationStateNodeName;
+  // Child znode of each peer that holds its state (default "peer-state")
+  private String peerStateNodeName;
   private final Configuration conf;
   // Is this cluster replicating at the moment?
   private AtomicBoolean replicating;
@@ -101,6 +111,8 @@
   // Abortable
   private Abortable abortable;
   private ReplicationStatusTracker statusTracker;
+  // Map of peer state trackers keyed by peer id
+  private Map<String, PeerStateTracker> peerStateTrackers;
 
   /**
    * Constructor used by clients of replication (like master and HBase clients)
@@ -135,6 +147,7 @@
     setZNodes(server);
 
     this.peerClusters = new HashMap();
+    this.peerStateTrackers = new HashMap<String, PeerStateTracker>();
     ZKUtil.createWithParents(this.zookeeper,
         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
     this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
@@ -147,6 +160,8 @@
         conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName =
         conf.get("zookeeper.znode.replication.peers", "peers");
+    this.peerStateNodeName =
+        conf.get("zookeeper.znode.replication.peers.state", "peer-state");
     this.replicationStateNodeName =
         conf.get("zookeeper.znode.replication.state", "state");
     String rsZNodeName =
@@ -306,6 +321,20 @@
       return false;
     }
     this.peerClusters.put(peerId, peer);
+    // Watch the peer's state znode so its enabled flag follows later changes.
+    PeerStateTracker stateTracker = new PeerStateTracker(peerId,
+        this.zookeeper, this.abortable);
+    this.peerStateTrackers.put(peerId, stateTracker);
+    stateTracker.start();
+    // The peer's initial state was presumably read (by getPeer) before this
+    // watch existed; re-read it from the tracker so that a change made in
+    // between is not lost. Locking the tracker orders this with its
+    // synchronized nodeDataChanged handler.
+    synchronized (stateTracker) {
+      if (stateTracker.getData(false) != null) {
+        readPeerStateZnode(peerId);
+      }
+    }
     ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
         this.rsServerNameZnode, peerId));
     LOG.info("Added new peer cluster " + peer.getClusterKey());
@@ -336,8 +355,11 @@
       return null;
     }
 
-    return new ReplicationPeer(otherConf, peerId,
+    ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
         otherClusterKey);
+    boolean peerIsEnabled = PeerState.ENABLED.equals(getPeerState(peerId));
+    peer.getPeerEnabled().set(peerIsEnabled);
+    return peer;
   }
 
   /**
@@ -363,7 +385,9 @@
       if (!peerExists(id)) {
         throw new IllegalArgumentException("Cannot remove inexisting peer");
       }
-      ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+      // Recursive: the peer znode now has a peer-state child.
+      String removedPeerZNode = ZKUtil.joinZNode(this.peersZNode, id);
+      ZKUtil.deleteNodeRecursively(this.zookeeper, removedPeerZNode);
     } catch (KeeperException e) {
       throw new IOException("Unable to remove a peer", e);
     }
@@ -385,6 +408,10 @@
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper,
           ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+      // A new peer starts out enabled.
+      byte[] initialState = Bytes.toBytes(PeerState.ENABLED.name());
+      ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
+          initialState);
     } catch (KeeperException e) {
       throw new IOException("Unable to add peer", e);
     }
@@ -396,6 +421,126 @@
   }
 
   /**
+   * Enable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   * @throws IOException
+   *           Thrown on a ZooKeeper error while updating the peer's state
+   */
+  public void enablePeer(String id) throws IOException {
+    changePeerState(id, PeerState.ENABLED);
+    LOG.info("peer " + id + " is enabled");
+  }
+
+  /**
+   * Disable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   * @throws IOException
+   *           Thrown on a ZooKeeper error while updating the peer's state
+   */
+  public void disablePeer(String id) throws IOException {
+    changePeerState(id, PeerState.DISABLED);
+    LOG.info("peer " + id + " is disabled");
+  }
+
+  /**
+   * Writes {@code state} into the state znode of peer {@code id}.
+   */
+  private void changePeerState(String id, PeerState state) throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("peer " + id + " is not registered");
+      }
+      ZKUtil.setData(this.zookeeper, getPeerStateNode(id),
+          Bytes.toBytes(state.name()));
+      LOG.info("state of the peer " + id + " changed to " + state.name());
+    } catch (KeeperException e) {
+      throw new IOException("Unable to change state of the peer " + id, e);
+    }
+  }
+
+  /**
+   * Get state of the peer. This method checks the state by connecting to ZK.
+   * A registered peer whose state znode has no data (ZKUtil.getData returns
+   * null when the znode does not exist, e.g. for a peer registered before
+   * peer states were introduced, or read before addPeer has created the
+   * state znode) is reported as ENABLED, the same default addPeer writes.
+   *
+   * @param id peer's identifier
+   * @return current state of the peer
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   * @throws KeeperException if ZooKeeper cannot be read
+   */
+  public PeerState getPeerState(String id) throws KeeperException {
+    byte[] peerStateBytes = ZKUtil
+        .getData(this.zookeeper, getPeerStateNode(id));
+    if (peerStateBytes == null && !peerExists(id)) {
+      throw new IllegalArgumentException("peer " + id + " is not registered");
+    }
+    return parsePeerState(peerStateBytes);
+  }
+
+  /**
+   * Check whether the peer is enabled or not. This method checks the atomic
+   * boolean of ReplicationPeer locally.
+   *
+   * @param id peer identifier
+   * @return true if the peer is enabled, otherwise false
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public boolean getPeerEnabled(String id) {
+    if (!this.peerClusters.containsKey(id)) {
+      throw new IllegalArgumentException("peer " + id + " is not registered");
+    }
+    return this.peerClusters.get(id).getPeerEnabled().get();
+  }
+
+  /**
+   * Copies the state reported by the peer's PeerStateTracker (getData(false))
+   * into the peer's enabled flag.
+   *
+   * @param id peer identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer is not registered locally
+   */
+  private void readPeerStateZnode(String id) {
+    if (!this.peerClusters.containsKey(id)) {
+      throw new IllegalArgumentException("peer " + id + " is not registered");
+    }
+    ReplicationPeer peer = this.peerClusters.get(id);
+    PeerStateTracker stateTracker = this.peerStateTrackers.get(id);
+    PeerState currentState = parsePeerState(stateTracker.getData(false));
+    peer.getPeerEnabled().set(PeerState.ENABLED.equals(currentState));
+  }
+
+  /**
+   * Decodes the content of a peer state znode.
+   *
+   * @param bytes raw znode data, may be null
+   * @return the decoded state, or ENABLED (the default written by addPeer)
+   *         when {@code bytes} is null
+   * @throws IllegalArgumentException if the data is not a PeerState name
+   */
+  private static PeerState parsePeerState(byte[] bytes) {
+    if (bytes == null) {
+      return PeerState.ENABLED;
+    }
+    return PeerState.valueOf(Bytes.toString(bytes));
+  }
+
+  private String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode,
+        ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
+  /**
     * This reads the state znode for replication and sets the atomic boolean
     */
   private void readReplicationStateZnode() {
@@ -761,4 +867,34 @@
       }
     }
   }
+
+  /**
+   * Watches the state znode of one peer and, whenever its data changes,
+   * copies the new state into that peer's enabled flag.
+   */
+  public class PeerStateTracker extends ZooKeeperNodeTracker {
+    // id of the peer whose state znode is tracked
+    private final String id;
+
+    /**
+     * @param id identifier of the peer whose state znode is tracked
+     * @param watcher passed to ZooKeeperNodeTracker
+     * @param abortable passed to ZooKeeperNodeTracker
+     */
+    public PeerStateTracker(String id, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, getPeerStateNode(id), abortable);
+      this.id = id;
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      if (path.equals(node)) {
+        // Run the base handling first (presumably refreshes the data that
+        // readPeerStateZnode reads through getData(false)).
+        super.nodeDataChanged(path);
+        readPeerStateZnode(id);
+      }
+    }
+  }
 }