Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (revision 1240491)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (working copy)
@@ -26,7 +26,14 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -443,8 +450,52 @@
   }
 
   /**
+   * Test disable/enable replication, trying to insert, make sure nothing's
+   * replicated, enable it, the insert should be replicated
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testDisableEnable() throws Exception {
+
+    // Test disabling replication
+    admin.disablePeer("2");
+
+    Put put = new Put(Bytes.toBytes("disable enable"));
+    put.add(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(Bytes.toBytes("disable enable"));
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Replication wasn't disabled");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test enable replication
+    admin.enablePeer("2");
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        return;
+      }
+    }
+    fail("Waited too much time for put replication");
+  }
+
+  /**
    * Integration test for TestReplicationAdmin, removes and re-add a peer
    * cluster
+   *
    * @throws Exception
    */
   @Test(timeout=300000)
Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1240491)
+++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy)
@@ -23,7 +23,6 @@
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -114,16 +113,16 @@
    * Restart the replication stream to the specified peer.
    * @param id a short that identifies the cluster
    */
-  public void enablePeer(String id) {
-    throw new NotImplementedException("Not implemented");
+  public void enablePeer(String id) throws IOException {
+    this.replicationZk.enablePeer(id);
   }
 
   /**
    * Stop the replication stream to the specified peer.
    * @param id a short that identifies the cluster
    */
-  public void disablePeer(String id) {
-    throw new NotImplementedException("Not implemented");
+  public void disablePeer(String id) throws IOException {
+    this.replicationZk.disablePeer(id);
   }
 
   /**
@@ -143,6 +142,16 @@
   }
 
   /**
+   * Get state of the peer
+   *
+   * @param id peer's identifier
+   * @return current state of the peer
+   */
+  public String getPeerState(String id) {
+    return this.replicationZk.getPeerState(id).name();
+  }
+
+  /**
    * Get the current status of the kill switch, if the cluster is replicating
    * or not.
    * @return true if the cluster is replicated, otherwise false
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1240491)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -53,6 +53,7 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ClusterId;
@@ -299,6 +300,13 @@
       boolean gotIOE = false;
       currentNbEntries = 0;
       try {
+        if (PeerState.DISABLED.equals(zkHelper.getPeerState(peerId))) {
+          if (sleepForRetries("peer " + peerId + " is disabled",
+              sleepMultiplier)) {
+            sleepMultiplier++;
+          }
+          continue;
+        }
         if(readAllEntriesToReplicateOrNextFile()) {
           continue;
         }
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1240491)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy)
@@ -48,18 +48,20 @@
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 
 /**
- * This class serves as a helper for all things related to zookeeper
- * in replication.
+ * This class serves as a helper for all things related to zookeeper in
+ * replication.
  *
- * The layout looks something like this under zookeeper.znode.parent
- * for the master cluster:
+ * The layout looks something like this under zookeeper.znode.parent for the
+ * master cluster:
  *
+ *
  *
* replication/
* state {contains true or false}
* clusterId {contains a byte}
* peers/
* 1/ {contains a full cluster address}
+ * peer-state {contains ENABLED or DISABLED}
* 2/
* ...
* rs/ {lists all RS that replicate}
@@ -79,6 +81,12 @@
LogFactory.getLog(ReplicationZookeeper.class);
// Name of znode we use to lock when failover
private final static String RS_LOCK_ZNODE = "lock";
+
+ // Possible states of a replication peer; the constant's name is the value stored in the peer's state znode
+ public static enum PeerState {
+ ENABLED, DISABLED
+ };
+
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
// Map of peer clusters keyed by their id
@@ -93,6 +101,8 @@
private String rsServerNameZnode;
// Name node if the replicationState znode
private String replicationStateNodeName;
+ // Name of zk node which stores peer state
+ private String peerStateNodeName;
private final Configuration conf;
// Is this cluster replicating at the moment?
private AtomicBoolean replicating;
@@ -147,6 +157,8 @@
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
conf.get("zookeeper.znode.replication.peers", "peers");
+ this.peerStateNodeName = conf.get(
+ "zookeeper.znode.replication.peers.state", "peer-state");
this.replicationStateNodeName =
conf.get("zookeeper.znode.replication.state", "state");
String rsZNodeName =
@@ -363,7 +375,8 @@
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot remove inexisting peer");
}
- ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ ZKUtil.deleteNodeRecursively(this.zookeeper,
+ ZKUtil.joinZNode(this.peersZNode, id));
} catch (KeeperException e) {
throw new IOException("Unable to remove a peer", e);
}
@@ -385,6 +398,8 @@
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+ ZKUtil.createAndWatch(this.zookeeper, getPeerStateZNode(id),
+ Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
} catch (KeeperException e) {
throw new IOException("Unable to add peer", e);
}
@@ -396,6 +411,66 @@
}
/**
+ * Enable replication to the peer
+ *
+ * @param id peer's identifier
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ * @throws IOException
+ * Thrown when the new state cannot be written to ZooKeeper
+ */
+ public void enablePeer(String id) throws IOException {
+ changePeerState(id, PeerState.ENABLED);
+ LOG.info("peer " + id + " is enabled");
+ }
+
+ /**
+ * Disable replication to the peer
+ *
+ * @param id peer's identifier
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ * @throws IOException
+ * Thrown when the new state cannot be written to ZooKeeper
+ */
+ public void disablePeer(String id) throws IOException {
+ changePeerState(id, PeerState.DISABLED);
+ LOG.info("peer " + id + " is disabled");
+ }
+
+  /**
+   * Stores the given state in the peer-state znode of a registered peer.
+   * The znode is created when it does not exist yet, so a peer whose znode
+   * has no peer-state child can still be enabled or disabled.
+   *
+   * @param id peer's identifier
+   * @param state state to store for the peer
+   * @throws IllegalArgumentException
+   *           Thrown when the peer doesn't exist
+   * @throws IOException
+   *           Thrown when the state cannot be written to ZooKeeper
+   */
+  private void changePeerState(String id, PeerState state) throws IOException {
+    try {
+      if (!peerExists(id)) {
+        throw new IllegalArgumentException("peer " + id + " is not registered");
+      }
+      String peerStateZNode = getPeerStateZNode(id);
+      byte[] stateBytes = Bytes.toBytes(state.name());
+      // ZKUtil.checkExists reports -1 for a znode that is not there.
+      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
+      } else {
+        // setData would fail on a missing znode; create it with the
+        // requested state instead.
+        ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
+      }
+      LOG.info("state of the peer " + id + " changed to " + state.name());
+    } catch (KeeperException e) {
+      throw new IOException("Unable to change state of the peer " + id, e);
+    }
+  }
+
+  /**
+   * Get state of the peer
+   *
+   * @param id peer's identifier
+   * @return current state of the peer; {@link PeerState#ENABLED} when no
+   *         state data is stored for it; null when reading from ZooKeeper
+   *         failed, in which case the abortable has been asked to abort
+   */
+  public PeerState getPeerState(String id) {
+    PeerState peerState = null;
+    try {
+      byte[] peerStateBytes =
+          ZKUtil.getData(this.zookeeper, getPeerStateZNode(id));
+      if (peerStateBytes == null) {
+        // No state stored for this peer. New peers are registered as
+        // ENABLED, so use that rather than letting valueOf(null) throw a
+        // NullPointerException.
+        peerState = PeerState.ENABLED;
+      } else {
+        peerState = PeerState.valueOf(Bytes.toString(peerStateBytes));
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the state of the peer " + id, e);
+    }
+    return peerState;
+  }
+
+  /**
+   * Builds the path of the znode that holds the state of the given peer:
+   * the peer-state node name, nested under the peer's id, under the peers
+   * znode.
+   *
+   * @param id peer's identifier
+   * @return path of the peer's state znode
+   */
+  private String getPeerStateZNode(String id) {
+    String stateNodeUnderPeer = ZKUtil.joinZNode(id, this.peerStateNodeName);
+    return ZKUtil.joinZNode(this.peersZNode, stateNodeUnderPeer);
+  }
+
+ /**
* This reads the state znode for replication and sets the atomic boolean
*/
private void readReplicationStateZnode() {
Index: src/main/ruby/hbase/replication_admin.rb
===================================================================
--- src/main/ruby/hbase/replication_admin.rb (revision 1240491)
+++ src/main/ruby/hbase/replication_admin.rb (working copy)
@@ -50,6 +50,12 @@
end
#----------------------------------------------------------------------------------------------
+ # Get the current state of the specified peer, as reported by the replication admin
+ def get_peer_state(id)
+ @replication_admin.getPeerState(id)
+ end
+
+ #----------------------------------------------------------------------------------------------
# Restart the replication stream to the specified peer
def enable_peer(id)
@replication_admin.enablePeer(id)
Index: src/main/ruby/shell/commands/list_peers.rb
===================================================================
--- src/main/ruby/shell/commands/list_peers.rb (revision 1240491)
+++ src/main/ruby/shell/commands/list_peers.rb (working copy)
@@ -33,10 +33,11 @@
now = Time.now
peers = replication_admin.list_peers
- formatter.header(["PEER ID", "CLUSTER KEY"])
+ formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"])
peers.entrySet().each do |e|
- formatter.row([ e.key, e.value ])
+ state = replication_admin.get_peer_state(e.key)
+ formatter.row([ e.key, e.value, state ])
end
formatter.footer(now)