diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index d76e333..93b8aed 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client.replication;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
@@ -133,6 +134,14 @@
   }
 
   /**
+   * Map of this cluster's peers for display.
+   * @return A map of peer ids to peer cluster keys
+   */
+  public Map<String, String> listPeers() {
+    return this.replicationZk.listPeers();
+  }
+
+  /**
    * Get the current status of the kill switch, if the cluster is replicating
    * or not.
    * @return true if the cluster is replicated, otherwise false
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
index 9686000..fe0802e 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
@@ -199,6 +199,24 @@
   }
 
   /**
+   * Map of this cluster's peers for display.
+   * @return A map of peer ids to peer cluster keys
+   */
+  public Map<String, String> listPeers() {
+    Map<String, String> peers = new TreeMap<String, String>();
+    List<String> ids = null;
+    try {
+      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+      for (String id : ids) {
+        peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper,
+            ZKUtil.joinZNode(this.peersZNode, id))));
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return peers;
+  }
+  /**
    * Returns all region servers from given peer
    *
    * @param peerClusterId (byte) the cluster to interrogate
@@ -245,10 +263,6 @@
     }
     if (this.peerClusters.containsKey(peerId)) {
       return false;
-      // TODO remove when we support it
-    } else if (this.peerClusters.size() > 0) {
-      LOG.warn("Multiple slaves feature not supported");
-      return false;
     }
     ReplicationPeer peer = getPeer(peerId);
     if (peer == null) {
@@ -332,8 +346,6 @@
     try {
       if (peerExists(id)) {
         throw new IllegalArgumentException("Cannot add existing peer");
-      } else if (countPeers() > 0) {
-        throw new IllegalStateException("Multi-slave isn't supported yet");
       }
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper,
@@ -348,12 +360,6 @@
         ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
   }
 
-  private int countPeers() throws KeeperException {
-    List<String> peers =
-        ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    return peers == null ? 0 : peers.size();
-  }
-
   /**
    * This reads the state znode for replication and sets the atomic boolean
    */
@@ -389,11 +395,11 @@
   /**
    * Add a new log to the list of hlogs in zookeeper
    * @param filename name of the hlog's znode
-   * @param clusterId name of the cluster's znode
+   * @param peerId name of the cluster's znode
    */
