diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
new file mode 100644
index 0000000..143d491
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -0,0 +1,120 @@
+package org.apache.hadoop.hbase.client.replication;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+/**
+ * This class provides the administrative interface to HBase cluster
+ * replication. In order to use it, the cluster and the client using
+ * ReplicationAdmin must be configured with hbase.replication
+ * set to true.
+ *
+ * Adding a new peer results in creating new outbound connections from every
+ * region server to a subset of region servers on the slave cluster. Each
+ * new stream of replication will start replicating from the beginning of the
+ * current HLog, meaning that edits from the past will be replicated.
+ *
+ * Removing a peer is a destructive and irreversible operation that stops
+ * all the replication streams for the given cluster and deletes the metadata
+ * used to keep track of the replication state.
+ *
+ * Enabling and disabling peers is currently not supported.
+ *
+ * As cluster replication is still experimental, a kill switch is provided
+ * in order to stop all replication-related operations, see
+ * {@link #setReplicating(boolean)}. When setting it back to true, the new
+ * state of all the replication streams will be unknown and may have holes.
+ * Use at your own risk.
+ *
+ * To see which commands are available in the shell, type
+ * replication.
+ */
+public class ReplicationAdmin {
+
+  private final Configuration conf;
+  private final ReplicationZookeeperWrapper zkWrapper;
+
+  /**
+   * Constructor that creates a connection to the local ZooKeeper ensemble.
+   * @param conf Configuration to use
+   * @throws IOException if the connection to ZK cannot be made
+   * @throws RuntimeException if replication isn't enabled.
+   */
+  public ReplicationAdmin(Configuration conf) throws IOException {
+    if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      throw new RuntimeException("hbase.replication isn't true, please " +
+          "enable it in order to use replication");
+    }
+    this.conf = conf;
+    ZooKeeperWrapper zkw = HConnectionManager.getConnection(conf).
+        getZooKeeperWrapper();
+    zkWrapper = new ReplicationZookeeperWrapper(zkw, conf);
+  }
+
+  /**
+   * Add a new peer cluster to replicate to.
+   * @param id a short that identifies the cluster
+   * @param clusterKey the concatenation of the slave cluster's
+   * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+   * @throws IllegalStateException if there's already one slave since
+   * multi-slave isn't supported yet.
+   */
+  public void addPeer(String id, String clusterKey) {
+    this.zkWrapper.addPeer(id, clusterKey);
+  }
+
+  /**
+   * Removes a peer cluster and stops the replication to it.
+   * @param id a short that identifies the cluster
+   */
+  public void removePeer(String id) {
+    this.zkWrapper.removePeer(id);
+  }
+
+  /**
+   * Restart the replication stream to the specified peer
+   * @param id a short that identifies the cluster
+   */
+  public void enablePeer(String id) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /**
+   * Stop the replication stream to the specified peer
+   * @param id a short that identifies the cluster
+   */
+  public void disablePeer(String id) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /**
+   * Get the number of slave clusters the local cluster has
+   * @return the number of slave clusters
+   */
+  public int getPeersCount() {
+    return this.zkWrapper.listPeersIds(null).size();
+  }
+
+  /**
+   * Kill switch for all replication-related features
+   * @param newState true to start replication, false to stop it
+   * completely
+   */
+  public void setReplicating(boolean newState) {
+    this.zkWrapper.setReplicating(newState);
+  }
+
+  /**
+   * Get the ZK wrapper created and used by this object
+   * @return the replication ZooKeeper wrapper
+   */
+  public ReplicationZookeeperWrapper getZkWrapper() {
+    return zkWrapper;
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
new file mode 100644
index 0000000..ac7c0c6
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -0,0 +1,53 @@
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+public class ReplicationPeer {
+
+  private final String clusterKey;
+  private final String id;
+  private List<HServerAddress> regionServers =
+      new ArrayList<HServerAddress>(0);
+  private final AtomicBoolean peerEnabled = new AtomicBoolean();
+  // Cannot be final since a new object needs to be recreated when session fails
+  private ZooKeeperWrapper zkw;
+
+  public ReplicationPeer(String key, String id, ZooKeeperWrapper zkw) {
+    this.clusterKey = key;
+    this.id = id;
+    this.zkw = zkw;
+  }
+
+  public String getClusterKey() {
+    return clusterKey;
+  }
+
+  public AtomicBoolean getPeerEnabled() {
+    return peerEnabled;
+  }
+
+  public List<HServerAddress> getRegionServers() {
+    return regionServers;
+  }
+
+  public ZooKeeperWrapper getZkw() {
+    return zkw;
+  }
+
+  public void setRegionServers(List<HServerAddress> regionServers) {
+    this.regionServers = regionServers;
+  }
+
+  public void setZkw(ZooKeeperWrapper zkw) {
+    this.zkw = zkw;
+  }
+
+  public String getId() {
+    return id;
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
index 1007aeb..a5d8621 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
@@ -51,7 +51,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * <p/>
  * <pre>
  * replication/
- *  master     {contains a full cluster address}
  *  state      {contains true or false}
  *  clusterId  {contains a byte}
  *  peers/
@@ -78,8 +77,8 @@ public class ReplicationZookeeperWrapper {
   private final static String RS_LOCK_ZNODE = "lock";
   // Our handle on zookeeper
   private final ZooKeeperWrapper zookeeperWrapper;
-  // Map of addresses of peer clusters with their ZKW
-  private final Map<String, ZooKeeperWrapper> peerClusters;
+  // Map of peer clusters keyed by their id
+  private final Map<String, ReplicationPeer> peerClusters;
   // Path to the root replication znode
   private final String replicationZNode;
   // Path to the peer clusters znode
@@ -90,13 +89,27 @@ public class ReplicationZookeeperWrapper {
   private final String rsServerNameZnode;
   // Name node if the replicationState znode
   private final String replicationStateNodeName;
-  // If this RS is part of a master cluster
-  private final boolean replicationMaster;
   private final Configuration conf;
   // Is this cluster replicating at the moment?
   private final AtomicBoolean replicating;
   // Byte (stored as string here) that identifies this cluster
   private final String clusterId;
+  // The key to our own cluster
+  private final String ourClusterKey;
+  // Way to know if this class needs to process the added features
+  // required for the region servers, or just act as a replication client
+  private final boolean clientOnly;
+
+  /**
+   * Constructor used by clients of replication (like master and HBase clients)
+   * @param zookeeperWrapper zkw to wrap
+   * @param conf             conf to use
+   * @throws IOException
+   */
+  public ReplicationZookeeperWrapper(ZooKeeperWrapper zookeeperWrapper,
+                                     Configuration conf) throws IOException {
+    this(zookeeperWrapper, conf, new AtomicBoolean(true), null);
+  }
 
   /**
    * Constructor used by region servers, connects to the peer cluster right away.
@@ -113,6 +126,7 @@ public class ReplicationZookeeperWrapper {
       final AtomicBoolean replicating, String rsName) throws IOException {
     this.zookeeperWrapper = zookeeperWrapper;
     this.conf = conf;
+    this.clientOnly = rsName == null;
     String replicationZNodeName =
         conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName =
@@ -125,47 +139,56 @@ public class ReplicationZookeeperWrapper {
         conf.get("zookeeper.znode.replication.clusterId", "clusterId");
     String rsZNodeName =
         conf.get("zookeeper.znode.replication.rs", "rs");
-    String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+    this.ourClusterKey = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
           this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
           this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
 
-    this.peerClusters = new HashMap<String, ZooKeeperWrapper>();
+    this.peerClusters = new HashMap<String, ReplicationPeer>();
     this.replicationZNode = zookeeperWrapper.getZNode(
         zookeeperWrapper.getParentZNode(), replicationZNodeName);
     this.peersZNode =
         zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
+    this.zookeeperWrapper.ensureExists(this.peersZNode);
     this.rsZNode =
         zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
 
     this.replicating = replicating;
-    setReplicating();
+    readReplicationStateZnode();
     String idResult = Bytes.toString(
         this.zookeeperWrapper.getData(this.replicationZNode,
         clusterIdZNodeName));
     this.clusterId =
         idResult == null ?
             Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
-    String address = Bytes.toString(
-        this.zookeeperWrapper.getData(this.replicationZNode,
-          repMasterZNodeName));
-    this.replicationMaster = thisCluster.equals(address);
-    LOG.info("This cluster (" + thisCluster + ") is a "
-          + (this.replicationMaster ? "master" : "slave") + " for replication" +
-          ", compared with (" + address + ")");
-    if (rsName != null) {
+    if (this.clientOnly) {
+      this.rsServerNameZnode = null;
+    } else {
       this.rsServerNameZnode =
           this.zookeeperWrapper.getZNode(rsZNode, rsName);
-      List<String> znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
-          new ReplicationStatusWatcher());
-      if (znodes != null) {
-        for (String znode : znodes) {
+      connectExistingPeers();
+    }
+  }
+
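+  /**
+   * Connect to all the peers currently registered in ZK, region server only
+   */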
+  private void connectExistingPeers() throws IOException {
+    List<String> znodes = listPeersIds(null);
+    if (znodes != null) {
+      for (String znode : znodes) {
+        if (!this.peerClusters.containsKey(znode))
           connectToPeer(znode);
-        }
       }
-    } else {
-      this.rsServerNameZnode = null;
     }
+  }
 
+  /**
+   * Get a list of all the peers registered on this cluster
+   * @param watcher Optional watcher for all znodes
+   * @return list of cluster ids
+   */
+  public List<String> listPeersIds(Watcher watcher) {
+    return this.zookeeperWrapper.listZnodes(this.peersZNode, watcher);
   }
 
   /**
@@ -174,13 +194,20 @@ public class ReplicationZookeeperWrapper {
    * @param peerClusterId (byte) the cluster to interrogate
    * @return addresses of all region servers
    */
-  public List getPeersAddresses(String peerClusterId) {
+  public List getSlavesAddresses(String peerClusterId) {
     if (this.peerClusters.size() == 0) {
       return new ArrayList<HServerAddress>(0);
     }
-    ZooKeeperWrapper zkw = this.peerClusters.get(peerClusterId);
-    return zkw == null?
-        new ArrayList<HServerAddress>(0) : zkw.scanRSDirectory();
+    ReplicationPeer peer = this.peerClusters.get(peerClusterId);
+    if (peer == null) {
+      return new ArrayList<HServerAddress>(0);
+    }
+    peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
+    return peer.getRegionServers();
+  }
+
+  private List<HServerAddress> fetchSlavesAddresses(ZooKeeperWrapper zkw) {
+    return zkw.scanRSDirectory();
   }
 
   /**
@@ -188,31 +215,141 @@ public class ReplicationZookeeperWrapper {
    * in this region server's replication znode
    * @param peerId id of the peer cluster
    */
-  private void connectToPeer(String peerId) throws IOException {
-    String[] ensemble =
-        Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId)).
-            split(":");
+  public boolean connectToPeer(String peerId) throws IOException {
+    if (this.clientOnly) {
+      return false;
+    }
+    if (this.peerClusters.containsKey(peerId)) {
+      return false;
+      // TODO remove when we support it
+    } else if (this.peerClusters.size() > 0) {
+      LOG.warn("Multiple slaves feature not supported");
+      return false;
+    }
+    String otherClusterKey = Bytes.toString(this.zookeeperWrapper.getData(
+        this.peersZNode, peerId));
+    if (this.ourClusterKey.equals(otherClusterKey)) {
+      LOG.debug("Not connecting to " + peerId + " because it's us");
+      return false;
+    }
+    String[] ensemble = otherClusterKey.split(":");
     if (ensemble.length != 3) {
-      throw new IllegalArgumentException("Wrong format of cluster address: " +
+      LOG.warn("Wrong format of cluster address: " +
           this.zookeeperWrapper.getData(this.peersZNode, peerId));
+      return false;
     }
+    // Construct the connection to the new peer
     Configuration otherConf = new Configuration(this.conf);
     otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
     otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
     otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
     ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
         "connection to cluster: " + peerId);
-    zkw.registerListener(new ReplicationStatusWatcher());
-    this.peerClusters.put(peerId, zkw);
+    ReplicationPeer peer = new ReplicationPeer(otherClusterKey, peerId, zkw);
+    this.peerClusters.put(peerId, peer);
     this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
         this.rsServerNameZnode, peerId));
     LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+    return true;
+  }
+
+  /**
+   * Set the new replication state for this cluster
+   * @param newState the new replication state
+   */
+  public void setReplicating(boolean newState) {
+    try {
+      this.zookeeperWrapper.writeZNode(this.replicationZNode,
+          this.replicationStateNodeName, Boolean.toString(newState));
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Set the peer as enabled or disabled
+   * @param id identification of the peer
+   * @param enabled true enables the replication to the peer,
+   * false halts it
+   */
+  public void setSourceState(String id, boolean enabled) {
+    try {
+      this.zookeeperWrapper.writeZNode(this.peersZNode, id,
+          Boolean.toString(enabled));
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Removes the peer's registration with this cluster; the region servers
+   * receiving the watcher notification will clear their own znodes
+   * @param id identification of the peer
+   * @throws IllegalArgumentException Thrown when the peer doesn't exist
+   */
+  public void removePeer(String id) {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("Cannot remove a non-existent peer");
+      }
+      this.zookeeperWrapper.deleteZNode(
+          zookeeperWrapper.getZNode(this.peersZNode, id), true);
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Registers the provided peer with this cluster
+   * @param id identification of the peer
+   * @param clusterKey the ensemble address, client port and root znode of the
+   * peer cluster.
+   * @throws IllegalArgumentException Thrown when the peer already exists
+   * @throws IllegalStateException Thrown when a peer already exists, since
+   *         multi-slave isn't supported yet.
+   */
+  public void addPeer(String id, String clusterKey) {
+    try {
+      if (peerExists(id)) {
+        throw new IllegalArgumentException("Cannot add existing peer");
+      } else if (countPeers() > 0) {
+        throw new IllegalStateException("Multi-slave isn't supported yet");
+      }
+      this.zookeeperWrapper.writeZNode(this.peersZNode, id, clusterKey);
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    }
+  }
+
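+  /**
+   * Returns true if the peer's znode exists under the peers znode
+   */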
+  private boolean peerExists(String id) {
+    return this.zookeeperWrapper.exists(
+          zookeeperWrapper.getZNode(this.peersZNode, id), false);
+  }
+
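+  /**
+   * Returns the number of peers currently registered in ZK
+   */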
+  private int countPeers() {
+    List<String> peers = this.zookeeperWrapper.listZnodes(this.peersZNode, null);
+    return peers == null ? 0 : peers.size();
   }
 
   /**
    * This reads the state znode for replication and sets the atomic boolean
    */
-  private void setReplicating() {
+  private void readReplicationStateZnode() {
     String value = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
         this.replicationZNode, this.replicationStateNodeName,
         new ReplicationStatusWatcher()));
@@ -403,10 +533,15 @@
    * Delete a complete queue of hlogs
    * @param peerZnode znode of the peer cluster queue of hlogs to delete
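+   * @param closeConnection whether to also close the connection to the peer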
    */
-  public void deleteSource(String peerZnode) {
+  public void deleteSource(String peerZnode, boolean closeConnection) {
     try {
       this.zookeeperWrapper.deleteZNode(
           this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
+      if (closeConnection) {
+        this.peerClusters.get(peerZnode).getZkw().close();
+        this.peerClusters.remove(peerZnode);
+      }
     } catch (InterruptedException e) {
       LOG.error(e);
     } catch (KeeperException e) {
@@ -451,15 +585,6 @@ public class ReplicationZookeeperWrapper {
   }
 
   /**
-   * Tells if this cluster replicates or not
-   *
-   * @return if this is a master
-   */
-  public boolean isReplicationMaster() {
-    return this.replicationMaster;
-  }
-
-  /**
    * Get the identification of the cluster
    *
    * @return the id for the cluster
@@ -470,13 +595,23 @@ public class ReplicationZookeeperWrapper {
 
   /**
    * Get a map of all peer clusters
-   * @return map of peer cluster, zk address to ZKW
+   * @return map of peer cluster keyed by id
    */
-  public Map<String, ZooKeeperWrapper> getPeerClusters() {
+  public Map<String, ReplicationPeer> getPeerClusters() {
     return this.peerClusters;
   }
 
   /**
+   * Extracts the znode name of a peer cluster from a ZK path
+   * @param fullPath Path to extract the id from
+   * @return the id or an empty string if path is invalid
+   */
+  public static String getZNodeName(String fullPath) {
+    String[] parts = fullPath.split("/");
+    return parts.length > 0 ? parts[parts.length-1] : "";
+  }
+
+  /**
    * Watcher for the status of the replication
    */
   public class ReplicationStatusWatcher implements Watcher {
@@ -485,7 +620,7 @@ public class ReplicationZookeeperWrapper {
       Event.EventType type = watchedEvent.getType();
       LOG.info("Got event " + type + " with path " + watchedEvent.getPath());
       if (type.equals(Event.EventType.NodeDataChanged)) {
-        setReplicating();
+        readReplicationStateZnode();
       }
     }
   }
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 4d4b00a..1a33435 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -112,8 +112,7 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
     try {
       this.zkHelper = new ReplicationZookeeperWrapper(
           ZooKeeperWrapper.createInstance(this.conf,
-              HMaster.class.getName()),
-          this.conf, new AtomicBoolean(true), null);
+              HMaster.class.getName()), this.conf);
     } catch (IOException e) {
       LOG.error(e);
     }
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 52cb8e8..4a4bbd5 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -47,7 +47,6 @@ public class Replication implements LogEntryVisitor {
 
   private final boolean replication;
   private final ReplicationSourceManager replicationManager;
-  private boolean replicationMaster;
   private final AtomicBoolean replicating = new AtomicBoolean(true);
   private final ReplicationZookeeperWrapper zkHelper;
   private final Configuration conf;
@@ -74,10 +73,8 @@ public class Replication implements LogEntryVisitor {
       this.zkHelper = new ReplicationZookeeperWrapper(
         ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
         this.replicating, hsi.getServerName());
-      this.replicationMaster = zkHelper.isReplicationMaster();
-      this.replicationManager = this.replicationMaster ?
-        new ReplicationSourceManager(zkHelper, conf, stopRequested,
-          fs, this.replicating, logDir, oldLogDir) : null;
+      this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
+          stopRequested, fs, this.replicating, logDir, oldLogDir);
     } else {
       replicationManager = null;
       zkHelper = null;
@@ -89,9 +86,7 @@ public class Replication implements LogEntryVisitor {
    */
   public void join() {
     if (this.replication) {
-      if (this.replicationMaster) {
-        this.replicationManager.join();
-      }
+      this.replicationManager.join();
       this.zkHelper.deleteOwnRSZNode();
     }
   }
@@ -102,7 +97,8 @@
    * @throws IOException
    */
   public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
-    if (this.replication && !this.replicationMaster) {
+    if (this.replication) {
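+      // Any region server can now act as a sink, the master/slave roles are gone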
       this.replicationSink.replicateEntries(entries);
     }
   }
@@ -114,12 +109,9 @@ public class Replication implements LogEntryVisitor {
    */
   public void startReplicationServices() throws IOException {
     if (this.replication) {
-      if (this.replicationMaster) {
-        this.replicationManager.init();
-      } else {
-        this.replicationSink =
-            new ReplicationSink(this.conf, this.stopRequested);
-      }
+      this.replicationManager.init();
+      this.replicationSink =
+          new ReplicationSink(this.conf, this.stopRequested);
     }
   }
 
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 32508de..a71770a 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -124,6 +124,9 @@ public class ReplicationSource extends Thread
   private volatile boolean running = true;
   // Metrics for this source
   private ReplicationSourceMetrics metrics;
+  // If source is enabled, replication happens. If disabled, nothing will be
+  // replicated but HLogs will still be queued
+  private AtomicBoolean sourceEnabled = new AtomicBoolean();
 
   /**
    * Instantiation method used by region servers
@@ -197,7 +200,7 @@ public class ReplicationSource extends Thread
   private void chooseSinks() {
     this.currentPeers.clear();
     List<HServerAddress> addresses =
-        this.zkHelper.getPeersAddresses(peerClusterId);
+        this.zkHelper.getSlavesAddresses(peerClusterId);
     Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
     LOG.info("Getting " + nbPeers +
@@ -236,6 +239,13 @@ public class ReplicationSource extends Thread
     int sleepMultiplier = 1;
     // Loop until we close down
     while (!stop.get() && this.running) {
+      // Sleep until replication is enabled again
+      if (!this.replicating.get() || !this.sourceEnabled.get()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
       // Get a new path
       if (!getNextPath()) {
         if (sleepForRetries("No log to process", sleepMultiplier)) {
@@ -373,7 +383,7 @@ public class ReplicationSource extends Thread
 
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
-    while (!this.stop.get() && this.currentPeers.size() == 0) {
+    while (!this.stop.get() && this.currentPeers.size() == 0 && running) {
       try {
         chooseSinks();
         Thread.sleep(this.sleepForRetries);
@@ -407,7 +417,7 @@ public class ReplicationSource extends Thread
    */
   protected boolean openReader(int sleepMultiplier) {
     try {
-      LOG.info("Opening log for replication " + this.currentPath.getName() +
+      LOG.debug("Opening log for replication " + this.currentPath.getName() +
           " at " + this.position);
       try {
        this.reader = null;
@@ -433,6 +443,9 @@ public class ReplicationSource extends Thread
           // TODO What happens if the log was missing from every single location?
           // Although we need to check a couple of times as the log could have
           // been moved by the master between the checks
+          throw new IOException("File from recovered queue is " +
+              "nowhere to be found", fnfe);
+
         } else {
           // If the log was archived, continue reading from there
           Path archivedLogLocation =
@@ -572,7 +585,7 @@ public class ReplicationSource extends Thread
       return true;
     } else if (this.queueRecovered) {
       this.manager.closeRecoveredQueue(this);
-      this.abort();
+      this.terminate();
       return true;
     }
     return false;
@@ -584,24 +597,16 @@ public class ReplicationSource extends Thread
         new Thread.UncaughtExceptionHandler() {
           public void uncaughtException(final Thread t, final Throwable e) {
             LOG.fatal("Set stop flag in " + t.getName(), e);
-            abort();
+            terminate();
           }
         };
     Threads.setDaemonThreadRunning(
         this, n + ".replicationSource," + clusterId, handler);
   }
 
-  /**
-   * Hastily stop the replication, then wait for shutdown
-   */
-  private void abort() {
-    LOG.info("abort");
-    this.running = false;
-    terminate();
-  }
-
   public void terminate() {
-    LOG.info("terminate");
+    LOG.info("Closing source " + this.peerClusterZnode);
+    this.running = false;
     Threads.shutdown(this, this.sleepForRetries);
   }
 
@@ -645,15 +650,18 @@
     return down;
   }
 
-  /**
-   * Get the id that the source is replicating to
-   *
-   * @return peer cluster id
-   */
   public String getPeerClusterZnode() {
     return this.peerClusterZnode;
   }
 
+  public String getPeerClusterId() {
+    return this.peerClusterId;
+  }
+
   /**
    * Get the path of the current HLog
    * @return current hlog's path
@@ -663,6 +667,14 @@ public class ReplicationSource extends Thread
   }
 
   /**
+   * Set if this source is enabled or disabled
+   * @param status the new status
+   */
+  public void setSourceEnabled(boolean status) {
+    this.sourceEnabled.set(status);
+  }
+
+  /**
    * Comparator used to compare logs together based on their start time
    */
   public static class LogsComparator implements Comparator<Path> {
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index f6e72dd..a659abf 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -72,9 +72,23 @@ public interface ReplicationSourceInterface {
   public void terminate();
 
   /**
-   * Get the id that the source is replicating to
+   * Get the name of the znode that contains the metadata for this
+   * source's peer.
    *
-   * @return peer cluster id
+   * @return peer cluster znode
    */
   public String getPeerClusterZnode();
+
+  /**
+   * Get the id that the source is replicating to.
+   *
+   * @return peer cluster id
+   */
+  public String getPeerClusterId();
+
+  /**
+   * Set if this source is enabled or disabled
+   * @param status the new status
+   */
+  public void setSourceEnabled(boolean status);
 }
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 8046b73..4f52784 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 
@@ -107,6 +108,7 @@ public class ReplicationSourceManager implements LogActionsListener {
     this.oldLogDir = oldLogDir;
     List<String> otherRSs =
         this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+    this.zkHelper.listPeersIds(new PeersWatcher());
     this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
   }
 
@@ -143,10 +145,12 @@ public class ReplicationSourceManager implements LogActionsListener {
    */
   public void init() throws IOException {
     for (String id : this.zkHelper.getPeerClusters().keySet()) {
-      ReplicationSourceInterface src = addSource(id);
-      src.startup();
+      addSource(id);
     }
     List<String> currentReplicators = this.zkHelper.getListOfReplicators(null);
+    if (currentReplicators == null || currentReplicators.size() == 0) {
+      return;
+    }
     synchronized (otherRegionServers) {
       LOG.info("Current list of replicators: " + currentReplicators
           + " other RSs: " + otherRegionServers);
@@ -164,20 +168,24 @@ public class ReplicationSourceManager implements LogActionsListener {
   /**
    * Add a new normal source to this region server
    * @param id the id of the peer cluster
-   * @return the created source
+   * @return the source that was created
    * @throws IOException
    */
   public ReplicationSourceInterface addSource(String id) throws IOException {
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
-    this.sources.add(src);
+    // TODO set it to what's in ZK
+    src.setSourceEnabled(true);
     synchronized (this.hlogs) {
+      this.sources.add(src);
       if (this.hlogs.size() > 0) {
-        this.zkHelper.addLogToList(this.hlogs.first(),
+        // Add the latest hlog to that source's queue
+        this.zkHelper.addLogToList(this.hlogs.last(),
             this.sources.get(0).getPeerClusterZnode());
         src.enqueueLog(this.latestPath);
       }
     }
+    src.startup();
     return src;
   }
 
@@ -211,17 +219,19 @@ public class ReplicationSourceManager implements LogActionsListener {
 
   @Override
   public void logRolled(Path newLog) {
-    if (this.sources.size() > 0) {
-      this.zkHelper.addLogToList(newLog.getName(),
-          this.sources.get(0).getPeerClusterZnode());
-    }
-    synchronized (this.hlogs) {
-      this.hlogs.add(newLog.getName());
-    }
-    this.latestPath = newLog;
-    // This only update the sources we own, not the recovered ones
-    for (ReplicationSourceInterface source : this.sources) {
-      source.enqueueLog(newLog);
+    if (this.replicating.get()) {
+      if (this.sources.size() > 0) {
+        this.zkHelper.addLogToList(newLog.getName(),
+            this.sources.get(0).getPeerClusterZnode());
+      }
+      synchronized (this.hlogs) {
+        this.hlogs.add(newLog.getName());
+        this.latestPath = newLog;
+        // This only updates the sources we own, not the recovered ones
+        for (ReplicationSourceInterface source : this.sources) {
+          source.enqueueLog(newLog);
+        }
+      }
     }
   }
 
@@ -296,10 +306,17 @@ public class ReplicationSourceManager implements LogActionsListener {
       try {
         ReplicationSourceInterface src = getReplicationSource(this.conf,
             this.fs, this, this.stopper, this.replicating, peerId);
+        if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+          LOG.warn("Recovered queue doesn't belong to any current peer, closing");
+          src.terminate();
+          continue;
+        }
         this.oldsources.add(src);
         for (String hlog : entry.getValue()) {
           src.enqueueLog(new Path(this.oldLogDir, hlog));
         }
+        // TODO set it to what's in ZK
+        src.setSourceEnabled(true);
         src.startup();
       } catch (IOException e) {
         // TODO manage it
@@ -309,13 +326,52 @@
   }
 
   /**
-   * Clear the references to the specified old source
+   * Clear the references to the specified recovered source
    * @param src source to clear
    */
   public void closeRecoveredQueue(ReplicationSourceInterface src) {
-    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+    LOG.info("Deleting the recovered queue " + src.getPeerClusterZnode());
     this.oldsources.remove(src);
-    this.zkHelper.deleteSource(src.getPeerClusterZnode());
+    this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
+  }
+
+  /**
+   * This method first deletes all the recovered sources for the specified
+   * id, then deletes the normal source (deleting all related data in ZK).
+   * @param id The id of the peer cluster
+   */
+  public void removePeer(String id) {
+    LOG.info("Closing the following queue " + id + ", currently have "
+        + sources.size() + " and another "
+        + oldsources.size() + " that were recovered");
+    ReplicationSourceInterface srcToRemove = null;
+    List<ReplicationSourceInterface> oldSourcesToDelete =
+        new ArrayList<ReplicationSourceInterface>();
+    // First close all the recovered sources for this peer
+    for (ReplicationSourceInterface src : oldsources) {
+      if (id.equals(src.getPeerClusterId())) {
+        oldSourcesToDelete.add(src);
+      }
+    }
+    for (ReplicationSourceInterface src : oldSourcesToDelete) {
+      closeRecoveredQueue(src);
+    }
+    LOG.info("Number of deleted recovered sources for " + id + ": "
+        + oldSourcesToDelete.size());
+    // Now look for the one on this cluster
+    for (ReplicationSourceInterface src : this.sources) {
+      if (id.equals(src.getPeerClusterId())) {
+        srcToRemove = src;
+        break;
+      }
+    }
+    if (srcToRemove == null) {
+      LOG.error("The queue we wanted to close is missing " + id);
+      return;
+    }
+    srcToRemove.terminate();
+    this.sources.remove(srcToRemove);
+    this.zkHelper.deleteSource(id, true);
   }
 
   /**
@@ -328,7 +385,8 @@ public class ReplicationSourceManager implements LogActionsListener {
     public void process(WatchedEvent watchedEvent) {
       LOG.info(" event " + watchedEvent);
       if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
-          watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
+          watchedEvent.getType().equals(Event.KeeperState.Disconnected) ||
+          !replicating.get()) {
         return;
       }
 
@@ -343,8 +401,47 @@
       }
       if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
         LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
-        String[] rsZnodeParts = watchedEvent.getPath().split("/");
-        transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+        transferQueues(zkHelper.getZNodeName(watchedEvent.getPath()));
+      }
+    }
+  }
+
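+  /**
+   * Watcher used to be notified of peer additions and removals
+   */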
+  public class PeersWatcher implements Watcher {
+
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      LOG.info(" event " + watchedEvent);
+      if (watchedEvent.getState().equals(Event.KeeperState.Expired) ||
+          watchedEvent.getState().equals(Event.KeeperState.Disconnected) ||
+          !replicating.get()) {
+        return;
+      }
+      // Reset the watch
+      List<String> currentPeers = zkHelper.listPeersIds(this);
+      if (currentPeers == null) {
+        return;
+      }
+
+      // If it was deleted, close and return
+      if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
+        String id = zkHelper.getZNodeName(watchedEvent.getPath());
+        removePeer(id);
+        return;
+      }
+      // Else, try adding it the brutal way
+      for (String id : currentPeers) {
+        try {
+          boolean added = zkHelper.connectToPeer(id);
+          if (added) {
+            addSource(id);
+          }
+        } catch (IOException e) {
+          // TODO manage better than that
+          LOG.error("Error while adding a new peer", e);
+        }
       }
     }
   }
diff --git a/src/main/ruby/hbase.rb b/src/main/ruby/hbase.rb
index 16ad293..a2961b5 100644
--- a/src/main/ruby/hbase.rb
+++ b/src/main/ruby/hbase.rb
@@ -71,4 +71,5 @@ end
 # Include classes definition
 require 'hbase/hbase'
 require 'hbase/admin'
+require 'hbase/replication_admin'
 require 'hbase/table'
diff --git a/src/main/ruby/hbase/hbase.rb b/src/main/ruby/hbase/hbase.rb
index c2233ac..8445ae5 100644
--- a/src/main/ruby/hbase/hbase.rb
+++ b/src/main/ruby/hbase/hbase.rb
@@ -49,5 +49,9 @@ module Hbase
     def table(table, formatter)
       ::Hbase::Table.new(configuration, table, formatter)
     end
+
+    def replication_admin(formatter)
+      ::Hbase::RepAdmin.new(configuration, formatter)
+    end
   end
 end
diff --git a/src/main/ruby/hbase/replication_admin.rb b/src/main/ruby/hbase/replication_admin.rb
new file mode 100644
index 0000000..1fb7c32
--- /dev/null
+++ b/src/main/ruby/hbase/replication_admin.rb
@@ -0,0 +1,72 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+include Java
+
+java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+
+# Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+
+module Hbase
+  class RepAdmin
+    include HBaseConstants
+
+    def initialize(configuration, formatter)
+      @replication_admin = ReplicationAdmin.new(configuration)
+      @formatter = formatter
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Add a new peer cluster to replicate to
+    def add_peer(id, cluster_key)
+      @replication_admin.addPeer(id, cluster_key)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Remove a peer cluster, stops the replication
+    def remove_peer(id)
+      @replication_admin.removePeer(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Restart the replication stream to the specified peer
+    def enable_peer(id)
+      @replication_admin.enablePeer(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Stop the replication stream to the specified peer
+    def disable_peer(id)
+      @replication_admin.disablePeer(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Restart the replication; each stream will resume in an unknown state
+    def start_replication
+      @replication_admin.setReplicating(true)
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Kill switch for replication, stops all its features
+    def stop_replication
+      @replication_admin.setReplicating(false)
+    end
+  end
+end
diff --git a/src/main/ruby/shell.rb b/src/main/ruby/shell.rb
index 5654840..e688160 100644
--- a/src/main/ruby/shell.rb
+++ b/src/main/ruby/shell.rb
@@ -83,6 +83,10 @@ module Shell
       hbase.table(name, formatter)
     end
 
+    def hbase_replication_admin
+      @hbase_replication_admin ||= hbase.replication_admin(formatter)
+    end
+
     def export_commands(where)
       ::Shell.commands.keys.each do |cmd|
         where.send :instance_eval, <<-EOF
@@ -254,3 +258,16 @@ Shell.load_command_group(
   ]
 )
 
+Shell.load_command_group(
+  'replication',
+  :full_name => 'CLUSTER REPLICATION TOOLS',
+  :comment => "In order to use these tools, hbase.replication must be true",
+  :commands => %w[
+    add_peer
+    remove_peer
+    enable_peer
+    disable_peer
+    start_replication
+    stop_replication
+  ]
+)
diff --git a/src/main/ruby/shell/commands.rb b/src/main/ruby/shell/commands.rb
index 49a29cc..4c759bc 100644
--- a/src/main/ruby/shell/commands.rb
+++ b/src/main/ruby/shell/commands.rb
@@ -49,6 +49,9 @@ module Shell
         shell.hbase_table(name)
       end
 
+      def replication_admin
+        shell.hbase_replication_admin
+      end
       #----------------------------------------------------------------------
 
       def formatter
diff --git a/src/main/ruby/shell/commands/add_peer.rb b/src/main/ruby/shell/commands/add_peer.rb
new file mode 100644
index 0000000..8548aee
--- /dev/null
+++ b/src/main/ruby/shell/commands/add_peer.rb
@@ -0,0 +1,44 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class AddPeer < Command
+      def help
+        return <<-EOF
+          Add a peer cluster to replicate to. The id must be a short and
+          the cluster key is composed as follows:
+          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+          This gives a full path for HBase to connect to another cluster.
+          Examples:
+
+            hbase> add_peer '1', "server1.cie.com:2181:/hbase"
+            hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
+        EOF
+      end
+
+      def command(id, cluster_key)
+        format_simple_command do
+          replication_admin.add_peer(id, cluster_key)
+        end
+      end
+    end
+  end
+end
diff --git a/src/main/ruby/shell/commands/disable_peer.rb b/src/main/ruby/shell/commands/disable_peer.rb
new file mode 100644
index 0000000..502696c
--- /dev/null
+++ b/src/main/ruby/shell/commands/disable_peer.rb
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class DisablePeer < Command
+      def help
+        return <<-EOF
+          Stops the replication stream to the specified cluster, but still
+          keeps track of new edits to replicate.
+          Examples:
+
+            hbase> disable_peer '1'
+        EOF
+      end
+
+      def command(id)
+        format_simple_command do
+          replication_admin.disable_peer(id)
+        end
+      end
+    end
+  end
+end
diff --git a/src/main/ruby/shell/commands/enable_peer.rb b/src/main/ruby/shell/commands/enable_peer.rb
new file mode 100644
index 0000000..e1764e4
--- /dev/null
+++ b/src/main/ruby/shell/commands/enable_peer.rb
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class EnablePeer < Command
+      def help
+        return <<-EOF
+          Restarts the replication to the specified peer cluster,
+          continuing from where it was disabled.
+          Examples:
+
+            hbase> enable_peer '1'
+        EOF
+      end
+
+      def command(id)
+        format_simple_command do
+          replication_admin.enable_peer(id)
+        end
+      end
+    end
+  end
+end
diff --git a/src/main/ruby/shell/commands/remove_peer.rb b/src/main/ruby/shell/commands/remove_peer.rb
new file mode 100644
index 0000000..6730a55
--- /dev/null
+++ b/src/main/ruby/shell/commands/remove_peer.rb
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class RemovePeer < Command
+      def help
+        return <<-EOF
+          Stops the specified replication stream and deletes all the meta
+          information kept about it.
+          Examples:
+
+            hbase> remove_peer '1'
+        EOF
+      end
+
+      def command(id)
+        format_simple_command do
+          replication_admin.remove_peer(id)
+        end
+      end
+    end
+  end
+end
diff --git a/src/main/ruby/shell/commands/start_replication.rb b/src/main/ruby/shell/commands/start_replication.rb
new file mode 100644
index 0000000..b2f55bc
--- /dev/null
+++ b/src/main/ruby/shell/commands/start_replication.rb
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class StartReplication < Command
+      def help
+        return <<-EOF
+          Restarts all the replication features. The state in which each
+          stream starts is undetermined.
+          WARNING:
+          start/stop replication is only meant to be used in critical load situations.
+          Examples:
+
+            hbase> start_replication
+        EOF
+      end
+
+      def command
+        format_simple_command do
+          replication_admin.start_replication
+        end
+      end
+    end
+  end
+end
diff --git a/src/main/ruby/shell/commands/stop_replication.rb b/src/main/ruby/shell/commands/stop_replication.rb
new file mode 100644
index 0000000..7569627
--- /dev/null
+++ b/src/main/ruby/shell/commands/stop_replication.rb
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class StopReplication < Command
+      def help
+        return <<-EOF
+          Stops all the replication features. The state in which each
+          stream stops is undetermined.
+          WARNING:
+          start/stop replication is only meant to be used in critical load situations.
+          Examples:
+
+            hbase> stop_replication
+        EOF
+      end
+
+      def command
+        format_simple_command do
+          replication_admin.stop_replication
+        end
+      end
+    end
+  end
+end
diff --git a/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
new file mode 100644
index 0000000..e9d1b7b
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -0,0 +1,95 @@
+package org.apache.hadoop.hbase.client.replication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit testing of ReplicationAdmin
+ */
+public class TestReplicationAdmin {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestReplicationAdmin.class);
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private final String ID_ONE = "1";
+  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
+  private final String ID_SECOND = "2";
+  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
+
+  private static ReplicationSourceManager manager;
+  private static ReplicationAdmin admin;
+  private static AtomicBoolean replicating = new AtomicBoolean(true);
+  private static AtomicBoolean stopper = new AtomicBoolean(true);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    admin = new ReplicationAdmin(conf);
+    Path oldLogDir = new Path(TEST_UTIL.getTestDir(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    Path logDir = new Path(TEST_UTIL.getTestDir(),
+        HConstants.HREGION_LOGDIR_NAME);
+    manager = new ReplicationSourceManager(admin.getZkWrapper(),
+        conf, stopper, FileSystem.get(conf), replicating, logDir, oldLogDir);
+  }
+
+  /**
+   * Simple testing of adding and removing peers, basically shows that
+   * all interactions with ZK work
+   * @throws Exception
+   */
+  @Test
+  public void testAddRemovePeer() throws Exception {
+    assertEquals(0, manager.getSources().size());
+    // Add a valid peer
+    admin.addPeer(ID_ONE, KEY_ONE);
+    // Adding the same peer again must be refused
+    try {
+      admin.addPeer(ID_ONE, KEY_ONE);
+      fail();
+    } catch (IllegalArgumentException iae) {
+      // OK!
+    }
+    // Try to remove a non-existent peer
+    assertEquals(1, admin.getPeersCount());
+    try {
+      admin.removePeer(ID_SECOND);
+      fail();
+    } catch (IllegalArgumentException iae) {
+      // OK!
+    }
+    assertEquals(1, admin.getPeersCount());
+    // Add a second peer, fails since multiple slaves aren't supported yet
+    try {
+      admin.addPeer(ID_SECOND, KEY_SECOND);
+      fail();
+    } catch (IllegalStateException iae) {
+      // OK!
+    }
+    assertEquals(1, admin.getPeersCount());
+    // Remove the first peer we added
+    admin.removePeer(ID_ONE);
+    assertEquals(0, admin.getPeersCount());
+  }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index e1f4f98..05067d3 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -70,4 +70,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public String getPeerClusterZnode() {
     return peerClusterId;
   }
+
+  @Override
+  public String getPeerClusterId() {
+    return peerClusterId;
+  }
+
+  @Override
+  public void setSourceEnabled(boolean status) {
+    // No-op: the dummy source does not track its enabled state
+  }
 }
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
index 31cc680..fb6bece 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.junit.After;
@@ -60,6 +61,9 @@ public class TestReplication {
   private static ZooKeeperWrapper zkw1;
   private static ZooKeeperWrapper zkw2;
 
+  private static ReplicationAdmin admin;
+  private static String slaveClusterKey;
+
   private static HTable htable1;
   private static HTable htable2;
 
@@ -98,12 +102,7 @@ public class TestReplication {
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
     zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
-    zkw1.writeZNode("/1", "replication", "");
-    zkw1.writeZNode("/1/replication", "master",
-        conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf1.get("hbase.zookeeper.property.clientPort")+":/1");
-    setIsReplication(true);
-
+    admin = new ReplicationAdmin(conf1);
     LOG.info("Setup first Zk");
 
     conf2 = HBaseConfiguration.create();
@@ -117,13 +116,11 @@
     utility2.setZkCluster(miniZK);
     zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
     zkw2.writeZNode("/2", "replication", "");
-    zkw2.writeZNode("/2/replication", "master",
-        conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf1.get("hbase.zookeeper.property.clientPort")+":/1");
 
-    zkw1.writeZNode("/1/replication/peers", "1",
-        conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-            conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+    slaveClusterKey = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+            conf2.get("hbase.zookeeper.property.clientPort")+":/2";
+    admin.addPeer("2", slaveClusterKey);
+    setIsReplication(true);
 
     LOG.info("Setup second Zk");
 
@@ -149,7 +149,7 @@
 
   private static void setIsReplication(boolean rep) throws Exception {
     LOG.info("Set rep " + rep);
-    zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
+    admin.setReplicating(rep);
     // Takes some ms for ZK to fire the watcher
     Thread.sleep(SLEEP_TIME);
   }
@@ -159,12 +160,29 @@
    */
   @Before
   public void setUp() throws Exception {
-    setIsReplication(false);
     utility1.truncateTable(tableName);
-    utility2.truncateTable(tableName);
     // If test is flaky, set that sleep higher
-    Thread.sleep(SLEEP_TIME*8);
-    setIsReplication(true);
+    Scan scan = new Scan();
+    int lastCount = 0;
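+    // Wait for the deletes from the truncate to replicate to the slave cluster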
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment the retry count if we are making progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
   }
 
   /**
@@ -189,13 +205,12 @@ public class TestReplication {
     htable1 = new HTable(conf1, tableName);
     htable1.put(put);
 
-    HTable table2 = new HTable(conf2, tableName);
     Get get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for put replication");
       }
-      Result res = table2.get(get);
+      Result res = htable2.get(get);
       if (res.size() == 0) {
         LOG.info("Row not available");
         Thread.sleep(SLEEP_TIME);
@@ -208,13 +223,12 @@ public class TestReplication {
     Delete del = new Delete(row);
     htable1.delete(del);
 
-    table2 = new HTable(conf2, tableName);
     get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for del replication");
       }
-      Result res = table2.get(get);
+      Result res = htable2.get(get);
       if (res.size() >= 1) {
         LOG.info("Row not deleted");
         Thread.sleep(SLEEP_TIME);
@@ -300,8 +314,6 @@ public class TestReplication {
     // Test restart replication
     setIsReplication(true);
 
-    htable1.put(put);
-
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i==NB_RETRIES-1) {
         fail("Waited too much time for put replication");
@@ -337,6 +349,62 @@
   }
 
   /**
+   * Integration test for ReplicationAdmin: removes and re-adds a peer
+   * cluster
+   * @throws Exception
+   */
+  @Test
+  public void testAddAndRemoveClusters() throws Exception {
+    LOG.info("testAddAndRemoveClusters");
+    admin.removePeer("2");
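+    // Give the region servers time to notice the peer removal via their ZK watchers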
+    Thread.sleep(SLEEP_TIME);
+    byte[] rowKey = Bytes.toBytes("Won't be replicated");
+    Put put = new Put(rowKey);
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(rowKey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES-1) {
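+        // Reaching the last retry without seeing the row means it was never replicated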
+        break;
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Not supposed to be replicated");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
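+    // Re-add the slave cluster, this time under a new peer id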
+    admin.addPeer("1", slaveClusterKey);
+    Thread.sleep(SLEEP_TIME);
+    rowKey = Bytes.toBytes("do rep");
+    put = new Put(rowKey);
+    put.add(famName, row, row);
+    LOG.info("Adding new row");
+    htable1.put(put);
+
+    get = new Get(rowKey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME*i);
+      } else {
+        assertArrayEquals(row, res.value());
+        break;
+      }
+    }
+  }
+
+  /**
-   * Do a more intense version testSmallBatch, one  that will trigger
+   * Do a more intense version of testSmallBatch, one that will trigger
    * hlog rolling and other non-trivial code paths
    * @throws Exception
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index b497b9a..930c5c1 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -61,7 +60,7 @@ public class TestReplicationSourceManager {
 
   private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
 
-  private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
+  private static final AtomicBoolean REPLICATING = new AtomicBoolean(true);
 
   private static ReplicationSourceManager manager;
 
@@ -90,27 +89,21 @@
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-
     conf = HBaseConfiguration.create();
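+    // dfs.support.append is required for the HLog operations used further down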
+    conf.setBoolean("dfs.support.append", true);
     conf.set("replication.replicationsource.implementation",
         ReplicationSourceDummy.class.getCanonicalName());
     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     utility = new HBaseTestingUtility(conf);
     utility.startMiniZKCluster();
-
-    zkw = ZooKeeperWrapper.createInstance(conf, "test");
-    zkw.writeZNode("/hbase", "replication", "");
-    zkw.writeZNode("/hbase/replication", "master",
-        conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
-    conf.get("hbase.zookeeper.property.clientPort")+":/1");
+    zkw = ZooKeeperWrapper.createInstance(conf,
+        TestReplicationSourceManager.class.toString());
     zkw.writeZNode("/hbase/replication/peers", "1",
           conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
           conf.get("hbase.zookeeper.property.clientPort")+":/1");
-
-    HRegionServer server = new HRegionServer(conf);
     ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
-        server.getZooKeeperWrapper(), conf,
-        REPLICATING, "123456789");
+        zkw, conf, REPLICATING, "123456789");
     fs = FileSystem.get(conf);
     oldLogDir = new Path(utility.getTestDir(),
         HConstants.HREGION_OLDLOGDIR_NAME);