diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index d76e333..93b8aed 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client.replication;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
@@ -133,6 +134,14 @@ public class ReplicationAdmin {
   }
 
   /**
+   * Map of this cluster's peers for display.
+   * @return A map of peer ids to peer cluster keys
+   */
+  public Map<String, String> listPeers() {
+    return this.replicationZk.listPeers();
+  }
+
+  /**
    * Get the current status of the kill switch, if the cluster is replicating
    * or not.
    * @return true if the cluster is replicated, otherwise false
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
index 9686000..fe0802e 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
@@ -199,6 +199,24 @@ public class ReplicationZookeeper {
   }
 
   /**
+   * Map of this cluster's peers for display.
+   * @return A map of peer ids to peer cluster keys
+   */
+  public Map<String, String> listPeers() {
+    Map<String, String> peers = new TreeMap<String, String>();
+    List<String> ids = null;
+    try {
+      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+      for (String id : ids) {
+        peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper,
+            ZKUtil.joinZNode(this.peersZNode, id))));
+      }
+    } catch (KeeperException e) {
+      this.abortable.abort("Cannot get the list of peers ", e);
+    }
+    return peers;
+  }
+  /**
    * Returns all region servers from given peer
    *
    * @param peerClusterId (byte) the cluster to interrogate
@@ -245,10 +263,6 @@
     }
     if (this.peerClusters.containsKey(peerId)) {
       return false;
-      // TODO remove when we support it
-    } else if (this.peerClusters.size() > 0) {
-      LOG.warn("Multiple slaves feature not supported");
-      return false;
     }
     ReplicationPeer peer = getPeer(peerId);
     if (peer == null) {
@@ -332,8 +346,6 @@
     try {
       if (peerExists(id)) {
        throw new IllegalArgumentException("Cannot add existing peer");
-      } else if (countPeers() > 0) {
-        throw new IllegalStateException("Multi-slave isn't supported yet");
       }
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper,
@@ -348,12 +360,6 @@
         ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
   }
 
-  private int countPeers() throws KeeperException {
-    List<String> peers =
-      ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    return peers == null ? 0 : peers.size();
-  }
-
   /**
    * This reads the state znode for replication and sets the atomic boolean
    */
@@ -389,11 +395,11 @@
   /**
    * Add a new log to the list of hlogs in zookeeper
    * @param filename name of the hlog's znode
-   * @param clusterId name of the cluster's znode
+   * @param peerId name of the cluster's znode
    */
-  public void addLogToList(String filename, String clusterId) {
+  public void addLogToList(String filename, String peerId) {
     try {
-      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
+      String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
       znode = ZKUtil.joinZNode(znode, filename);
       ZKUtil.createWithParents(this.zookeeper, znode);
     } catch (KeeperException e) {
diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index e576633..d911863 100644
--- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -70,7 +72,7 @@ public class ReplicationSourceManager {
   // All about stopping
   private final Stoppable stopper;
   // All logs we are currently trackign
-  private final SortedSet<String> hlogs;
+  private final Map<String, SortedSet<String>> hlogsById;
   private final Configuration conf;
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
@@ -108,7 +110,7 @@ public class ReplicationSourceManager {
     this.replicating = replicating;
     this.zkHelper = zkHelper;
     this.stopper = stopper;
-    this.hlogs = new TreeSet<String>();
+    this.hlogsById = new HashMap<String, SortedSet<String>>();
     this.oldsources = new ArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -149,14 +151,15 @@ public class ReplicationSourceManager {
   public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
     String key = log.getName();
     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
-    this.zkHelper.writeReplicationStatus(key.toString(), id, position);
-    synchronized (this.hlogs) {
-      if (!queueRecovered && this.hlogs.first() != key) {
-        SortedSet<String> hlogSet = this.hlogs.headSet(key);
+    this.zkHelper.writeReplicationStatus(key, id, position);
+    synchronized (this.hlogsById) {
+      SortedSet<String> hlogs = this.hlogsById.get(id);
+      if (!queueRecovered && hlogs.first() != key) {
+        SortedSet<String> hlogSet = hlogs.headSet(key);
         LOG.info("Removing " + hlogSet.size() +
             " logs in the list: " + hlogSet);
         for (String hlog : hlogSet) {
-          this.zkHelper.removeLogFromList(hlog.toString(), id);
+          this.zkHelper.removeLogFromList(hlog, id);
         }
         hlogSet.clear();
       }
@@ -200,12 +203,14 @@ public class ReplicationSourceManager {
         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
     // TODO set it to what's in ZK
     src.setSourceEnabled(true);
-    synchronized (this.hlogs) {
+    synchronized (this.hlogsById) {
       this.sources.add(src);
-      if (this.hlogs.size() > 0) {
-        // Add the latest hlog to that source's queue
-        this.zkHelper.addLogToList(this.hlogs.last(),
-            this.sources.get(0).getPeerClusterZnode());
+      this.hlogsById.put(id, new TreeSet<String>());
+      // Add the latest hlog to that source's queue
+      if (this.latestPath != null) {
+        String name = this.latestPath.getName();
+        this.hlogsById.get(id).add(name);
+        this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
         src.enqueueLog(this.latestPath);
       }
     }
@@ -230,8 +235,8 @@
    * Get a copy of the hlogs of the first source on this rs
    * @return a sorted set of hlog names
    */
-  protected SortedSet<String> getHLogs() {
-    return new TreeSet<String>(this.hlogs);
+  protected Map<String, SortedSet<String>> getHLogs() {
+    return Collections.unmodifiableMap(hlogsById);
   }
 
   /**
@@ -248,21 +253,25 @@
       return;
     }
 
-    synchronized (this.hlogs) {
-      if (this.sources.size() > 0) {
-        this.zkHelper.addLogToList(newLog.getName(),
-            this.sources.get(0).getPeerClusterZnode());
-      } else {
-        // If there's no slaves, don't need to keep the old hlogs since
-        // we only consider the last one when a new slave comes in
-        this.hlogs.clear();
+    synchronized (this.hlogsById) {
+      String name = newLog.getName();
+      for (ReplicationSourceInterface source : this.sources) {
+        this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
+      }
+      for (SortedSet<String> hlogs : this.hlogsById.values()) {
+        if (this.sources.isEmpty()) {
+          // If there's no slaves, don't need to keep the old hlogs since
+          // we only consider the last one when a new slave comes in
+          hlogs.clear();
+        }
+        hlogs.add(name);
       }
-      this.hlogs.add(newLog.getName());
     }
+
     this.latestPath = newLog;
-    // This only update the sources we own, not the recovered ones
+    // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources) {
-       source.enqueueLog(newLog);
+      source.enqueueLog(newLog);
     }
   }
 
@@ -281,7 +290,7 @@
    * @param manager the manager to use
    * @param stopper the stopper object for this region server
    * @param replicating the status of the replication on this cluster
-   * @param peerClusterId the id of the peer cluster
+   * @param peerId the id of the peer cluster
    * @return the created source
    * @throws IOException
    */
@@ -291,7 +300,7 @@
       final ReplicationSourceManager manager,
       final Stoppable stopper,
       final AtomicBoolean replicating,
-      final String peerClusterId) throws IOException {
+      final String peerId) throws IOException {
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
       Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
       src = (ReplicationSourceInterface) c.newInstance();
     } catch (Exception e) {
-      LOG.warn("Passed replication source implemention throws errors, " +
+      LOG.warn("Passed replication source implementation throws errors, " +
           "defaulting to ReplicationSource", e);
       src = new ReplicationSource();
     }
-    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+    src.init(conf, fs, manager, stopper, replicating, peerId);
     return src;
   }
 
@@ -410,7 +419,7 @@
       return;
     }
     LOG.info(path + " znode expired, trying to lock it");
-    transferQueues(zkHelper.getZNodeName(path));
+    transferQueues(ReplicationZookeeper.getZNodeName(path));
   }
 
   /**
@@ -462,7 +471,7 @@
     if (peers == null) {
       return;
     }
-    String id = zkHelper.getZNodeName(path);
+    String id = ReplicationZookeeper.getZNodeName(path);
     removePeer(id);
   }
 
diff --git a/src/main/ruby/hbase/replication_admin.rb b/src/main/ruby/hbase/replication_admin.rb
index 35e37af..c4be93c 100644
--- a/src/main/ruby/hbase/replication_admin.rb
+++ b/src/main/ruby/hbase/replication_admin.rb
@@ -44,6 +44,12 @@ module Hbase
     end
 
     #----------------------------------------------------------------------------------------------
+    # List all peer clusters
+    def list_peers
+      @replication_admin.listPeers
+    end
+
+    #----------------------------------------------------------------------------------------------
     # Restart the replication stream to the specified peer
     def enable_peer(id)
       @replication_admin.enablePeer(id)
diff --git a/src/main/ruby/shell.rb b/src/main/ruby/shell.rb
index 9027202..6d108e7 100644
--- a/src/main/ruby/shell.rb
+++ b/src/main/ruby/shell.rb
@@ -269,6 +269,7 @@ Shell.load_command_group(
   :commands => %w[
     add_peer
     remove_peer
+    list_peers
     enable_peer
     disable_peer
     start_replication
diff --git a/src/main/ruby/shell/commands/list_peers.rb b/src/main/ruby/shell/commands/list_peers.rb
new file mode 100644
index 0000000..3f464f3
--- /dev/null
+++ b/src/main/ruby/shell/commands/list_peers.rb
@@ -0,0 +1,47 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class ListPeers< Command
+      def help
+        return <<-EOF
+List all replication peer clusters.
+
+  hbase> list_peers
+EOF
+      end
+
+      def command()
+        now = Time.now
+        peers = replication_admin.list_peers
+
+        formatter.header(["PEER ID", "CLUSTER KEY"])
+
+        peers.entrySet().each do |e|
+          formatter.row([ e.key, e.value ])
+        end
+
+        formatter.footer(now)
+      end
+    end
+  end
+end
+
diff --git a/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 645b8ce..fa82e1c 100644
--- a/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -48,8 +49,13 @@ public class TestReplicationAdmin {
         HConstants.HREGION_OLDLOGDIR_NAME);
     Path logDir = new Path(TEST_UTIL.getTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
-    manager = new ReplicationSourceManager(admin.getReplicationZk(),
-        conf, null, FileSystem.get(conf), replicating, logDir, oldLogDir);
+    manager = new ReplicationSourceManager(admin.getReplicationZk(), conf,
+        new Stoppable() {
+          @Override
+          public void stop(String why) {}
+          @Override
+          public boolean isStopped() {return false;}
+        }, FileSystem.get(conf), replicating, logDir, oldLogDir);
   }
 
   /**
@@ -77,16 +83,15 @@ public class TestReplicationAdmin {
       // OK!
     }
     assertEquals(1, admin.getPeersCount());
-    // Add a second, returns illegal since multi-slave isn't supported
+    // Add a second since multi-slave is supported
     try {
       admin.addPeer(ID_SECOND, KEY_SECOND);
-      fail();
     } catch (IllegalStateException iae) {
-      // OK!
+      fail();
     }
-    assertEquals(1, admin.getPeersCount());
+    assertEquals(2, admin.getPeersCount());
     // Remove the first peer we added
     admin.removePeer(ID_ONE);
-    assertEquals(0, admin.getPeersCount());
+    assertEquals(1, admin.getPeersCount());
   }
 }
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
new file mode 100644
index 0000000..b69c356
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMultiSlaveReplication {
+
+  private static final Log LOG = LogFactory.getLog(TestReplication.class);
+
+  private static Configuration conf1;
+  private static Configuration conf2;
+  private static Configuration conf3;
+
+  private static String clusterKey2;
+  private static String clusterKey3;
+
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+  private static HBaseTestingUtility utility3;
+  private static final long SLEEP_TIME = 500;
+  private static final int NB_RETRIES = 10;
+
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] row1 = Bytes.toBytes("row1");
+  private static final byte[] row2 = Bytes.toBytes("row2");
+  private static final byte[] row3 = Bytes.toBytes("row3");
+  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+
+  private static HTableDescriptor table;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    // smaller block size and capacity to trigger more operations
+    // and test them
+    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
+    conf1.setInt("replication.source.size.capacity", 1024);
+    conf1.setLong("replication.source.sleepforretries", 100);
+    conf1.setInt("hbase.regionserver.maxlogs", 10);
+    conf1.setLong("hbase.master.logcleaner.ttl", 10);
+    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    conf1.setBoolean("dfs.support.append", true);
+    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    new ZooKeeperWatcher(conf1, "cluster1", null);
+
+    conf2 = new Configuration(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+
+    conf3 = new Configuration(conf1);
+    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
+
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+    new ZooKeeperWatcher(conf2, "cluster3", null);
+
+    utility3 = new HBaseTestingUtility(conf3);
+    utility3.setZkCluster(miniZK);
+    new ZooKeeperWatcher(conf3, "cluster3", null);
+
+    clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+            conf2.get("hbase.zookeeper.property.clientPort")+":/2";
+
+    clusterKey3 = conf3.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+            conf3.get("hbase.zookeeper.property.clientPort")+":/3";
+
+    table = new HTableDescriptor(tableName);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    table.addFamily(fam);
+  }
+
+  @Test(timeout=300000)
+  public void testMultiSlaveReplication() throws Exception {
+    LOG.info("testCyclicReplication");
+    MiniHBaseCluster master = utility1.startMiniCluster();
+    utility2.startMiniCluster();
+    utility3.startMiniCluster();
+    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
+
+    new HBaseAdmin(conf1).createTable(table);
+    new HBaseAdmin(conf2).createTable(table);
+    new HBaseAdmin(conf3).createTable(table);
+    HTable htable1 = new HTable(conf1, tableName);
+    htable1.setWriteBufferSize(1024);
+    HTable htable2 = new HTable(conf2, tableName);
+    htable2.setWriteBufferSize(1024);
+    HTable htable3 = new HTable(conf3, tableName);
+    htable3.setWriteBufferSize(1024);
+
+    admin1.addPeer("1", clusterKey2);
+
+    // put "row" and wait 'til it got around, then delete
+    putAndWait(row, famName, htable1, htable2);
+    deleteAndWait(row, htable1, htable2);
+    // check it wasn't replicated to cluster 3
+    checkRow(row,0,htable3);
+
+    putAndWait(row2, famName, htable1, htable2);
+
+    // now roll the region server's logs
+    for ( JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      r.getRegionServer().getWAL().rollWriter();
+    }
+    // after the log was rolled put a new row
+    putAndWait(row3, famName, htable1, htable2);
+
+    admin1.addPeer("2", clusterKey3);
+
+    // put a row, check it was replicated to all clusters
+    putAndWait(row1, famName, htable1, htable2, htable3);
+    // delete and verify
+    deleteAndWait(row1, htable1, htable2, htable3);
+
+    // make sure row2 did not get replicated after
+    // cluster 3 was added
+    checkRow(row2,0,htable3);
+
+    // row3 will get replicated, because it was in the
+    // latest log
+    checkRow(row3,1,htable3);
+
+    Put p = new Put(row);
+    p.add(famName, row, row);
+    htable1.put(p);
+    // now roll the logs again
+    for ( JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      r.getRegionServer().getWAL().rollWriter();
+    }
+
+    // cleanup "row2", also conveniently use this to wait for replication
+    // to finish
+    deleteAndWait(row2, htable1, htable2, htable3);
+    // Even if the log was rolled in the middle of the replication,
+    // "row" is still replicated.
+    checkRow(row, 1, htable2, htable3);
+
+    // cleanup the rest
+    deleteAndWait(row, htable1, htable2, htable3);
+    deleteAndWait(row3, htable1, htable2, htable3);
+
+    utility3.shutdownMiniCluster();
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+  private void checkRow(byte[] row, int count, HTable... tables) throws IOException {
+    Get get = new Get(row);
+    for (HTable table : tables) {
+      Result res = table.get(get);
+      assertEquals(count, res.size());
+    }
+  }
+
+  private void deleteAndWait(byte[] row, HTable source, HTable... targets)
+      throws Exception {
+    Delete del = new Delete(row);
+    source.delete(del);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      boolean removedFromAll = true;
+      for (HTable target : targets) {
+        Result res = target.get(get);
+        if (res.size() >= 1) {
+          LOG.info("Row not deleted");
+          removedFromAll = false;
+          break;
+        }
+      }
+      if (removedFromAll) {
+        break;
+      } else {
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+
+  private void putAndWait(byte[] row, byte[] fam, HTable source, HTable... targets)
+      throws Exception {
+    Put put = new Put(row);
+    put.add(fam, row, row);
+    source.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      boolean replicatedToAll = true;
+      for (HTable target : targets) {
+        Result res = target.get(get);
+        if (res.size() == 0) {
+          LOG.info("Row not available");
+          replicatedToAll = false;
+          break;
+        } else {
+          assertArrayEquals(res.value(), row);
+        }
+      }
+      if (replicatedToAll) {
+        break;
+      } else {
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}
diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 20a1ff8..4752ab7 100644
--- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,6 +84,8 @@ public class TestReplicationSourceManager {
 
   private static final byte[] test = Bytes.toBytes("test");
 
+  private static final String slaveId = "1";
+
   private static FileSystem fs;
 
   private static Path oldLogDir;
@@ -118,7 +120,7 @@
     logDir = new Path(utility.getTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
 
-    manager.addSource("1");
+    manager.addSource(slaveId);
 
     htd = new HTableDescriptor(test);
     HColumnDescriptor col = new HColumnDescriptor("f1");
@@ -190,7 +192,7 @@
       hlog.append(hri, key, edit);
     }
 
-    assertEquals(6, manager.getHLogs().size());
+    assertEquals(6, manager.getHLogs().get(slaveId).size());
 
     hlog.rollWriter();
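
Note for reviewers (not part of the patch): a rough sketch of how the client-facing pieces added above fit together. Only ReplicationAdmin#addPeer, #listPeers and the shell's list_peers command come from this patch; the peer ids, cluster keys, and surrounding setup below are made-up illustrations.

    // Illustrative only: register two slave clusters, then display them.
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
    // A second peer is now accepted; before this patch the second addPeer
    // would have failed with "Multi-slave isn't supported yet".
    repAdmin.addPeer("1", "slave-a-zk:2181:/hbase");   // hypothetical cluster key
    repAdmin.addPeer("2", "slave-b-zk:2181:/hbase");   // hypothetical cluster key
    Map<String, String> peers = repAdmin.listPeers();  // peer id -> cluster key
    for (Map.Entry<String, String> e : peers.entrySet()) {
      System.out.println(e.getKey() + " => " + e.getValue());
    }

From the shell, the equivalent flow (cluster keys again hypothetical) would be:

    hbase> add_peer '1', 'slave-a-zk:2181:/hbase'
    hbase> add_peer '2', 'slave-b-zk:2181:/hbase'
    hbase> list_peers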