-  public void addLogToList(String filename, String clusterId) {
+  public void addLogToList(String filename, String peerId) {
     try {
-      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
       znode = ZKUtil.joinZNode(znode, filename);
       ZKUtil.createWithParents(this.zookeeper, znode);
     } catch (KeeperException e) {
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index e576633..d911863 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -22,6 +22,8 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -70,7 +72,7 @@
   // All about stopping
   private final Stoppable stopper;
   // All logs we are currently trackign
-  private final SortedSet<String> hlogs;
+  private final Map<String, SortedSet<String>> hlogsById;
   private final Configuration conf;
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
@@ -108,7 +110,7 @@
     this.replicating = replicating;
     this.zkHelper = zkHelper;
     this.stopper = stopper;
-    this.hlogs = new TreeSet<String>();
+    this.hlogsById = new HashMap<String, SortedSet<String>>();
     this.oldsources = new ArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -149,14 +151,15 @@
   public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
     String key = log.getName();
     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
-    this.zkHelper.writeReplicationStatus(key.toString(), id, position);
-    synchronized (this.hlogs) {
-      if (!queueRecovered && this.hlogs.first() != key) {
-        SortedSet<String> hlogSet = this.hlogs.headSet(key);
+    this.zkHelper.writeReplicationStatus(key, id, position);
+    synchronized (this.hlogsById) {
+      SortedSet<String> hlogs = this.hlogsById.get(id);
+      if (!queueRecovered && hlogs.first() != key) {
+        SortedSet<String> hlogSet = hlogs.headSet(key);
         LOG.info("Removing " + hlogSet.size() + " logs in the list: " + hlogSet);
         for (String hlog : hlogSet) {
-          this.zkHelper.removeLogFromList(hlog.toString(), id);
+          this.zkHelper.removeLogFromList(hlog, id);
         }
         hlogSet.clear();
       }
     }
@@ -200,12 +203,14 @@
         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
     // TODO set it to what's in ZK
     src.setSourceEnabled(true);
-    synchronized (this.hlogs) {
+    synchronized (this.hlogsById) {
       this.sources.add(src);
-      if (this.hlogs.size() > 0) {
-        // Add the latest hlog to that source's queue
-        this.zkHelper.addLogToList(this.hlogs.last(),
-            this.sources.get(0).getPeerClusterZnode());
+      this.hlogsById.put(id, new TreeSet<String>());
+      // Add the latest hlog to that source's queue
+      if (this.latestPath != null) {
+        String name = this.latestPath.getName();
+        this.hlogsById.get(id).add(name);
+        this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
         src.enqueueLog(this.latestPath);
       }
     }
@@ -230,8 +235,8 @@
    * Get a copy of the hlogs of the first source on this rs
    * @return a sorted set of hlog names
    */
-  protected SortedSet<String> getHLogs() {
-    return new TreeSet<String>(this.hlogs);
+  protected Map<String, SortedSet<String>> getHLogs() {
+    return Collections.unmodifiableMap(hlogsById);
   }
 
   /**
@@ -248,21 +253,25 @@
       return;
     }
 
-    synchronized (this.hlogs) {
-      if (this.sources.size() > 0) {
-        this.zkHelper.addLogToList(newLog.getName(),
-            this.sources.get(0).getPeerClusterZnode());
-      } else {
-        // If there's no slaves, don't need to keep the old hlogs since
-        // we only consider the last one when a new slave comes in
-        this.hlogs.clear();
+    synchronized (this.hlogsById) {
+      String name = newLog.getName();
+      for (ReplicationSourceInterface source : this.sources) {
+        this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
+      }
+      for (SortedSet<String> hlogs : this.hlogsById.values()) {
+        if (this.sources.isEmpty()) {
+          // If there's no slaves, don't need to keep the old hlogs since
+          // we only consider the last one when a new slave comes in
+          hlogs.clear();
+        }
+        hlogs.add(name);
       }
-      this.hlogs.add(newLog.getName());
     }
+
     this.latestPath = newLog;
-    // This only update the sources we own, not the recovered ones
+    // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources) {
-      source.enqueueLog(newLog);
+      source.enqueueLog(newLog);
     }
   }
 
@@ -281,7 +290,7 @@
    * @param manager the manager to use
    * @param stopper the stopper object for this region server
    * @param replicating the status of the replication on this cluster
-   * @param peerClusterId the id of the peer cluster
+   * @param peerId the id of the peer cluster
    * @return the created source
    * @throws IOException
    */
@@ -291,7 +300,7 @@
       final ReplicationSourceManager manager,
       final Stoppable stopper,
       final AtomicBoolean replicating,
-      final String peerClusterId) throws IOException {
+      final String peerId) throws IOException {
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
@@ -299,12 +308,12 @@
           ReplicationSource.class.getCanonicalName()));
       src = (ReplicationSourceInterface) c.newInstance();
     } catch (Exception e) {
-      LOG.warn("Passed replication source implemention throws errors, " +
+      LOG.warn("Passed replication source implementation throws errors, " +
           "defaulting to ReplicationSource", e);
       src = new ReplicationSource();
     }
 
-    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+    src.init(conf, fs, manager, stopper, replicating, peerId);
     return src;
   }
 
@@ -410,7 +419,7 @@
         return;
       }
       LOG.info(path + " znode expired, trying to lock it");
-      transferQueues(zkHelper.getZNodeName(path));
+      transferQueues(ReplicationZookeeper.getZNodeName(path));
     }
 
     /**
@@ -462,7 +471,7 @@
       if (peers == null) {
         return;
       }
-      String id = zkHelper.getZNodeName(path);
+      String id = ReplicationZookeeper.getZNodeName(path);
       removePeer(id);
     }
 
diff --git a/src/main/ruby/hbase/replication_admin.rb b/src/main/ruby/hbase/replication_admin.rb
index 35e37af..c4be93c 100644
--- a/src/main/ruby/hbase/replication_admin.rb
+++ b/src/main/ruby/hbase/replication_admin.rb
@@ -44,6 +44,12 @@
     end
 
     #----------------------------------------------------------------------------------------------
+    # List all peer clusters
+    def list_peers
+      @replication_admin.listPeers
+    end
+
+    #----------------------------------------------------------------------------------------------
     # Restart the replication stream to the specified peer
     def enable_peer(id)
       @replication_admin.enablePeer(id)
diff --git a/src/main/ruby/shell.rb b/src/main/ruby/shell.rb
index 9027202..6d108e7 100644
--- a/src/main/ruby/shell.rb
+++ b/src/main/ruby/shell.rb
@@ -269,6 +269,7 @@ Shell.load_command_group(
     :commands => %w[
       add_peer
       remove_peer
+      list_peers
       enable_peer
       disable_peer
       start_replication
diff --git a/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 645b8ce..fa82e1c 100644
--- a/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -48,8 +49,13 @@
         HConstants.HREGION_OLDLOGDIR_NAME);
     Path logDir = new Path(TEST_UTIL.getTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
-    manager = new ReplicationSourceManager(admin.getReplicationZk(),
-        conf, null, FileSystem.get(conf), replicating, logDir, oldLogDir);
+    manager = new ReplicationSourceManager(admin.getReplicationZk(), conf,
+        new Stoppable() {
+          @Override
+          public void stop(String why) {}
+          @Override
+          public boolean isStopped() {return false;}
+        }, FileSystem.get(conf), replicating, logDir, oldLogDir);
   }
 
   /**
@@ -77,16 +83,15 @@
       // OK!
     }
     assertEquals(1, admin.getPeersCount());
-    // Add a second, returns illegal since multi-slave isn't supported
+    // Add a second since multi-slave is supported
    try {
       admin.addPeer(ID_SECOND, KEY_SECOND);
-      fail();
     } catch (IllegalStateException iae) {
-      // OK!
+      fail();
     }
-    assertEquals(1, admin.getPeersCount());
+    assertEquals(2, admin.getPeersCount());
     // Remove the first peer we added
     admin.removePeer(ID_ONE);
-    assertEquals(0, admin.getPeersCount());
+    assertEquals(1, admin.getPeersCount());
   }
 }
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 20a1ff8..4752ab7 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,6 +84,8 @@
 
   private static final byte[] test = Bytes.toBytes("test");
 
+  private static final String slaveId = "1";
+
   private static FileSystem fs;
 
   private static Path oldLogDir;
@@ -118,7 +120,7 @@
     logDir = new Path(utility.getTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
 
-    manager.addSource("1");
+    manager.addSource(slaveId);
 
     htd = new HTableDescriptor(test);
     HColumnDescriptor col = new HColumnDescriptor("f1");
@@ -190,7 +192,7 @@
       hlog.append(hri, key, edit);
     }
 
-    assertEquals(6, manager.getHLogs().size());
+    assertEquals(6, manager.getHLogs().get(slaveId).size());
 
     hlog.rollWriter();