Index: src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java	(revision 0)
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMultiSlaveReplication {
+
+  // Fixed copy-paste: logger must belong to this class, not TestReplication.
+  private static final Log LOG = LogFactory.getLog(TestMultiSlaveReplication.class);
+
+  private static Configuration conf1;
+  private static Configuration conf2;
+  private static Configuration conf3;
+
+  private static String clusterKey2;
+  private static String clusterKey3;
+
+  private static HBaseTestingUtility utility1;
+  private static HBaseTestingUtility utility2;
+  private static HBaseTestingUtility utility3;
+  private static final long SLEEP_TIME = 500;
+  private static final int NB_RETRIES = 10;
+
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] row = Bytes.toBytes("row");
+  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+
+  private static HTableDescriptor table;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1 = HBaseConfiguration.create();
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    // smaller block size and capacity to trigger more operations
+    // and test them
+    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
+    conf1.setInt("replication.source.size.capacity", 1024);
+    conf1.setLong("replication.source.sleepforretries", 100);
+    conf1.setInt("hbase.regionserver.maxlogs", 10);
+    conf1.setLong("hbase.master.logcleaner.ttl", 10);
+    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+    conf1.setBoolean("dfs.support.append", true);
+    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
+
+    utility1 = new HBaseTestingUtility(conf1);
+    utility1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+    new ZooKeeperWatcher(conf1, "cluster1", null, true);
+
+    conf2 = new Configuration(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+
+    conf3 = new Configuration(conf1);
+    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
+
+    utility2 = new HBaseTestingUtility(conf2);
+    utility2.setZkCluster(miniZK);
+    new ZooKeeperWatcher(conf2, "cluster2", null, true);
+
+    utility3 = new HBaseTestingUtility(conf3);
+    utility3.setZkCluster(miniZK);
+    new ZooKeeperWatcher(conf3, "cluster3", null, true);
+
+    clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+        conf2.get("hbase.zookeeper.property.clientPort")+":/2";
+
+    clusterKey3 = conf3.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+        conf3.get("hbase.zookeeper.property.clientPort")+":/3";
+
+    table = new HTableDescriptor(tableName);
+    HColumnDescriptor fam = new HColumnDescriptor(famName);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+    fam = new HColumnDescriptor(noRepfamName);
+    table.addFamily(fam);
+  }
+
+  @Test(timeout=300000)
+  public void testCyclicReplication() throws Exception {
+    LOG.info("testCyclicReplication");
+    utility1.startMiniCluster();
+    utility2.startMiniCluster();
+    utility3.startMiniCluster();
+    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
+
+    new HBaseAdmin(conf1).createTable(table);
+    new HBaseAdmin(conf2).createTable(table);
+    new HBaseAdmin(conf3).createTable(table);
+    HTable htable1 = new HTable(conf1, tableName);
+    htable1.setWriteBufferSize(1024);
+    HTable htable2 = new HTable(conf2, tableName);
+    htable2.setWriteBufferSize(1024);
+    HTable htable3 = new HTable(conf3, tableName);
+    htable3.setWriteBufferSize(1024);
+
+    admin1.addPeer("1", clusterKey2);
+    admin1.addPeer("2", clusterKey3);
+
+    // put "row" and wait 'til it got around
+    putAndWait(row, famName, htable1, htable2, htable3);
+
+    deleteAndWait(row,htable1,htable2,htable3);
+
+    utility3.shutdownMiniCluster();
+    utility2.shutdownMiniCluster();
+    utility1.shutdownMiniCluster();
+  }
+
+
+  private void deleteAndWait(byte[] row, HTable source, HTable... targets)
+  throws Exception {
+    Delete del = new Delete(row);
+    source.delete(del);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      boolean removeFromAll = true;
+      for (HTable target : targets) {
+        Result res = target.get(get);
+        if (res.size() >= 1) {
+          LOG.info("Row not deleted");
+          removeFromAll = false;
+          break;
+        }
+      }
+      if (removeFromAll) {
+        break;
+      } else {
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+
+  private void putAndWait(byte[] row, byte[] fam, HTable source, HTable... targets)
+  throws Exception {
+    Put put = new Put(row);
+    put.add(fam, row, row);
+    source.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      boolean replicatedToAll = true;
+      for (HTable target : targets) {
+        Result res = target.get(get);
+        if (res.size() == 0) {
+          LOG.info("Row not available");
+          replicatedToAll = false;
+          break;
+        } else {
+          assertArrayEquals(res.value(), row);
+        }
+      }
+      if (replicatedToAll) {
+        break;
+      } else {
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(revision 1167600)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(working copy)
@@ -205,7 +205,7 @@
     if (this.hlogs.size() > 0) {
       // Add the latest hlog to that source's queue
       this.zkHelper.addLogToList(this.hlogs.last(),
-          this.sources.get(0).getPeerClusterZnode());
+          src.getPeerClusterZnode());
       src.enqueueLog(this.latestPath);
     }
   }
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(revision 1167600)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(working copy)
@@ -264,10 +264,6 @@
     }
     if (this.peerClusters.containsKey(peerId)) {
       return false;
-      // TODO remove when we support it
-    } else if (this.peerClusters.size() > 0) {
-      LOG.warn("Multiple slaves feature not supported");
-      return false;
     }
     ReplicationPeer peer = getPeer(peerId);
     if (peer == null) {
@@ -351,8 +347,6 @@
     try {
       if (peerExists(id)) {
         throw new IllegalArgumentException("Cannot add existing peer");
-      } else if (countPeers() > 0) {
-        throw new IllegalStateException("Multi-slave isn't supported yet");
       }
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper,
@@ -367,12 +361,6 @@
         ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
   }
 
-  private int countPeers() throws KeeperException {
-    List<String> peers =
-        ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
-    return peers == null ? 0 : peers.size();
-  }
-
   /**
    * This reads the state znode for replication and sets the atomic boolean
    */