Index: src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java	(revision 1305723)
+++ src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java	(working copy)
@@ -19,6 +19,9 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -26,9 +29,6 @@
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * Source that does nothing at all, helpful to test ReplicationSourceManager
  */
@@ -81,10 +81,4 @@
   public String getPeerClusterId() {
     return peerClusterId;
   }
-
-  @Override
-  public void setSourceEnabled(boolean status) {
-
-  }
-
 }
Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java	(revision 1305723)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java	(working copy)
@@ -26,7 +26,14 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -445,8 +452,107 @@
   }
 
   /**
+   * Test disable/enable replication, trying to insert, make sure nothing's
+   * replicated, enable it, the insert should be replicated
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testDisableEnable() throws Exception {
+
+    // Test disabling replication
+    admin.disablePeer("2");
+
+    byte[] rowkey = Bytes.toBytes("disable enable");
+    Put put = new Put(rowkey);
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Replication wasn't disabled");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test enable replication
+    admin.enablePeer("2");
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        return;
+      }
+    }
+    fail("Waited too much time for put replication");
+  }
+
+  /**
+   * Test disabling an inactive peer. Add a peer which is inactive, trying to
+   * insert, disable the peer, then activate the peer and make sure nothing is
+   * replicated. In addition, enable the peer and check the updates are
+   * replicated.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 600000)
+  public void testDisableInactivePeer() throws Exception {
+
+    // enable the peer and shut down the peer cluster
+    admin.enablePeer("2");
+    utility2.shutdownMiniHBaseCluster();
+
+    byte[] rowkey = Bytes.toBytes("disable inactive peer");
+    Put put = new Put(rowkey);
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    // wait for the sleep interval of the master cluster to become long
+    Thread.sleep(SLEEP_TIME * NB_RETRIES);
+
+    // disable and start the peer
+    admin.disablePeer("2");
+    utility2.startMiniHBaseCluster(1, 1);
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Replication wasn't disabled");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test enable replication
+    admin.enablePeer("2");
+    // wait since the sleep interval would be long
+    Thread.sleep(SLEEP_TIME * NB_RETRIES);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME * NB_RETRIES);
+      } else {
+        assertArrayEquals(res.value(), row);
+        return;
+      }
+    }
+    fail("Waited too much time for put replication");
+  }
+
+  /**
    * Integration test for TestReplicationAdmin, removes and re-add a peer
    * cluster
+   *
    * @throws Exception
    */
   @Test(timeout=300000)
Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java	(revision 1305723)
+++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java	(working copy)
@@ -30,13 +30,23 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -100,6 +110,9 @@
     ZKUtil.setData(zkw, "/hbase/replication/peers/1",
         Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
             + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+        Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name()));
     ZKUtil.createWithParents(zkw,
         "/hbase/replication/state");
     ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java	(revision 1305723)
+++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java	(working copy)
@@ -23,7 +23,6 @@
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -114,16 +113,16 @@
    * Restart the replication stream to the specified peer.
    * @param id a short that identifies the cluster
    */
