Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1242253) +++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -30,13 +30,23 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; +import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -100,6 +110,9 @@ ZKUtil.setData(zkw, "/hbase/replication/peers/1", Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); + ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); + ZKUtil.setData(zkw,
"/hbase/replication/peers/1/peer-state", + Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name())); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true")); Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1242253) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -299,6 +299,13 @@ boolean gotIOE = false; currentNbEntries = 0; try { + if (!zkHelper.getPeerEnabled(peerId)) { + if (sleepForRetries("peer " + peerId + " is disabled", + sleepMultiplier)) { + sleepMultiplier++; + } + continue; + } if(readAllEntriesToReplicateOrNextFile()) { continue; } Index: src/main/ruby/hbase/replication_admin.rb =================================================================== --- src/main/ruby/hbase/replication_admin.rb (revision 1242253) +++ src/main/ruby/hbase/replication_admin.rb (working copy) @@ -50,6 +50,12 @@ end #---------------------------------------------------------------------------------------------- + # Get peer cluster state + def get_peer_state(id) + @replication_admin.getPeerState(id) + end + + #---------------------------------------------------------------------------------------------- # Restart the replication stream to the specified peer def enable_peer(id) @replication_admin.enablePeer(id) Index: src/main/ruby/shell/commands/list_peers.rb =================================================================== --- src/main/ruby/shell/commands/list_peers.rb (revision 1242253) +++ src/main/ruby/shell/commands/list_peers.rb (working copy) @@ -33,10 +33,11 @@ now = Time.now peers = replication_admin.list_peers - formatter.header(["PEER ID", "CLUSTER KEY"]) + formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"])
peers.entrySet().each do |e| - formatter.row([ e.key, e.value ]) + state = replication_admin.get_peer_state(e.key) + formatter.row([ e.key, e.value, state ]) end formatter.footer(now) Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (revision 1242253) +++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (working copy) @@ -26,7 +26,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -443,8 +450,53 @@ } /** + * Test disable/enable replication, trying to insert, make sure nothing's + * replicated, enable it, the insert should be replicated + * + * @throws Exception + */ + @Test(timeout = 300000) + public void testDisableEnable() throws Exception { + + // Test disabling replication + admin.disablePeer("2"); + + byte[] rowkey = Bytes.toBytes("disable enable"); + Put put = new Put(rowkey); + put.add(famName, row, row); + htable1.put(put); + + Get get = new Get(rowkey); + for (int i = 0; i < NB_RETRIES; i++) { + Result res = htable2.get(get); + if (res.size() >= 1) { + fail("Replication wasn't disabled"); + } else { + LOG.info("Row not replicated, let's wait a bit more..."); + Thread.sleep(SLEEP_TIME); + } + } + + // Test enable replication
+ admin.enablePeer("2"); + + for (int i = 0; i < NB_RETRIES; i++) { + Result res = htable2.get(get); + if (res.size() == 0) { + LOG.info("Row not available"); + Thread.sleep(SLEEP_TIME); + } else { + assertArrayEquals(res.value(), row); + return; + } + } + fail("Waited too much time for put replication"); + } + + /** * Integration test for TestReplicationAdmin, removes and re-add a peer * cluster + * * @throws Exception */ @Test(timeout=300000) Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1242253) +++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.Map; -import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnection; @@ -114,16 +113,16 @@ * Restart the replication stream to the specified peer. * @param id a short that identifies the cluster */ - public void enablePeer(String id) { - throw new NotImplementedException("Not implemented"); + public void enablePeer(String id) throws IOException { + this.replicationZk.enablePeer(id); } /** * Stop the replication stream to the specified peer.
* @param id a short that identifies the cluster */ - public void disablePeer(String id) { - throw new NotImplementedException("Not implemented"); + public void disablePeer(String id) throws IOException { + this.replicationZk.disablePeer(id); } /** @@ -143,6 +142,21 @@ } /** + * Get state of the peer + * + * @param id peer's identifier + * @return current state of the peer + * @throws IOException if the state cannot be read from ZooKeeper + */ + public String getPeerState(String id) throws IOException { + try { + return this.replicationZk.getPeerState(id).name(); + } catch (KeeperException e) { + throw new IOException("Couldn't get the state of the peer " + id, e); + } + } + + /** * Get the current status of the kill switch, if the cluster is replicating * or not. * @return true if the cluster is replicated, otherwise false Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1242253) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -48,18 +48,20 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException; /** - * This class serves as a helper for all things related to zookeeper - * in replication. + * This class serves as a helper for all things related to zookeeper in + * replication. *
- * The layout looks something like this under zookeeper.znode.parent - * for the master cluster: + * The layout looks something like this under zookeeper.znode.parent for the + * master cluster: * + *
* replication/
* state {contains true or false}
* clusterId {contains a byte}
* peers/
* 1/ {contains a full cluster address}
+ * peer-state {contains ENABLED or DISABLED}
* 2/
* ...
* rs/ {lists all RS that replicate}
@@ -79,6 +81,12 @@
LogFactory.getLog(ReplicationZookeeper.class);
// Name of znode we use to lock when failover
private final static String RS_LOCK_ZNODE = "lock";
+
+ // Values stored in a peer's state znode
+ public enum PeerState {
+ ENABLED, DISABLED
+ }
+
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
// Map of peer clusters keyed by their id
@@ -93,6 +101,8 @@
private String rsServerNameZnode;
// Name node if the replicationState znode
private String replicationStateNodeName;
+ // Name of the child znode, under each peer's znode, that stores the peer state
+ private String peerStateNodeName;
private final Configuration conf;
// Is this cluster replicating at the moment?
private AtomicBoolean replicating;
@@ -101,6 +111,8 @@
// Abortable
private Abortable abortable;
private ReplicationStatusTracker statusTracker;
+ // Map of peer state trackers keyed by peer id
+ private Map<String, PeerStateTracker> peerStateTrackers;
/**
* Constructor used by clients of replication (like master and HBase clients)
@@ -135,6 +147,7 @@
setZNodes(server);
this.peerClusters = new HashMap();
+ this.peerStateTrackers = new HashMap<String, PeerStateTracker>();
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
@@ -147,6 +160,9 @@
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
conf.get("zookeeper.znode.replication.peers", "peers");
+ // child znode of every peer znode that holds that peer's state
+ this.peerStateNodeName =
+ conf.get("zookeeper.znode.replication.peers.state", "peer-state");
this.replicationStateNodeName =
conf.get("zookeeper.znode.replication.state", "state");
String rsZNodeName =
@@ -306,6 +321,15 @@
return false;
}
this.peerClusters.put(peerId, peer);
+ PeerStateTracker stateTracker = new PeerStateTracker(peerId,
+ this.zookeeper, this.abortable);
+ this.peerStateTrackers.put(peerId, stateTracker);
+ stateTracker.start();
+ // Re-sync the flag from the tracker in case the state changed before the tracker started watching it
+ byte[] trackedState = stateTracker.getData(false);
+ if (trackedState != null) {
+ peer.getPeerEnabled().set(PeerState.ENABLED.name().equals(Bytes.toString(trackedState)));
+ }
ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
this.rsServerNameZnode, peerId));
LOG.info("Added new peer cluster " + peer.getClusterKey());
@@ -336,8 +355,12 @@
return null;
}
- return new ReplicationPeer(otherConf, peerId,
+ ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
otherClusterKey);
+ // seed the local enabled flag from the peer's state znode
+ boolean enabled = this.getPeerState(peerId) == PeerState.ENABLED;
+ peer.getPeerEnabled().set(enabled);
+ return peer;
}
/**
@@ -363,7 +385,9 @@
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot remove inexisting peer");
}
- ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ // the peer znode now has a peer-state child, so the whole subtree goes
+ String removedPeerZNode = ZKUtil.joinZNode(this.peersZNode, id);
+ ZKUtil.deleteNodeRecursively(this.zookeeper, removedPeerZNode);
} catch (KeeperException e) {
throw new IOException("Unable to remove a peer", e);
}
@@ -385,6 +408,8 @@
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+ ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
+ Bytes.toBytes(PeerState.ENABLED.name())); // new peers start out ENABLED
} catch (KeeperException e) {
throw new IOException("Unable to add peer", e);
}
@@ -396,6 +421,136 @@
}
/**
+ * Enable replication to the peer
+ *
+ * @param id peer's identifier
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ * @throws IOException
+ * Thrown when the state cannot be changed in ZooKeeper
+ */
+ public void enablePeer(String id) throws IOException {
+ changePeerState(id, PeerState.ENABLED);
+ LOG.info("peer " + id + " is enabled");
+ }
+
+ /**
+ * Disable replication to the peer
+ *
+ * @param id peer's identifier
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ * @throws IOException
+ * Thrown when the state cannot be changed in ZooKeeper
+ */
+ public void disablePeer(String id) throws IOException {
+ changePeerState(id, PeerState.DISABLED);
+ LOG.info("peer " + id + " is disabled");
+ }
+
+ /**
+ * Store the given state in the peer's state znode.
+ *
+ * @param id peer's identifier
+ * @param state new state of the peer
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ * @throws IOException
+ * Thrown when ZooKeeper cannot be read or updated
+ */
+ private void changePeerState(String id, PeerState state) throws IOException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not registered");
+ }
+ ZKUtil.setData(this.zookeeper, getPeerStateNode(id),
+ Bytes.toBytes(state.name()));
+ LOG.info("state of the peer " + id + " changed to " + state.name());
+ } catch (KeeperException e) {
+ throw new IOException("Unable to change state of the peer " + id, e);
+ }
+ }
+
+ /**
+ * Get state of the peer. This method checks the state by connecting to ZK.
+ * A registered peer that has no state znode (e.g. one created before
+ * per-peer states existed, when peers could not be disabled) is reported
+ * as ENABLED.
+ *
+ * @param id peer's identifier
+ * @return current state of the peer
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ * @throws KeeperException
+ * Thrown when ZooKeeper cannot be read
+ */
+ public PeerState getPeerState(String id) throws KeeperException {
+ byte[] peerStateBytes = ZKUtil
+ .getData(this.zookeeper, getPeerStateNode(id));
+ if (peerStateBytes == null) {
+ // No state znode: report unknown peers clearly instead of failing with
+ // an NPE in valueOf(), and treat registered peers as enabled.
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not registered");
+ }
+ return PeerState.ENABLED;
+ }
+ return PeerState.valueOf(Bytes.toString(peerStateBytes));
+ }
+
+ /**
+ * Check whether the peer is enabled or not. This method checks the atomic
+ * boolean of ReplicationPeer locally.
+ *
+ * @param id peer identifier
+ * @return true if the peer is enabled, otherwise false
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ */
+ public boolean getPeerEnabled(String id) {
+ if (!this.peerClusters.containsKey(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not registered");
+ }
+ return this.peerClusters.get(id).getPeerEnabled().get();
+ }
+
+ /**
+ * Update the local enabled flag of a peer from the data cached by its
+ * PeerStateTracker.
+ *
+ * @param id peer identifier
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ */
+ private void readPeerStateZnode(String id) {
+ if (!this.peerClusters.containsKey(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not registered");
+ }
+ ReplicationPeer peer = this.peerClusters.get(id);
+ PeerStateTracker stateTracker = this.peerStateTrackers.get(id);
+ byte[] stateBytes = stateTracker == null ? null : stateTracker.getData(false);
+ if (stateBytes == null) {
+ // The znode can disappear between the change event and this read (for
+ // example while the peer is being removed); keep the last known state.
+ LOG.info("No state data for peer " + id + ", keeping current state");
+ return;
+ }
+ peer.getPeerEnabled().set(
+ PeerState.valueOf(Bytes.toString(stateBytes)) == PeerState.ENABLED);
+ }
+
+ /**
+ * Build the path of the znode that holds the state of the peer.
+ *
+ * @param id peer's identifier
+ * @return znode path: peers znode / id / peer state znode name
+ */
+ private String getPeerStateNode(String id) {
+ return ZKUtil.joinZNode(this.peersZNode,
+ ZKUtil.joinZNode(id, this.peerStateNodeName));
+ }
+
+ /**
* This reads the state znode for replication and sets the atomic boolean
*/
private void readReplicationStateZnode() {
@@ -761,4 +867,33 @@
}
}
}
+
+ /**
+ * Tracker for state of each peer. It tracks the peer's state znode and,
+ * when that znode's data changes, updates the peer's local enabled flag.
+ */
+ public class PeerStateTracker extends ZooKeeperNodeTracker {
+ // identifier of the peer whose state znode is tracked
+ private final String id;
+
+ /**
+ * @param id peer identifier
+ * @param watcher ZooKeeper watcher passed to the node tracker
+ * @param abortable abortable passed to the node tracker
+ */
+ public PeerStateTracker(String id, ZooKeeperWatcher watcher,
+ Abortable abortable) {
+ super(watcher, getPeerStateNode(id), abortable);
+ this.id = id;
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if (path.equals(node)) {
+ // let the base tracker handle the event first so readPeerStateZnode sees current data
+ super.nodeDataChanged(path);
+ readPeerStateZnode(id);
+ }
+ }
+ }
}