regionServers) {
+ this.regionServers = regionServers;
+ }
+
+ public void setZkw(ZooKeeperWrapper zkw) {
+ this.zkw = zkw;
+ }
+
+ public String getId() {
+ return id;
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
index 1007aeb..c7bd4ce 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeperWrapper.java
@@ -51,7 +51,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
*
* replication/
- * master {contains a full cluster address}
* state {contains true or false}
* clusterId {contains a byte}
* peers/
@@ -78,8 +77,8 @@ public class ReplicationZookeeperWrapper {
private final static String RS_LOCK_ZNODE = "lock";
// Our handle on zookeeper
private final ZooKeeperWrapper zookeeperWrapper;
- // Map of addresses of peer clusters with their ZKW
- private final Map peerClusters;
+ // Map of peer clusters keyed by their id
+ private final Map peerClusters;
// Path to the root replication znode
private final String replicationZNode;
// Path to the peer clusters znode
@@ -90,13 +89,27 @@ public class ReplicationZookeeperWrapper {
private final String rsServerNameZnode;
// Name node if the replicationState znode
private final String replicationStateNodeName;
- // If this RS is part of a master cluster
- private final boolean replicationMaster;
private final Configuration conf;
// Is this cluster replicating at the moment?
private final AtomicBoolean replicating;
// Byte (stored as string here) that identifies this cluster
private final String clusterId;
+ // The key to our own cluster
+ private final String ourClusterKey;
+ // Way to know if this class needs to process the added features
+ // required for the region servers, or just act a as replication client
+ private final boolean clientOnly;
+
+ /**
+ * Constructor used by clients of replication (like master and HBase clients)
+ * @param zookeeperWrapper zkw to wrap
+ * @param conf conf to use
+ * @throws IOException
+ */
+ public ReplicationZookeeperWrapper(ZooKeeperWrapper zookeeperWrapper,
+ Configuration conf) throws IOException {
+ this(zookeeperWrapper, conf, new AtomicBoolean(true), null);
+ }
/**
* Constructor used by region servers, connects to the peer cluster right away.
@@ -113,6 +126,7 @@ public class ReplicationZookeeperWrapper {
final AtomicBoolean replicating, String rsName) throws IOException {
this.zookeeperWrapper = zookeeperWrapper;
this.conf = conf;
+ this.clientOnly = rsName == null;
String replicationZNodeName =
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
@@ -125,47 +139,53 @@ public class ReplicationZookeeperWrapper {
conf.get("zookeeper.znode.replication.clusterId", "clusterId");
String rsZNodeName =
conf.get("zookeeper.znode.replication.rs", "rs");
- String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+ this.ourClusterKey = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
- this.peerClusters = new HashMap();
+ this.peerClusters = new HashMap();
this.replicationZNode = zookeeperWrapper.getZNode(
zookeeperWrapper.getParentZNode(), replicationZNodeName);
this.peersZNode =
zookeeperWrapper.getZNode(replicationZNode, peersZNodeName);
+ this.zookeeperWrapper.ensureExists(this.peersZNode);
this.rsZNode =
zookeeperWrapper.getZNode(replicationZNode, rsZNodeName);
this.replicating = replicating;
- setReplicating();
+ readReplicationStateZnode();
String idResult = Bytes.toString(
this.zookeeperWrapper.getData(this.replicationZNode,
clusterIdZNodeName));
this.clusterId =
idResult == null ?
Byte.toString(HConstants.DEFAULT_CLUSTER_ID) : idResult;
- String address = Bytes.toString(
- this.zookeeperWrapper.getData(this.replicationZNode,
- repMasterZNodeName));
- this.replicationMaster = thisCluster.equals(address);
- LOG.info("This cluster (" + thisCluster + ") is a "
- + (this.replicationMaster ? "master" : "slave") + " for replication" +
- ", compared with (" + address + ")");
- if (rsName != null) {
+ if (this.clientOnly) {
+ this.rsServerNameZnode = null;
+ } else {
this.rsServerNameZnode =
this.zookeeperWrapper.getZNode(rsZNode, rsName);
- List znodes = this.zookeeperWrapper.listZnodes(this.peersZNode,
- new ReplicationStatusWatcher());
- if (znodes != null) {
- for (String znode : znodes) {
+ connectExistingPeers();
+ }
+ }
+
+ private void connectExistingPeers() throws IOException {
+ List znodes = listPeersIds(null);
+ if (znodes != null) {
+ for (String znode : znodes) {
+ if (!this.peerClusters.containsKey(znode))
connectToPeer(znode);
- }
}
- } else {
- this.rsServerNameZnode = null;
}
+ }
+ /**
+ *
+ * @param watcher
+ * @return
+ */
+ public List listPeersIds(Watcher watcher) {
+ return this.zookeeperWrapper.listZnodes(this.peersZNode, watcher);
}
/**
@@ -174,13 +194,20 @@ public class ReplicationZookeeperWrapper {
* @param peerClusterId (byte) the cluster to interrogate
* @return addresses of all region servers
*/
- public List getPeersAddresses(String peerClusterId) {
+ public List getSlavesAddresses(String peerClusterId) {
if (this.peerClusters.size() == 0) {
return new ArrayList(0);
}
- ZooKeeperWrapper zkw = this.peerClusters.get(peerClusterId);
- return zkw == null?
- new ArrayList(0) : zkw.scanRSDirectory();
+ ReplicationPeer peer = this.peerClusters.get(peerClusterId);
+ if (peer == null) {
+ return new ArrayList(0);
+ }
+ peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
+ return peer.getRegionServers();
+ }
+
+ private List fetchSlavesAddresses(ZooKeeperWrapper zkw) {
+ return zkw.scanRSDirectory();
}
/**
@@ -188,31 +215,126 @@ public class ReplicationZookeeperWrapper {
* in this region server's replication znode
* @param peerId id of the peer cluster
*/
- private void connectToPeer(String peerId) throws IOException {
- String[] ensemble =
- Bytes.toString(this.zookeeperWrapper.getData(this.peersZNode, peerId)).
- split(":");
+ public boolean connectToPeer(String peerId) throws IOException {
+ if (this.clientOnly) {
+ return false;
+ }
+ if (this.peerClusters.containsKey(peerId)) {
+ return false;
+ // TODO remove when we support it
+ } else if (this.peerClusters.size() > 0) {
+ LOG.warn("Multiple slaves feature not supported");
+ return false;
+ }
+ String otherClusterKey = Bytes.toString(this.zookeeperWrapper.getData(
+ this.peersZNode, peerId));
+ if (this.ourClusterKey.equals(otherClusterKey)) {
+ LOG.debug("Not connecting to " + peerId + " because it's us");
+ return false;
+ }
+ String[] ensemble = otherClusterKey.split(":");
if (ensemble.length != 3) {
- throw new IllegalArgumentException("Wrong format of cluster address: " +
+ LOG.warn("Wrong format of cluster address: " +
this.zookeeperWrapper.getData(this.peersZNode, peerId));
+ return false;
}
+ // Construct the connection to the new peer
Configuration otherConf = new Configuration(this.conf);
otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf,
"connection to cluster: " + peerId);
- zkw.registerListener(new ReplicationStatusWatcher());
- this.peerClusters.put(peerId, zkw);
+ ReplicationPeer peer = new ReplicationPeer(peerId, otherClusterKey, zkw);
+ this.peerClusters.put(peerId, peer);
this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode(
this.rsServerNameZnode, peerId));
LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+ return true;
+ }
+
+ /**
+ * Set the new replication state for this cluster
+ * @param newState
+ */
+ public void setReplicating(boolean newState) {
+ try {
+ this.zookeeperWrapper.writeZNode(this.replicationZNode,
+ this.replicationStateNodeName, Boolean.toString(newState));
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ public void setSourceState(String id, boolean enabled) {
+ try {
+ this.zookeeperWrapper.writeZNode(this.peersZNode, id,
+ Boolean.toString(enabled));
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ /**
+ *
+ * @param id
+ * @throws IllegalArgumentException Thrown when the peer doesn't exist
+ */
+ public void removePeer(String id) {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("Cannot remove inexisting peer");
+ }
+ this.zookeeperWrapper.deleteZNode(
+ zookeeperWrapper.getZNode(this.peersZNode, id), true);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ /**
+ *
+ * @param id
+ * @param clusterKey
+ * @throws IllegalArgumentException Thrown when the peer doesn't exist
+ * @throws IllegalStateException Thrown when a peer already exists, since
+ * multi-slave isn't supported yet.
+ */
+ public void addPeer(String id, String clusterKey) {
+ try {
+ if (peerExists(id)) {
+ throw new IllegalArgumentException("Cannot add existing peer");
+ } else if (countPeers() > 0) {
+ throw new IllegalStateException("Multi-slave isn't supported yet");
+ }
+ this.zookeeperWrapper.writeZNode(this.peersZNode, id, clusterKey);
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ } catch (KeeperException e) {
+ LOG.error(e);
+ }
+ }
+
+ private boolean peerExists(String id) {
+ return this.zookeeperWrapper.exists(
+ zookeeperWrapper.getZNode(this.peersZNode, id), false);
+ }
+
+ private int countPeers() {
+ List peers = this.zookeeperWrapper.listZnodes(this.peersZNode);
+ return peers == null ? 0 : peers.size();
}
/**
* This reads the state znode for replication and sets the atomic boolean
*/
- private void setReplicating() {
+ private void readReplicationStateZnode() {
String value = Bytes.toString(this.zookeeperWrapper.getDataAndWatch(
this.replicationZNode, this.replicationStateNodeName,
new ReplicationStatusWatcher()));
@@ -403,10 +525,14 @@ public class ReplicationZookeeperWrapper {
* Delete a complete queue of hlogs
* @param peerZnode znode of the peer cluster queue of hlogs to delete
*/
- public void deleteSource(String peerZnode) {
+ public void deleteSource(String peerZnode, boolean closeConnection) {
try {
this.zookeeperWrapper.deleteZNode(
this.zookeeperWrapper.getZNode(rsServerNameZnode, peerZnode), true);
+ if (closeConnection) {
+ this.peerClusters.get(peerZnode).getZkw().close();
+ this.peerClusters.remove(peerZnode);
+ }
} catch (InterruptedException e) {
LOG.error(e);
} catch (KeeperException e) {
@@ -451,15 +577,6 @@ public class ReplicationZookeeperWrapper {
}
/**
- * Tells if this cluster replicates or not
- *
- * @return if this is a master
- */
- public boolean isReplicationMaster() {
- return this.replicationMaster;
- }
-
- /**
* Get the identification of the cluster
*
* @return the id for the cluster
@@ -470,13 +587,23 @@ public class ReplicationZookeeperWrapper {
/**
* Get a map of all peer clusters
- * @return map of peer cluster, zk address to ZKW
+ * @return map of peer cluster keyed by id
*/
- public Map getPeerClusters() {
+ public Map getPeerClusters() {
return this.peerClusters;
}
/**
+ * Extracts the znode name of a peer cluster from a ZK path
+ * @param fullPath Path to extract the id from
+ * @return the id or an empty string if path is invalid
+ */
+ public static String getZNodeName(String fullPath) {
+ String[] parts = fullPath.split("/");
+ return parts.length > 0 ? parts[parts.length-1] : "";
+ }
+
+ /**
* Watcher for the status of the replication
*/
public class ReplicationStatusWatcher implements Watcher {
@@ -485,7 +612,7 @@ public class ReplicationZookeeperWrapper {
Event.EventType type = watchedEvent.getType();
LOG.info("Got event " + type + " with path " + watchedEvent.getPath());
if (type.equals(Event.EventType.NodeDataChanged)) {
- setReplicating();
+ readReplicationStateZnode();
}
}
}
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 4d4b00a..1a33435 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -112,8 +112,7 @@ public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher {
try {
this.zkHelper = new ReplicationZookeeperWrapper(
ZooKeeperWrapper.createInstance(this.conf,
- HMaster.class.getName()),
- this.conf, new AtomicBoolean(true), null);
+ HMaster.class.getName()), this.conf);
} catch (IOException e) {
LOG.error(e);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 52cb8e8..4a4bbd5 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -47,7 +47,6 @@ public class Replication implements LogEntryVisitor {
private final boolean replication;
private final ReplicationSourceManager replicationManager;
- private boolean replicationMaster;
private final AtomicBoolean replicating = new AtomicBoolean(true);
private final ReplicationZookeeperWrapper zkHelper;
private final Configuration conf;
@@ -74,10 +73,8 @@ public class Replication implements LogEntryVisitor {
this.zkHelper = new ReplicationZookeeperWrapper(
ZooKeeperWrapper.getInstance(conf, hsi.getServerName()), conf,
this.replicating, hsi.getServerName());
- this.replicationMaster = zkHelper.isReplicationMaster();
- this.replicationManager = this.replicationMaster ?
- new ReplicationSourceManager(zkHelper, conf, stopRequested,
- fs, this.replicating, logDir, oldLogDir) : null;
+ this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
+ stopRequested, fs, this.replicating, logDir, oldLogDir);
} else {
replicationManager = null;
zkHelper = null;
@@ -89,9 +86,7 @@ public class Replication implements LogEntryVisitor {
*/
public void join() {
if (this.replication) {
- if (this.replicationMaster) {
- this.replicationManager.join();
- }
+ this.replicationManager.join();
this.zkHelper.deleteOwnRSZNode();
}
}
@@ -102,7 +97,7 @@ public class Replication implements LogEntryVisitor {
* @throws IOException
*/
public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
- if (this.replication && !this.replicationMaster) {
+ if (this.replication) {
this.replicationSink.replicateEntries(entries);
}
}
@@ -114,12 +109,9 @@ public class Replication implements LogEntryVisitor {
*/
public void startReplicationServices() throws IOException {
if (this.replication) {
- if (this.replicationMaster) {
- this.replicationManager.init();
- } else {
- this.replicationSink =
- new ReplicationSink(this.conf, this.stopRequested);
- }
+ this.replicationManager.init();
+ this.replicationSink =
+ new ReplicationSink(this.conf, this.stopRequested);
}
}
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 32508de..3fd4406 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -124,6 +124,9 @@ public class ReplicationSource extends Thread
private volatile boolean running = true;
// Metrics for this source
private ReplicationSourceMetrics metrics;
+ // If source is enabled, replication happens. If disabled, nothing will be
+ // replicated but HLogs will still be queued
+ private AtomicBoolean sourceEnabled = new AtomicBoolean();
/**
* Instantiation method used by region servers
@@ -197,7 +200,7 @@ public class ReplicationSource extends Thread
private void chooseSinks() {
this.currentPeers.clear();
List addresses =
- this.zkHelper.getPeersAddresses(peerClusterId);
+ this.zkHelper.getSlavesAddresses(peerClusterId);
Set setOfAddr = new HashSet();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
@@ -236,6 +239,13 @@ public class ReplicationSource extends Thread
int sleepMultiplier = 1;
// Loop until we close down
while (!stop.get() && this.running) {
+ // Sleep until replication is enabled again
+ if (!this.replicating.get() || !this.sourceEnabled.get()) {
+ if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
// Get a new path
if (!getNextPath()) {
if (sleepForRetries("No log to process", sleepMultiplier)) {
@@ -572,7 +582,7 @@ public class ReplicationSource extends Thread
return true;
} else if (this.queueRecovered) {
this.manager.closeRecoveredQueue(this);
- this.abort();
+ this.terminate();
return true;
}
return false;
@@ -584,24 +594,16 @@ public class ReplicationSource extends Thread
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
LOG.fatal("Set stop flag in " + t.getName(), e);
- abort();
+ terminate();
}
};
Threads.setDaemonThreadRunning(
this, n + ".replicationSource," + clusterId, handler);
}
- /**
- * Hastily stop the replication, then wait for shutdown
- */
- private void abort() {
- LOG.info("abort");
- this.running = false;
- terminate();
- }
-
public void terminate() {
- LOG.info("terminate");
+ LOG.info("Closing normal source " + this.peerClusterZnode);
+ this.running = false;
Threads.shutdown(this, this.sleepForRetries);
}
@@ -645,15 +647,14 @@ public class ReplicationSource extends Thread
return down;
}
- /**
- * Get the id that the source is replicating to
- *
- * @return peer cluster id
- */
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
+ public String getPeerClusterId() {
+ return this.peerClusterId;
+ }
+
/**
* Get the path of the current HLog
* @return current hlog's path
@@ -663,6 +664,14 @@ public class ReplicationSource extends Thread
}
/**
+ * Set if this source is enabled or disabled
+ * @param status the new status
+ */
+ public void setSourceEnabled(boolean status) {
+ this.sourceEnabled.set(status);
+ }
+
+ /**
* Comparator used to compare logs together based on their start time
*/
public static class LogsComparator implements Comparator {
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index f6e72dd..a659abf 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -72,9 +72,23 @@ public interface ReplicationSourceInterface {
public void terminate();
/**
- * Get the id that the source is replicating to
+ * Get the name of the znode that contains the metadata for the replication
+ * this source's peer.
*
- * @return peer cluster id
+ * @return peer cluster znode
*/
public String getPeerClusterZnode();
+
+ /**
+ * Get the id that the source is replicating to.
+ *
+ * @return peer cluster id
+ */
+ public String getPeerClusterId();
+
+ /**
+ * Set if this source is enabled or disabled
+ * @param status the new status
+ */
+ public void setSourceEnabled(boolean status);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 8046b73..ea1a4df 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -107,6 +108,7 @@ public class ReplicationSourceManager implements LogActionsListener {
this.oldLogDir = oldLogDir;
List otherRSs =
this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
+ this.zkHelper.listPeersIds(new PeersWatcher());
this.otherRegionServers = otherRSs == null ? new ArrayList() : otherRSs;
}
@@ -144,9 +146,13 @@ public class ReplicationSourceManager implements LogActionsListener {
public void init() throws IOException {
for (String id : this.zkHelper.getPeerClusters().keySet()) {
ReplicationSourceInterface src = addSource(id);
- src.startup();
+ // TODO set it to what's in ZK
+ src.setSourceEnabled(true);
}
List currentReplicators = this.zkHelper.getListOfReplicators(null);
+ if (currentReplicators == null || currentReplicators.size() == 0) {
+ return;
+ }
synchronized (otherRegionServers) {
LOG.info("Current list of replicators: " + currentReplicators
+ " other RSs: " + otherRegionServers);
@@ -164,20 +170,23 @@ public class ReplicationSourceManager implements LogActionsListener {
/**
* Add a new normal source to this region server
* @param id the id of the peer cluster
- * @return the created source
+ * @return the source that was created
* @throws IOException
*/
public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
- this.sources.add(src);
+ src.setSourceEnabled(true);
synchronized (this.hlogs) {
+ this.sources.add(src);
if (this.hlogs.size() > 0) {
- this.zkHelper.addLogToList(this.hlogs.first(),
+ // Add the latest hlog to that source's queue
+ this.zkHelper.addLogToList(this.hlogs.last(),
this.sources.get(0).getPeerClusterZnode());
src.enqueueLog(this.latestPath);
}
}
+ src.startup();
return src;
}
@@ -211,17 +220,19 @@ public class ReplicationSourceManager implements LogActionsListener {
@Override
public void logRolled(Path newLog) {
- if (this.sources.size() > 0) {
- this.zkHelper.addLogToList(newLog.getName(),
- this.sources.get(0).getPeerClusterZnode());
- }
- synchronized (this.hlogs) {
- this.hlogs.add(newLog.getName());
- }
- this.latestPath = newLog;
- // This only update the sources we own, not the recovered ones
- for (ReplicationSourceInterface source : this.sources) {
- source.enqueueLog(newLog);
+ if (this.replicating.get()) {
+ if (this.sources.size() > 0) {
+ this.zkHelper.addLogToList(newLog.getName(),
+ this.sources.get(0).getPeerClusterZnode());
+ }
+ synchronized (this.hlogs) {
+ this.hlogs.add(newLog.getName());
+ this.latestPath = newLog;
+ // This only update the sources we own, not the recovered ones
+ for (ReplicationSourceInterface source : this.sources) {
+ source.enqueueLog(newLog);
+ }
+ }
}
}
@@ -309,13 +320,51 @@ public class ReplicationSourceManager implements LogActionsListener {
}
/**
- * Clear the references to the specified old source
+ * Clear the references to the specified recovered source
* @param src source to clear
*/
public void closeRecoveredQueue(ReplicationSourceInterface src) {
- LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
+ LOG.info("Deleting the recovered queue " + src.getPeerClusterZnode());
this.oldsources.remove(src);
- this.zkHelper.deleteSource(src.getPeerClusterZnode());
+ this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
+ }
+
+ /**
+ * Thie method first deletes all the recovered sources for the specified
+ * id, then deletes the normal source (deleting all related data in ZK).
+ * @param id The id of the peer cluster
+ */
+ public void removePeer(String id) {
+ LOG.info("Closing the following queue " + id + ", currently have "
+ + sources.size() + " and another "
+ + oldsources.size() + " that were recovered");
+ ReplicationSourceInterface srcToRemove = null;
+ int nbDeletedRecovQueues = 0;
+ // First close all the recovered sources for this peer
+ for (ReplicationSourceInterface src : oldsources) {
+ if (id.equals(src.getPeerClusterId())) {
+ closeRecoveredQueue((src));
+ nbDeletedRecovQueues++;
+ }
+ }
+ LOG.info("Number of deleted recovered sources for " + id + ": "
+ + nbDeletedRecovQueues);
+ // Now look for the one on this cluster
+ for (ReplicationSourceInterface src : this.sources) {
+ LOG.info("There is " + src.getPeerClusterZnode());
+ if (id.equals(src.getPeerClusterId())) {
+ srcToRemove = src;
+ break;
+ }
+ }
+ if (srcToRemove == null) {
+ LOG.error("The queue we wanted to close is missing " + id);
+ return;
+ }
+ srcToRemove.terminate();
+ this.sources.remove(srcToRemove);
+ this.zkHelper.deleteSource(id, true);
+
}
/**
@@ -328,7 +377,8 @@ public class ReplicationSourceManager implements LogActionsListener {
public void process(WatchedEvent watchedEvent) {
LOG.info(" event " + watchedEvent);
if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
- watchedEvent.getType().equals(Event.KeeperState.Disconnected)) {
+ watchedEvent.getType().equals(Event.KeeperState.Disconnected) ||
+ !replicating.get()) {
return;
}
@@ -343,8 +393,41 @@ public class ReplicationSourceManager implements LogActionsListener {
}
if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
- String[] rsZnodeParts = watchedEvent.getPath().split("/");
- transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+ transferQueues(zkHelper.getZNodeName(watchedEvent.getPath()));
+ }
+ }
+ }
+
+ public class PeersWatcher implements Watcher {
+
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ LOG.info(" event " + watchedEvent);
+ if (watchedEvent.getType().equals(Event.KeeperState.Expired) ||
+ watchedEvent.getType().equals(Event.KeeperState.Disconnected) ||
+ !replicating.get()) {
+ return;
+ }
+ // Reset the watch
+ List currentPeers = zkHelper.listPeersIds(this);
+
+ // If it was deleted, close and return
+ if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) {
+ String id = zkHelper.getZNodeName(watchedEvent.getPath());
+ removePeer(id);
+ return;
+ }
+ // Else, try adding it the brutal way
+ for (String id : currentPeers) {
+ try {
+ boolean added = zkHelper.connectToPeer(id);
+ if (added) {
+ addSource(id);
+ }
+ } catch (IOException e) {
+ // TODO manage better than that
+ LOG.error("Error while adding a new peer", e);
+ }
}
}
}
diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
index 3256ac9..e45c460 100644
--- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
+++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
@@ -503,6 +503,7 @@ public class ZooKeeperWrapper implements Watcher {
* @return true if it works
*/
public boolean ensureExists(final String znode) {
+ LOG.info("ensureexist " + znode);
try {
Stat stat = zooKeeper.exists(znode, false);
if (stat != null) {
diff --git a/src/main/ruby/hbase.rb b/src/main/ruby/hbase.rb
index 16ad293..a2961b5 100644
--- a/src/main/ruby/hbase.rb
+++ b/src/main/ruby/hbase.rb
@@ -71,4 +71,5 @@ end
# Include classes definition
require 'hbase/hbase'
require 'hbase/admin'
+require 'hbase/replication_admin'
require 'hbase/table'
diff --git a/src/main/ruby/hbase/hbase.rb b/src/main/ruby/hbase/hbase.rb
index c2233ac..8445ae5 100644
--- a/src/main/ruby/hbase/hbase.rb
+++ b/src/main/ruby/hbase/hbase.rb
@@ -49,5 +49,9 @@ module Hbase
def table(table, formatter)
::Hbase::Table.new(configuration, table, formatter)
end
+
+ def replication_admin(formatter)
+ ::Hbase::RepAdmin.new(configuration, formatter)
+ end
end
end
diff --git a/src/main/ruby/hbase/replication_admin.rb b/src/main/ruby/hbase/replication_admin.rb
new file mode 100644
index 0000000..1fb7c32
--- /dev/null
+++ b/src/main/ruby/hbase/replication_admin.rb
@@ -0,0 +1,72 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+include Java
+
+java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+
+# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
+
+module Hbase
+ class RepAdmin
+ include HBaseConstants
+
+ def initialize(configuration, formatter)
+ @replication_admin = ReplicationAdmin.new(configuration)
+ @formatter = formatter
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Add a new peer cluster to replicate to
+ def add_peer(id, cluster_key)
+ @replication_admin.addPeer(id, cluster_key)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Remove a peer cluster, stops the replication
+ def remove_peer(id)
+ @replication_admin.removePeer(id)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Restart the replication stream to the specified peer
+ def enable_peer(id)
+ @replication_admin.enablePeer(id)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Stop the replication stream to the specified peer
+ def disable_peer(id)
+ @replication_admin.disablePeer(id)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Restart the replication, in an unknown state
+ def start_replication
+ @replication_admin.setReplicating(true)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Kill switch for replication, stops all its features
+ def stop_replication
+ @replication_admin.setReplicating(false)
+ end
+ end
+end
diff --git a/src/main/ruby/shell.rb b/src/main/ruby/shell.rb
index 5654840..e688160 100644
--- a/src/main/ruby/shell.rb
+++ b/src/main/ruby/shell.rb
@@ -83,6 +83,10 @@ module Shell
hbase.table(name, formatter)
end
+ def hbase_replication_admin
+ @hbase_replication_admin ||= hbase.replication_admin(formatter)
+ end
+
def export_commands(where)
::Shell.commands.keys.each do |cmd|
where.send :instance_eval, <<-EOF
@@ -254,3 +258,16 @@ Shell.load_command_group(
]
)
+Shell.load_command_group(
+ 'replication',
+ :full_name => 'CLUSTER REPLICATION TOOLS',
+ :comment => "In order to use these tools, hbase.replication must be true",
+ :commands => %w[
+ add_peer
+ remove_peer
+ enable_peer
+ disable_peer
+ start_replication
+ stop_replication
+ ]
+)
diff --git a/src/main/ruby/shell/commands.rb b/src/main/ruby/shell/commands.rb
index 49a29cc..4c759bc 100644
--- a/src/main/ruby/shell/commands.rb
+++ b/src/main/ruby/shell/commands.rb
@@ -49,6 +49,9 @@ module Shell
shell.hbase_table(name)
end
+ def replication_admin
+ shell.hbase_replication_admin
+ end
#----------------------------------------------------------------------
def formatter
diff --git a/src/main/ruby/shell/commands/add_peer.rb b/src/main/ruby/shell/commands/add_peer.rb
new file mode 100644
index 0000000..8548aee
--- /dev/null
+++ b/src/main/ruby/shell/commands/add_peer.rb
@@ -0,0 +1,44 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class AddPeer< Command
+ def help
+ return <<-EOF
+ Add a peer cluster to replicate to, the id must be a short and
+ the cluster key is composed like this:
+ hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+ This gives a full path for HBase to connect to another cluster.
+ Examples:
+
+ hbase> add_peer '1', "server1.cie.com:2181:/hbase"
+ hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
+ EOF
+ end
+
+ def command(id, cluster_key)
+ format_simple_command do
+ replication_admin.add_peer(id, cluster_key)
+ end
+ end
+ end
+ end
+end
diff --git a/src/main/ruby/shell/commands/disable_peer.rb b/src/main/ruby/shell/commands/disable_peer.rb
new file mode 100644
index 0000000..502696c
--- /dev/null
+++ b/src/main/ruby/shell/commands/disable_peer.rb
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class DisablePeer< Command
+ def help
+ return <<-EOF
+ Stops the replication stream to the specified cluster, but still
+ keeps track of new edits to replicate.
+ Examples:
+
+ hbase> disable_peer '1'
+ EOF
+ end
+
+ def command(id)
+ format_simple_command do
+ replication_admin.disable_peer(id)
+ end
+ end
+ end
+ end
+end
diff --git a/src/main/ruby/shell/commands/enable_peer.rb b/src/main/ruby/shell/commands/enable_peer.rb
new file mode 100644
index 0000000..e1764e4
--- /dev/null
+++ b/src/main/ruby/shell/commands/enable_peer.rb
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class EnablePeer< Command
+ def help
+ return <<-EOF
+ Restarts the replication to the specified peer cluster,
+ continuing from where it was disabled.
+ Examples:
+
+ hbase> enable_peer '1'
+ EOF
+ end
+
+ def command(id)
+ format_simple_command do
+ replication_admin.enable_peer(id)
+ end
+ end
+ end
+ end
+end
diff --git a/src/main/ruby/shell/commands/remove_peer.rb b/src/main/ruby/shell/commands/remove_peer.rb
new file mode 100644
index 0000000..6730a55
--- /dev/null
+++ b/src/main/ruby/shell/commands/remove_peer.rb
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class RemovePeer< Command
+ def help
+ return <<-EOF
+ Stops the specified replication stream and deletes all the meta
+ information kept about it.
+ Examples:
+
+ hbase> remove_peer '1'
+ EOF
+ end
+
+ def command(id)
+ format_simple_command do
+ replication_admin.remove_peer(id)
+ end
+ end
+ end
+ end
+end
diff --git a/src/main/ruby/shell/commands/start_replication.rb b/src/main/ruby/shell/commands/start_replication.rb
new file mode 100644
index 0000000..b2f55bc
--- /dev/null
+++ b/src/main/ruby/shell/commands/start_replication.rb
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class StartReplication < Command
+ def help
+ return <<-EOF
+ Restarts all the replication features. The state in which each
+ stream starts in is undetermined.
+ WARNING:
+ start/stop replication is only meant to be used in critical load situations.
+ Examples:
+
+ hbase> start_replication
+ EOF
+ end
+
+ def command
+ format_simple_command do
+ replication_admin.start_replication
+ end
+ end
+ end
+ end
+end
diff --git a/src/main/ruby/shell/commands/stop_replication.rb b/src/main/ruby/shell/commands/stop_replication.rb
new file mode 100644
index 0000000..7569627
--- /dev/null
+++ b/src/main/ruby/shell/commands/stop_replication.rb
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class StopReplication < Command
+ def help
+ return <<-EOF
+ Stops all the replication features. The state in which each
+ stream stops in is undetermined.
+ WARNING:
+ start/stop replication is only meant to be used in critical load situations.
+ Examples:
+
+ hbase> stop_replication
+ EOF
+ end
+
+ def command
+ format_simple_command do
+ replication_admin.stop_replication
+ end
+ end
+ end
+ end
+end
diff --git a/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
new file mode 100644
index 0000000..e9d1b7b
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -0,0 +1,93 @@
+package org.apache.hadoop.hbase.replication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit testing of ReplicationAdmin
+ */
+public class TestReplicationAdmin {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestReplicationAdmin.class);
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ private final String ID_ONE = "1";
+ private final String KEY_ONE = "127.0.0.1:2181:/hbase";
+ private final String ID_SECOND = "2";
+ private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
+
+ private static ReplicationSourceManager manager;
+ private static ReplicationAdmin admin;
+ private static AtomicBoolean replicating = new AtomicBoolean(true);
+ private static AtomicBoolean stopper = new AtomicBoolean(true);
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ admin = new ReplicationAdmin(conf);
+ Path oldLogDir = new Path(TEST_UTIL.getTestDir(),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ Path logDir = new Path(TEST_UTIL.getTestDir(),
+ HConstants.HREGION_LOGDIR_NAME);
+ manager = new ReplicationSourceManager(admin.getZkWrapper(),
+ conf, stopper, FileSystem.get(conf), replicating, logDir, oldLogDir);
+ }
+
+ /**
+ * Simple testing of adding and removing peers, basically shows that
+ * all interactions with ZK work
+ * @throws Exception
+ */
+ @Test
+ public void testAddRemovePeer() throws Exception {
+ assertEquals(0, manager.getSources().size());
+ admin.addPeer(ID_ONE, KEY_ONE);
+ // Add a valid peer
+ try {
+ admin.addPeer(ID_ONE, KEY_ONE);
+ } catch (IllegalArgumentException iae) {
+ // OK!
+ }
+ // Try to remove an inexisting peer
+ assertEquals(1, admin.getPeersCount());
+ try {
+ admin.removePeer(ID_SECOND);
+ fail();
+ } catch (IllegalArgumentException iae) {
+ // OK!
+ }
+ assertEquals(1, admin.getPeersCount());
+ // Add a second, returns illegal since multi peers isn't supported
+ try {
+ admin.addPeer(ID_SECOND, KEY_SECOND);
+ fail();
+ } catch (IllegalStateException iae) {
+ // OK!
+ }
+ assertEquals(1, admin.getPeersCount());
+ // Remove the first peer we added
+ admin.removePeer(ID_ONE);
+ assertEquals(0, admin.getPeersCount());
+ }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index e1f4f98..05067d3 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -70,4 +70,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public String getPeerClusterZnode() {
return peerClusterId;
}
+
+ @Override
+ public String getPeerClusterId() {
+ return peerClusterId;
+ }
+
+ @Override
+ public void setSourceEnabled(boolean status) {
+
+ }
}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
index 31cc680..fb6bece 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.junit.After;
@@ -60,6 +61,9 @@ public class TestReplication {
private static ZooKeeperWrapper zkw1;
private static ZooKeeperWrapper zkw2;
+ private static ReplicationAdmin admin;
+ private static String slaveClusterKey;
+
private static HTable htable1;
private static HTable htable2;
@@ -98,12 +102,7 @@ public class TestReplication {
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1");
- zkw1.writeZNode("/1", "replication", "");
- zkw1.writeZNode("/1/replication", "master",
- conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf1.get("hbase.zookeeper.property.clientPort")+":/1");
- setIsReplication(true);
-
+ admin = new ReplicationAdmin(conf1);
LOG.info("Setup first Zk");
conf2 = HBaseConfiguration.create();
@@ -117,13 +116,14 @@ public class TestReplication {
utility2.setZkCluster(miniZK);
zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2");
zkw2.writeZNode("/2", "replication", "");
- zkw2.writeZNode("/2/replication", "master",
- conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf1.get("hbase.zookeeper.property.clientPort")+":/1");
- zkw1.writeZNode("/1/replication/peers", "1",
+ /*zkw1.writeZNode("/1/replication/peers", "1",
conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf2.get("hbase.zookeeper.property.clientPort")+":/2");
+ conf2.get("hbase.zookeeper.property.clientPort")+":/2");*/
+ slaveClusterKey = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf2.get("hbase.zookeeper.property.clientPort")+":/2";
+ admin.addPeer("2", slaveClusterKey);
+ setIsReplication(true);
LOG.info("Setup second Zk");
@@ -149,7 +149,8 @@ public class TestReplication {
private static void setIsReplication(boolean rep) throws Exception {
LOG.info("Set rep " + rep);
- zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
+ //zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep));
+ admin.setReplicating(rep);
// Takes some ms for ZK to fire the watcher
Thread.sleep(SLEEP_TIME);
}
@@ -159,12 +160,27 @@ public class TestReplication {
*/
@Before
public void setUp() throws Exception {
- setIsReplication(false);
utility1.truncateTable(tableName);
- utility2.truncateTable(tableName);
// If test is flaky, set that sleep higher
- Thread.sleep(SLEEP_TIME*8);
- setIsReplication(true);
+ Scan scan = new Scan();
+ int lastCount = 0;
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for truncate");
+ }
+ ResultScanner scanner = htable2.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+ scanner.close();
+ if (res.length != 0) {
+ if (lastCount < res.length) {
+ i--; // Don't increment timeout if we make progress
+ }
+ LOG.info("Still got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
}
/**
@@ -189,13 +205,12 @@ public class TestReplication {
htable1 = new HTable(conf1, tableName);
htable1.put(put);
- HTable table2 = new HTable(conf2, tableName);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
- Result res = table2.get(get);
+ Result res = htable2.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
@@ -208,13 +223,12 @@ public class TestReplication {
Delete del = new Delete(row);
htable1.delete(del);
- table2 = new HTable(conf2, tableName);
get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
- Result res = table2.get(get);
+ Result res = htable2.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
Thread.sleep(SLEEP_TIME);
@@ -300,8 +314,6 @@ public class TestReplication {
// Test restart replication
setIsReplication(true);
- htable1.put(put);
-
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
@@ -337,6 +349,59 @@ public class TestReplication {
}
/**
+ * Integration test for TestReplicationAdmin, removes and re-add a peer
+ * cluster
+ * @throws Exception
+ */
+ @Test
+ public void testAddAndRemoveClusters() throws Exception {
+ LOG.info("testAddAndRemoveClusters");
+ admin.removePeer("2");
+ Thread.sleep(SLEEP_TIME);
+ byte[] rowKey = Bytes.toBytes("Won't be replicated");
+ Put put = new Put(rowKey);
+ put.add(famName, row, row);
+ htable1.put(put);
+
+ Get get = new Get(rowKey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES-1) {
+ break;
+ }
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Not supposed to be replicated");
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ admin.addPeer("1", slaveClusterKey);
+ Thread.sleep(SLEEP_TIME);
+ rowKey = Bytes.toBytes("do rep");
+ put = new Put(rowKey);
+ put.add(famName, row, row);
+ LOG.info("Adding new row");
+ htable1.put(put);
+
+ get = new Get(rowKey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if (res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME*i);
+ } else {
+ assertArrayEquals(res.value(), row);
+ break;
+ }
+ }
+ }
+
+ /**
* Do a more intense version testSmallBatch, one that will trigger
* hlog rolling and other non-trivial code paths
* @throws Exception
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index b497b9a..930c5c1 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -61,7 +60,7 @@ public class TestReplicationSourceManager {
private static final AtomicBoolean STOPPER = new AtomicBoolean(false);
- private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
+ private static final AtomicBoolean REPLICATING = new AtomicBoolean(true);
private static ReplicationSourceManager manager;
@@ -90,27 +89,20 @@ public class TestReplicationSourceManager {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
-
conf = HBaseConfiguration.create();
+ conf.setBoolean("dfs.support.append", true);
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster();
-
- zkw = ZooKeeperWrapper.createInstance(conf, "test");
- zkw.writeZNode("/hbase", "replication", "");
- zkw.writeZNode("/hbase/replication", "master",
- conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf.get("hbase.zookeeper.property.clientPort")+":/1");
+ zkw = ZooKeeperWrapper.createInstance(conf,
+ TestReplicationSourceManager.class.toString());
zkw.writeZNode("/hbase/replication/peers", "1",
conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf.get("hbase.zookeeper.property.clientPort")+":/1");
-
- HRegionServer server = new HRegionServer(conf);
ReplicationZookeeperWrapper helper = new ReplicationZookeeperWrapper(
- server.getZooKeeperWrapper(), conf,
- REPLICATING, "123456789");
+ zkw, conf, REPLICATING, "123456789");
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);