Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1081954) +++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -51,7 +51,6 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; @@ -80,10 +79,10 @@ private static final byte[] f1 = Bytes.toBytes("f1"); - private static final byte[] f2 = Bytes.toBytes("f2"); - private static final byte[] test = Bytes.toBytes("test"); + private static final String slaveId = "1"; + private static FileSystem fs; private static Path oldLogDir; @@ -118,7 +117,7 @@ logDir = new Path(utility.getTestDir(), HConstants.HREGION_LOGDIR_NAME); - manager.addSource("1"); + manager.addSource(slaveId); htd = new HTableDescriptor(test); HColumnDescriptor col = new HColumnDescriptor("f1"); @@ -190,7 +189,7 @@ hlog.append(hri, key, edit); } - assertEquals(6, manager.getHLogs().size()); + assertEquals(6, manager.getHLogs().get(slaveId).size()); hlog.rollWriter(); @@ -201,10 +200,7 @@ test, seq++, System.currentTimeMillis()); hlog.append(hri, key, edit); - assertEquals(1, manager.getHLogs().size()); - - - // TODO Need a case with only 2 HLogs and we only want to delete the first one + assertEquals(1, manager.getHLogs().get(slaveId).size()); } static class DummyServer implements Server { Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1081954) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.SortedMap; @@ -64,8 +65,8 @@ private final ReplicationZookeeper zkHelper; // All about stopping private final Stoppable stopper; - // All logs we are currently trackign - private final SortedSet hlogs; + // All logs we are currently tracking + private final Map> hlogsById; private final Configuration conf; private final FileSystem fs; // The path to the latest log we saw, for new coming sources @@ -99,7 +100,8 @@ this.replicating = replicating; this.zkHelper = zkHelper; this.stopper = stopper; - this.hlogs = new TreeSet(); + this.hlogsById = new HashMap>(); + //this.hlogs = new TreeSet(); this.oldsources = new ArrayList(); this.conf = conf; this.fs = fs; @@ -126,12 +128,13 @@ * @param queueRecovered indicates if this queue comes from another region server */ public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) { - String key = log.getName(); + String key = getLogFromPath(log); LOG.info("Going to report log #" + key + " for position " + position + " in " + log); this.zkHelper.writeReplicationStatus(key.toString(), id, position); - synchronized (this.hlogs) { - if (!queueRecovered && this.hlogs.first() != key) { - SortedSet hlogSet = this.hlogs.headSet(key); + synchronized (this.hlogsById) { + SortedSet hlogs = this.hlogsById.get(id); + if (!queueRecovered && hlogs.first() != key) { + SortedSet hlogSet = hlogs.headSet(key); LOG.info("Removing " + hlogSet.size() + " logs in the list: " + hlogSet); for (String hlog : hlogSet) { @@ -179,12 +182,15 @@ getReplicationSource(this.conf, this.fs, this, stopper, replicating, id); // TODO set it to what's in ZK src.setSourceEnabled(true); - synchronized (this.hlogs) { + synchronized (this.hlogsById) { + LOG.info("adding log"); this.sources.add(src); - if (this.hlogs.size() > 0) { - // Add the latest hlog to that source's queue - this.zkHelper.addLogToList(this.hlogs.last(), - this.sources.get(0).getPeerClusterZnode()); + this.hlogsById.put(id, new TreeSet()); + // Add the latest hlog to that source's queue + if (this.latestPath != null) { + String name = getLogFromPath(this.latestPath); + this.hlogsById.get(id).add(name); + this.zkHelper.addLogToList(name, src.getPeerClusterZnode()); src.enqueueLog(this.latestPath); } } @@ -208,8 +214,13 @@ * Get a copy of the hlogs of the first source on this rs * @return a sorted set of hlog names */ - protected SortedSet getHLogs() { - return new TreeSet(this.hlogs); + protected HashMap> getHLogs() { + HashMap> copy = + new HashMap>(this.hlogsById.size()); + for (String id : this.hlogsById.keySet()) { + copy.put(id, new TreeSet(this.hlogsById.get(id))); + } + return copy; } /** @@ -225,16 +236,20 @@ LOG.warn("Replication stopped, won't add new log"); return; } - - if (this.sources.size() > 0) { - this.zkHelper.addLogToList(newLog.getName(), - this.sources.get(0).getPeerClusterZnode()); + String name = getLogFromPath(newLog); + for (ReplicationSourceInterface source : this.sources) { + this.zkHelper.addLogToList(name, source.getPeerClusterZnode()); } - synchronized (this.hlogs) { - this.hlogs.add(newLog.getName()); + synchronized (this.hlogsById) { + //LOG.info("111 " + hlogsById.keySet().toArray()[0] ); + for (SortedSet hlogs : this.hlogsById.values()) { + LOG.info("222 " + this.hlogsById.toString()); + hlogs.add(name); + LOG.info("size! " + hlogs.size()); + } } this.latestPath = newLog; - // This only update the sources we own, not the recovered ones + // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources) { source.enqueueLog(newLog); } @@ -273,7 +288,7 @@ ReplicationSource.class.getCanonicalName())); src = (ReplicationSourceInterface) c.newInstance(); } catch (Exception e) { - LOG.warn("Passed replication source implemention throws errors, " + + LOG.warn("Passed replication source implementation throws errors, " + "defaulting to ReplicationSource", e); src = new ReplicationSource(); @@ -532,4 +547,8 @@ public FileSystem getFs() { return this.fs; } + + private String getLogFromPath(Path p) { + return p.getName(); + } }