-  public void enablePeer(String id) {
-    throw new NotImplementedException("Not implemented");
+  public void enablePeer(String id) throws IOException {
+    this.replicationZk.enablePeer(id);
   }
 
   /**
    * Stop the replication stream to the specified peer.
    * @param id a short that identifies the cluster
    */
-  public void disablePeer(String id) {
-    throw new NotImplementedException("Not implemented");
+  public void disablePeer(String id) throws IOException {
+    this.replicationZk.disablePeer(id);
   }
 
   /**
@@ -143,6 +142,20 @@
   }
 
   /**
+   * Get state of the peer
+   *
+   * @param id peer's identifier
+   * @return current state of the peer
+   */
+  public String getPeerState(String id) throws IOException {
+    try {
+      return this.replicationZk.getPeerState(id).name();
+    } catch (KeeperException e) {
+      throw new IOException("Couldn't get the state of the peer " + id, e);
+    }
+  }
+
+  /**
    * Get the current status of the kill switch, if the cluster is replicating
    * or not.
    * @return true if the cluster is replicated, otherwise false
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java	(revision 1305723)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java	(working copy)
@@ -31,7 +31,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This class acts as a wrapper for all the objects used to identify and
@@ -50,6 +55,8 @@
   private ZooKeeperWatcher zkw;
   private final Configuration conf;
 
+  private PeerStateTracker peerStateTracker;
+
   /**
    * Constructor that takes all the objects required to communicate with the
    * specified peer, except for the region server addresses.
@@ -66,6 +73,31 @@
   }
 
   /**
+   * Start a state tracker to check whether this peer is enabled or not
+   *
+   * @param zookeeper zk watcher for the local cluster
+   * @param peerStateNode path to zk node which stores peer state
+   * @throws KeeperException
+   */
+  public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+      throws KeeperException {
+    if (ZKUtil.checkExists(zookeeper, peerStateNode) == -1) {
+      ZKUtil.createAndWatch(zookeeper, peerStateNode,
+          Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
+    }
+    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper,
+        this);
+    this.peerStateTracker.start();
+    this.readPeerStateZnode();
+  }
+
+  private void readPeerStateZnode() {
+    String currentState = Bytes.toString(peerStateTracker.getData(false));
+    this.peerEnabled.set(PeerState.ENABLED.equals(PeerState
+        .valueOf(currentState)));
+  }
+
+  /**
    * Get the cluster key of that peer
    * @return string consisting of zk ensemble addresses, client port
    * and root znode
@@ -152,4 +184,23 @@
       zkw.close();
     }
   }
+
+  /**
+   * Tracker for state of this peer
+   */
+  public class PeerStateTracker extends ZooKeeperNodeTracker {
+
+    public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+        Abortable abortable) {
+      super(watcher, peerStateZNode, abortable);
+    }
+
+    @Override
+    public synchronized void nodeDataChanged(String path) {
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+        readPeerStateZnode();
+      }
+    }
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(revision 1305723)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(working copy)
@@ -35,7 +35,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -48,6 +47,8 @@
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This class is responsible to manage all the replication
 * sources. There are two classes of sources:
@@ -203,8 +204,6 @@
   public ReplicationSourceInterface addSource(String id) throws IOException {
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
-    // TODO set it to what's in ZK
-    src.setSourceEnabled(true);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet<String>());
@@ -585,8 +584,6 @@
           for (String hlog : entry.getValue()) {
             src.enqueueLog(new Path(oldLogDir, hlog));
           }
-          // TODO set it to what's in ZK
-          src.setSourceEnabled(true);
           src.startup();
         } catch (IOException e) {
           // TODO manage it
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 1305723)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -138,9 +138,6 @@
   private volatile boolean running = true;
   // Metrics for this source
   private ReplicationSourceMetrics metrics;
-  // If source is enabled, replication happens. If disabled, nothing will be
-  // replicated but HLogs will still be queued
-  private AtomicBoolean sourceEnabled = new AtomicBoolean();
 
   /**
    * Instantiation method used by region servers
@@ -274,7 +271,7 @@
     // Loop until we close down
     while (isActive()) {
       // Sleep until replication is enabled again
-      if (!this.replicating.get() || !this.sourceEnabled.get()) {
+      if (!isPeerEnabled()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
@@ -601,6 +598,12 @@
       return;
     }
     while (this.isActive()) {
+      if (!isPeerEnabled()) {
+        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+        continue;
+      }
       try {
         HRegionInterface rrs = getRS();
         LOG.debug("Replicating " + currentNbEntries);
@@ -660,6 +663,15 @@
   }
 
   /**
+   * Check whether the peer is enabled or not
+   *
+   * @return true if the peer is enabled, otherwise false
+   */
+  protected boolean isPeerEnabled() {
+    return this.replicating.get() && this.zkHelper.getPeerEnabled(peerId);
+  }
+
+  /**
    * If the queue isn't empty, switch to the next one
    * Else if this is a recovered queue, it means we're done!
   * Else we'll just continue to try reading the log file
@@ -765,10 +777,6 @@
     return this.currentPath;
   }
 
-  public void setSourceEnabled(boolean status) {
-    this.sourceEnabled.set(status);
-  }
-
   private boolean isActive() {
     return !this.stopper.isStopped() && this.running;
   }
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java	(revision 1305723)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java	(working copy)
@@ -95,9 +95,4 @@
    */
   public String getPeerClusterId();
 
