Index: src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java	(revision 0)
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMultiSlaveReplication {
+
+  private static final Log LOG = LogFactory.getLog(TestMultiSlaveReplication.class);
+
+  private static Configuration conf1;
+  private static Configuration conf2;
+  private static Configuration conf3;
+
+  private static String clusterKey2;
+  private static String clusterKey3;
+
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+  private static HBaseTestingUtility utility3;
+  private static final long SLEEP_TIME = 500;
+  private static final int NB_RETRIES = 10;
+
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+
+  private static HTableDescriptor table;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    // smaller block size and capacity to trigger more operations
+    // and test them
+    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
conf1.setInt("replication.source.size.capacity", 1024); + conf1.setLong("replication.source.sleepforretries", 100); + conf1.setInt("hbase.regionserver.maxlogs", 10); + conf1.setLong("hbase.master.logcleaner.ttl", 10); + conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + conf1.setBoolean("dfs.support.append", true); + conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); + + utility1 = new HBaseTestingUtility(conf1); + utility1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = utility1.getZkCluster(); + new ZooKeeperWatcher(conf1, "cluster1", null, true); + + conf2 = new Configuration(conf1); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + + conf3 = new Configuration(conf1); + conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); + + utility2 = new HBaseTestingUtility(conf2); + utility2.setZkCluster(miniZK); + new ZooKeeperWatcher(conf2, "cluster3", null, true); + + utility3 = new HBaseTestingUtility(conf3); + utility3.setZkCluster(miniZK); + new ZooKeeperWatcher(conf3, "cluster3", null, true); + + clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf2.get("hbase.zookeeper.property.clientPort")+":/2"; + + clusterKey3 = conf3.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf3.get("hbase.zookeeper.property.clientPort")+":/3"; + + table = new HTableDescriptor(tableName); + HColumnDescriptor fam = new HColumnDescriptor(famName); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(fam); + fam = new HColumnDescriptor(noRepfamName); + table.addFamily(fam); + } + + @Test(timeout=300000) + public void testCyclicReplication() throws Exception { + LOG.info("testCyclicReplication"); + utility1.startMiniCluster(); + utility2.startMiniCluster(); + utility3.startMiniCluster(); + ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + + new HBaseAdmin(conf1).createTable(table); + new HBaseAdmin(conf2).createTable(table); + new HBaseAdmin(conf3).createTable(table); + HTable htable1 = new HTable(conf1, tableName); + htable1.setWriteBufferSize(1024); + HTable htable2 = new HTable(conf2, tableName); + htable2.setWriteBufferSize(1024); + HTable htable3 = new HTable(conf3, tableName); + htable3.setWriteBufferSize(1024); + + admin1.addPeer("1", clusterKey2); + admin1.addPeer("2", clusterKey3); + + // put "row" and wait 'til it got around + putAndWait(row, famName, htable1, htable2, htable3); + + deleteAndWait(row,htable1,htable2,htable3); + + utility3.shutdownMiniCluster(); + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + + private void deleteAndWait(byte[] row, HTable source, HTable... targets) + throws Exception { + Delete del = new Delete(row); + source.delete(del); + + Get get = new Get(row); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for del replication"); + } + boolean removeFromAll = true; + for (HTable target : targets) { + Result res = target.get(get); + if (res.size() >= 1) { + LOG.info("Row not deleted"); + removeFromAll = false; + break; + } + } + if (removeFromAll) { + break; + } else { + Thread.sleep(SLEEP_TIME); + } + } + } + + private void putAndWait(byte[] row, byte[] fam, HTable source, HTable... 
+      throws Exception {
+    Put put = new Put(row);
+    put.add(fam, row, row);
+    source.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      boolean replicatedToAll = true;
+      for (HTable target : targets) {
+        Result res = target.get(get);
+        if (res.size() == 0) {
+          LOG.info("Row not available");
+          replicatedToAll = false;
+          break;
+        } else {
+          assertArrayEquals(res.value(), row);
+        }
+      }
+      if (replicatedToAll) {
+        break;
+      } else {
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}
Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java	(revision 1169834)
+++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java	(working copy)
@@ -81,6 +81,8 @@
 
   private static final byte[] test = Bytes.toBytes("test");
 
+  private static final String slaveId = "1";
+
   private static FileSystem fs;
 
   private static Path oldLogDir;
@@ -115,7 +117,7 @@
     logDir = new Path(utility.getTestDir(),
        HConstants.HREGION_LOGDIR_NAME);
 
-    manager.addSource("1");
+    manager.addSource(slaveId);
 
     htd = new HTableDescriptor(test);
     HColumnDescriptor col = new HColumnDescriptor("f1");
@@ -188,7 +190,7 @@
       hlog.append(hri, key, edit, htd);
     }
 
-    assertEquals(6, manager.getHLogs().size());
+    assertEquals(6, manager.getHLogs().get(slaveId).size());
 
     hlog.rollWriter();
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(revision 1169834)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(working copy)
@@ -22,6 +22,8 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -70,7 +72,7 @@
   // All about stopping
   private final Stoppable stopper;
   // All logs we are currently tracking
-  private final SortedSet<String> hlogs;
+  private final Map<String, SortedSet<String>> hlogsById;
   private final Configuration conf;
   private final FileSystem fs;
   // The path to the latest log we saw, for new coming sources
@@ -108,7 +110,7 @@
     this.replicating = replicating;
     this.zkHelper = zkHelper;
     this.stopper = stopper;
-    this.hlogs = new TreeSet<String>();
+    this.hlogsById = new HashMap<String, SortedSet<String>>();
     this.oldsources = new ArrayList<ReplicationSourceInterface>();
     this.conf = conf;
     this.fs = fs;
@@ -150,9 +152,10 @@
     String key = log.getName();
     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
     this.zkHelper.writeReplicationStatus(key.toString(), id, position);
-    synchronized (this.hlogs) {
-      if (!queueRecovered && this.hlogs.first() != key) {
-        SortedSet<String> hlogSet = this.hlogs.headSet(key);
+    synchronized (this.hlogsById) {
+      SortedSet<String> hlogs = this.hlogsById.get(id);
+      if (!queueRecovered && !hlogs.first().equals(key)) {
+        SortedSet<String> hlogSet = hlogs.headSet(key);
         LOG.info("Removing " + hlogSet.size() +
             " logs in the list: " + hlogSet);
         for (String hlog : hlogSet) {
@@ -200,12 +203,14 @@
         getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
     // TODO set it to what's in ZK
     src.setSourceEnabled(true);
-    synchronized (this.hlogs) {
+    synchronized (this.hlogsById) {
       this.sources.add(src);
-      if (this.hlogs.size() > 0) {
-        // Add the latest hlog to that source's queue
-        this.zkHelper.addLogToList(this.hlogs.last(),
-            this.sources.get(0).getPeerClusterZnode());
+      this.hlogsById.put(id, new TreeSet<String>());
+      // Add the latest hlog to that source's queue
+      if (this.latestPath != null) {
+        String name = this.latestPath.getName();
+        this.hlogsById.get(id).add(name);
+        this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
         src.enqueueLog(this.latestPath);
       }
     }
@@ -230,8 +235,8 @@
-   * Get a copy of the hlogs of the first source on this rs
-   * @return a sorted set of hlog names
+   * Get the hlogs queued for replication, grouped by peer cluster id
+   * @return a map of peer cluster id to a sorted set of hlog names
    */
-  protected SortedSet<String> getHLogs() {
-    return new TreeSet<String>(this.hlogs);
+  protected Map<String, SortedSet<String>> getHLogs() {
+    return Collections.unmodifiableMap(hlogsById);
   }
 
   /**
@@ -248,21 +253,25 @@
       return;
     }
 
-    synchronized (this.hlogs) {
-      if (this.sources.size() > 0) {
-        this.zkHelper.addLogToList(newLog.getName(),
-            this.sources.get(0).getPeerClusterZnode());
-      } else {
-        // If there's no slaves, don't need to keep the old hlogs since
-        // we only consider the last one when a new slave comes in
-        this.hlogs.clear();
+    synchronized (this.hlogsById) {
+      String name = newLog.getName();
+      for (ReplicationSourceInterface source : this.sources) {
+        this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
       }
-      this.hlogs.add(newLog.getName());
+      for (SortedSet<String> hlogs : this.hlogsById.values()) {
+        if (this.sources.isEmpty()) {
+          // If there are no slaves, we don't need to keep the old hlogs since
+          // we only consider the last one when a new slave comes in
+          hlogs.clear();
+        }
+        hlogs.add(name);
+      }
     }
+    this.latestPath = newLog;
 
-    // This only update the sources we own, not the recovered ones
+    // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources) {
-      source.enqueueLog(newLog);
+      source.enqueueLog(newLog);
     }
   }
 
@@ -281,7 +290,7 @@
    * @param manager the manager to use
    * @param stopper the stopper object for this region server
    * @param replicating the status of the replication on this cluster
-   * @param peerClusterId the id of the peer cluster
+   * @param peerId the id of the peer cluster
    * @return the created source
    * @throws IOException
    */
@@ -291,7 +300,7 @@
       final ReplicationSourceManager manager,
       final Stoppable stopper,
       final AtomicBoolean replicating,
-      final String peerClusterId) throws IOException {
+      final String peerId) throws IOException {
     ReplicationSourceInterface src;
     try {
       @SuppressWarnings("rawtypes")
@@ -299,12 +308,12 @@
           ReplicationSource.class.getCanonicalName()));
       src = (ReplicationSourceInterface) c.newInstance();
     } catch (Exception e) {
-      LOG.warn("Passed replication source implemention throws errors, " +
+      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
       src = new ReplicationSource();
     }
-    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
+    src.init(conf, fs, manager, stopper, replicating, peerId);
     return src;
   }
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(revision 1169834)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(working copy)
@@ -264,10 +264,6 @@
     }
     if (this.peerClusters.containsKey(peerId)) {
       return false;
-      // TODO remove when we support it
-    } else if (this.peerClusters.size() > 0) {
-      LOG.warn("Multiple slaves feature not supported");
feature not supported"); - return false; } ReplicationPeer peer = getPeer(peerId); if (peer == null) { @@ -351,8 +347,6 @@ try { if (peerExists(id)) { throw new IllegalArgumentException("Cannot add existing peer"); - } else if (countPeers() > 0) { - throw new IllegalStateException("Multi-slave isn't supported yet"); } ZKUtil.createWithParents(this.zookeeper, this.peersZNode); ZKUtil.createAndWatch(this.zookeeper, @@ -367,12 +361,6 @@ ZKUtil.joinZNode(this.peersZNode, id)) >= 0; } - private int countPeers() throws KeeperException { - List peers = - ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); - return peers == null ? 0 : peers.size(); - } - /** * This reads the state znode for replication and sets the atomic boolean */ @@ -408,11 +396,11 @@ /** * Add a new log to the list of hlogs in zookeeper * @param filename name of the hlog's znode - * @param clusterId name of the cluster's znode + * @param peerId name of the cluster's znode */ - public void addLogToList(String filename, String clusterId) { + public void addLogToList(String filename, String peerId) { try { - String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); + String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId); znode = ZKUtil.joinZNode(znode, filename); ZKUtil.createWithParents(this.zookeeper, znode); } catch (KeeperException e) {