-  /**
-   * Set if this source is enabled or disabled
-   * @param status the new status
-   */
-  public void setSourceEnabled(boolean status);
 }
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(revision 1305723)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(working copy)
@@ -50,18 +50,20 @@
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 
 /**
- * This class serves as a helper for all things related to zookeeper
- * in replication.
+ * This class serves as a helper for all things related to zookeeper in
+ * replication.
  * <br><br>
- * The layout looks something like this under zookeeper.znode.parent
- * for the master cluster:
+ * The layout looks something like this under zookeeper.znode.parent for the
+ * master cluster:
  * <br><br>
+ *
  * <pre>
  * replication/
  *  state      {contains true or false}
  *  clusterId  {contains a byte}
  *  peers/
  *    1/   {contains a full cluster address}
+ *      peer-state  {contains ENABLED or DISABLED}
  *    2/
  *    ...
  *  rs/ {lists all RS that replicate}
@@ -82,6 +84,12 @@
     LogFactory.getLog(ReplicationZookeeper.class);
   // Name of znode we use to lock when failover
   private final static String RS_LOCK_ZNODE = "lock";
+
+  // Values of the znode that stores the state of a peer
+  public static enum PeerState {
+    ENABLED, DISABLED
+  };
+
   // Our handle on zookeeper
   private final ZooKeeperWatcher zookeeper;
   // Map of peer clusters keyed by their id
@@ -96,6 +104,8 @@
   private String rsServerNameZnode;
   // Name node if the replicationState znode
   private String replicationStateNodeName;
+  // Name of the znode that stores the peer state
+  private String peerStateNodeName;
   private final Configuration conf;
   // Is this cluster replicating at the moment?
   private AtomicBoolean replicating;
@@ -150,6 +160,8 @@
         conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName =
         conf.get("zookeeper.znode.replication.peers", "peers");
+    this.peerStateNodeName = conf.get(
+        "zookeeper.znode.replication.peers.state", "peer-state");
     this.replicationStateNodeName =
         conf.get("zookeeper.znode.replication.state", "state");
     String rsZNodeName =
@@ -339,8 +351,10 @@
       return null;
     }
 
-    return new ReplicationPeer(otherConf, peerId,
+    ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
         otherClusterKey);
+    peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+    return peer;
   }
 
   /**
@@ -366,7 +380,8 @@
       if (!peerExists(id)) {
         throw new IllegalArgumentException("Cannot remove inexisting peer");
       }
-      ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+      ZKUtil.deleteNodeRecursively(this.zookeeper,
+          ZKUtil.joinZNode(this.peersZNode, id));
     } catch (KeeperException e) {
       throw new IOException("Unable to remove a peer", e);
     }
@@ -388,6 +403,8 @@
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper,
           ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+      ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
+          Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
     } catch (KeeperException e) {
       throw new IOException("Unable to add peer", e);
     }
@@ -399,6 +416,82 @@
   }
 
   /**
+   * Enable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public void enablePeer(String id) throws IOException {
+    changePeerState(id, PeerState.ENABLED);
+    LOG.info("peer " + id + " is enabled");
+  }
+
+  /**
+   * Disable replication to the peer
+   *
+   * @param id peer's identifier
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public void disablePeer(String id) throws IOException {
+    changePeerState(id, PeerState.DISABLED);
+    LOG.info("peer " + id + " is disabled");
+  }
+
+  private void changePeerState(String id, PeerState state) throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("peer " + id + " is not registered");
+      }
+      String peerStateZNode = getPeerStateNode(id);
+      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+        ZKUtil.setData(this.zookeeper, peerStateZNode,
+          Bytes.toBytes(state.name()));
+      } else {
+        ZKUtil.createAndWatch(zookeeper, peerStateZNode,
+            Bytes.toBytes(state.name()));
+      }
+      LOG.info("state of the peer " + id + " changed to " + state.name());
+    } catch (KeeperException e) {
+      throw new IOException("Unable to change state of the peer " + id, e);
+    }
+  }
+
+  /**
+   * Get state of the peer. This method checks the state by connecting to ZK.
+   *
+   * @param id peer's identifier
+   * @return current state of the peer
+   */
+  public PeerState getPeerState(String id) throws KeeperException {
+    byte[] peerStateBytes = ZKUtil
+        .getData(this.zookeeper, getPeerStateNode(id));
+    return PeerState.valueOf(Bytes.toString(peerStateBytes));
+  }
+
+  /**
+   * Check whether the peer is enabled or not. This method checks the atomic
+   * boolean of ReplicationPeer locally.
+   *
+   * @param id peer identifier
+   * @return true if the peer is enabled, otherwise false
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   */
+  public boolean getPeerEnabled(String id) {
+    if (!this.peerClusters.containsKey(id)) {
+      throw new IllegalArgumentException("peer " + id + " is not registered");
+    }
+    return this.peerClusters.get(id).getPeerEnabled().get();
+  }
+
+  private String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(this.peersZNode,
+        ZKUtil.joinZNode(id, this.peerStateNodeName));
+  }
+
+  /**
    * This reads the state znode for replication and sets the atomic boolean
    */
   private void readReplicationStateZnode() {
Index: src/main/ruby/hbase/replication_admin.rb
===================================================================
--- src/main/ruby/hbase/replication_admin.rb	(revision 1305723)
+++ src/main/ruby/hbase/replication_admin.rb	(working copy)
@@ -50,6 +50,12 @@
     end
 
     #----------------------------------------------------------------------------------------------
+    # Get peer cluster state
+    def get_peer_state(id)
+      @replication_admin.getPeerState(id)
+    end
+
+    #----------------------------------------------------------------------------------------------
     # Restart the replication stream to the specified peer
     def enable_peer(id)
       @replication_admin.enablePeer(id)
Index: src/main/ruby/shell/commands/list_peers.rb
===================================================================
--- src/main/ruby/shell/commands/list_peers.rb	(revision 1305723)
+++ src/main/ruby/shell/commands/list_peers.rb	(working copy)
@@ -33,10 +33,11 @@
         now = Time.now
         peers = replication_admin.list_peers
 
-        formatter.header(["PEER ID", "CLUSTER KEY"])
+        formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"])
 
         peers.entrySet().each do |e|
-          formatter.row([ e.key, e.value ])
+          state = replication_admin.get_peer_state(e.key)
+          formatter.row([ e.key, e.value, state ])
         end
 
         formatter.footer(now)
Index: src/main/ruby/shell/commands/disable_peer.rb
===================================================================
--- src/main/ruby/shell/commands/disable_peer.rb	(revision 1305723)
+++ src/main/ruby/shell/commands/disable_peer.rb	(working copy)
@@ -26,8 +26,6 @@
 Stops the replication stream to the specified cluster, but still
 keeps track of new edits to replicate.
 
-CURRENTLY UNSUPPORTED
-
 Examples:
 
   hbase> disable_peer '1'
Index: src/main/ruby/shell/commands/enable_peer.rb
===================================================================
--- src/main/ruby/shell/commands/enable_peer.rb	(revision 1305723)
+++ src/main/ruby/shell/commands/enable_peer.rb	(working copy)
@@ -26,8 +26,6 @@
 Restarts the replication to the specified peer cluster,
 continuing from where it was disabled.
 
-CURRENTLY UNSUPPORTED
-
 Examples:
 
   hbase> enable_peer '1'