Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (revision 1021408) +++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (working copy) @@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -54,11 +56,10 @@ private static Configuration conf1; private static Configuration conf2; -/* - private static ZooKeeperWrapper zkw1; - private static ZooKeeperWrapper zkw2; - */ + private static ZooKeeperWatcher zkw1; + private static ZooKeeperWatcher zkw2; + private static HTable htable1; private static HTable htable2; @@ -92,15 +93,15 @@ conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf1.setBoolean("dfs.support.append", true); conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); - /* REENALBE utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); - zkw1 = ZooKeeperWrapper.createInstance(conf1, "cluster1"); - zkw1.writeZNode("/1", "replication", ""); - zkw1.writeZNode("/1/replication", "master", + zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null); + ZKUtil.createWithParents(zkw1, "/1/replication/master"); + ZKUtil.createWithParents(zkw1, "/1/replication/state"); + ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes( conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" + - conf1.get("hbase.zookeeper.property.clientPort")+":/1"); + conf1.get("hbase.zookeeper.property.clientPort")+":/1")); setIsReplication(true); LOG.info("Setup first Zk"); @@ -113,15 +114,16 @@ utility2 = new 
HBaseTestingUtility(conf2); utility2.setZkCluster(miniZK); - zkw2 = ZooKeeperWrapper.createInstance(conf2, "cluster2"); - zkw2.writeZNode("/2", "replication", ""); - zkw2.writeZNode("/2/replication", "master", + zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null); + ZKUtil.createWithParents(zkw2, "/2/replication"); + /*zkw2.writeZNode("/2/replication", "master", conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" + - conf1.get("hbase.zookeeper.property.clientPort")+":/1"); + conf1.get("hbase.zookeeper.property.clientPort")+":/1");*/ - zkw1.writeZNode("/1/replication/peers", "1", + ZKUtil.createWithParents(zkw1, "/1/replication/peers/2"); + ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes( conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" + - conf2.get("hbase.zookeeper.property.clientPort")+":/2"); + conf2.get("hbase.zookeeper.property.clientPort")+":/2")); LOG.info("Setup second Zk"); @@ -143,12 +145,12 @@ htable1 = new HTable(conf1, tableName); htable1.setWriteBufferSize(1024); htable2 = new HTable(conf2, tableName); - */ } private static void setIsReplication(boolean rep) throws Exception { LOG.info("Set rep " + rep); - // REENALBE zkw1.writeZNode("/1/replication", "state", Boolean.toString(rep)); + ZKUtil.setData(zkw1,"/1/replication/state", + Bytes.toBytes(Boolean.toString(rep))); // Takes some ms for ZK to fire the watcher Thread.sleep(SLEEP_TIME); } @@ -181,7 +183,7 @@ * Add a row, check it's replicated, delete it, check's gone * @throws Exception */ - @Ignore @Test + @Test public void testSimplePutDelete() throws Exception { LOG.info("testSimplePutDelete"); Put put = new Put(row); @@ -229,7 +231,7 @@ * Try a small batch upload using the write buffer, check it's replicated * @throws Exception */ - @Ignore @Test + @Test public void testSmallBatch() throws Exception { LOG.info("testSmallBatch"); Put put; @@ -273,7 +275,7 @@ * replicated, enable it, try replicating and it should work * @throws Exception */ - @Ignore @Test + @Test public void testStartStop() throws 
Exception { // Test stopping replication @@ -342,7 +344,7 @@ * hlog rolling and other non-trivial code paths * @throws Exception */ - @Ignore @Test + @Test public void loadTesting() throws Exception { htable1.setWriteBufferSize(1024); htable1.setAutoFlush(false); @@ -396,7 +398,7 @@ * the upload. The failover happens internally. * @throws Exception */ - @Ignore @Test + @Test public void queueFailover() throws Exception { utility1.createMultiRegions(htable1, famName); Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1003174) +++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -31,15 +31,16 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALObserver; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; -// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -65,9 +66,11 @@ private static final AtomicBoolean REPLICATING = new AtomicBoolean(false); + private static Replication replication; + 
private static ReplicationSourceManager manager; - // REENALBE private static ZooKeeperWrapper zkw; + private static ZooKeeperWatcher zkw; private static HTableDescriptor htd; @@ -100,26 +103,26 @@ utility = new HBaseTestingUtility(conf); utility.startMiniZKCluster(); - // REENABLE -// zkw = ZooKeeperWrapper.createInstance(conf, "test"); -// zkw.writeZNode("/hbase", "replication", ""); -// zkw.writeZNode("/hbase/replication", "master", -// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + -// conf.get("hbase.zookeeper.property.clientPort")+":/1"); -// zkw.writeZNode("/hbase/replication/peers", "1", -// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + -// conf.get("hbase.zookeeper.property.clientPort")+":/1"); + zkw = new ZooKeeperWatcher(conf, "test", null); + ZKUtil.createWithParents(zkw, "/hbase/replication"); + ZKUtil.createWithParents(zkw, "/hbase/replication/master"); + ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1"); + ZKUtil.setData(zkw, "/hbase/replication/master", + Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf.get("hbase.zookeeper.property.clientPort")+":/1")); + ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes( + conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf.get("hbase.zookeeper.property.clientPort")+":/1")); - HRegionServer server = new HRegionServer(conf); - ReplicationZookeeper helper = new ReplicationZookeeper(server, REPLICATING); + //HRegionServer server = new HRegionServer(conf); fs = FileSystem.get(conf); oldLogDir = new Path(utility.getTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); logDir = new Path(utility.getTestDir(), HConstants.HREGION_LOGDIR_NAME); + replication = new Replication(new DummyServer(), fs, logDir, oldLogDir); + manager = replication.getReplicationManager(); - manager = new ReplicationSourceManager(helper, - conf, server, fs, REPLICATING, logDir, oldLogDir); manager.addSource("1"); htd = new HTableDescriptor(test); @@ -137,7 +140,7 @@ @AfterClass public static void tearDownAfterClass() 
throws Exception { -// REENABLE manager.join(); + manager.join(); utility.shutdownMiniCluster(); } @@ -152,7 +155,7 @@ setUp(); } - @Ignore @Test + @Test public void testLogRoll() throws Exception { long seq = 0; long baseline = 1000; @@ -160,8 +163,9 @@ KeyValue kv = new KeyValue(r1, f1, r1); WALEdit edit = new WALEdit(); edit.add(kv); + List listeners = new ArrayList(); -// REENABLE listeners.add(manager); + listeners.add(replication); HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8")); @@ -195,17 +199,55 @@ hlog.rollWriter(); - // REENABLE manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - // REENABLE "1", 0, false); + manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), + "1", 0, false); HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, System.currentTimeMillis()); hlog.append(hri, key, edit); - // REENABLE assertEquals(1, manager.getHLogs().size()); + assertEquals(1, manager.getHLogs().size()); // TODO Need a case with only 2 HLogs and we only want to delete the first one } + static class DummyServer implements Server { + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zkw; + } + + @Override + public CatalogTracker getCatalogTracker() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public String getServerName() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public void abort(String why, Throwable e) { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public void stop(String why) { + //To change body of implemented methods use File | Settings | File Templates. 
+ } + + @Override + public boolean isStopped() { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + } + } Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java (revision 1021408) +++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSink.java (working copy) @@ -1,263 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class DISABLEDTestReplicationSink { - - private static final Log LOG = - LogFactory.getLog(DISABLEDTestReplicationSink.class); - - private static final int BATCH_SIZE = 10; - - private static final long SLEEP_TIME = 500; - - private final static Configuration conf = HBaseConfiguration.create(); - - private final static HBaseTestingUtility TEST_UTIL = - new HBaseTestingUtility(); - - private static ReplicationSink SINK; - - private static final byte[] TABLE_NAME1 = - Bytes.toBytes("table1"); - private static final byte[] TABLE_NAME2 = - Bytes.toBytes("table2"); - - private static final byte[] FAM_NAME1 = Bytes.toBytes("info1"); - private static final byte[] FAM_NAME2 = Bytes.toBytes("info2"); - - private static HTable table1; - private static Stoppable STOPPABLE = new Stoppable() { - final AtomicBoolean stop = 
new AtomicBoolean(false); - - @Override - public boolean isStopped() { - return this.stop.get(); - } - - @Override - public void stop(String why) { - LOG.info("STOPPING BECAUSE: " + why); - this.stop.set(true); - } - - }; - - private static HTable table2; - - /** - * @throws java.lang.Exception - */ - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); - TEST_UTIL.getConfiguration().setBoolean( - HConstants.REPLICATION_ENABLE_KEY, true); - TEST_UTIL.startMiniCluster(3); - conf.setBoolean("dfs.support.append", true); - SINK = new ReplicationSink(conf, STOPPABLE); - table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1); - table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2); - } - - /** - * @throws java.lang.Exception - */ - @AfterClass - public static void tearDownAfterClass() throws Exception { - STOPPABLE.stop("Shutting down"); - TEST_UTIL.shutdownMiniCluster(); - } - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - table1 = TEST_UTIL.truncateTable(TABLE_NAME1); - table2 = TEST_UTIL.truncateTable(TABLE_NAME2); - Thread.sleep(SLEEP_TIME); - } - - /** - * Insert a whole batch of entries - * @throws Exception - */ - @Ignore @Test - public void testBatchSink() throws Exception { - HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE]; - for(int i = 0; i < BATCH_SIZE; i++) { - entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put); - } - SINK.replicateEntries(entries); - Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); - assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length); - } - - /** - * Insert a mix of puts and deletes - * @throws Exception - */ - @Ignore @Test - public void testMixedPutDelete() throws Exception { - HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2]; - for(int i = 0; i < BATCH_SIZE/2; i++) { - entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put); - } - 
SINK.replicateEntries(entries); - - entries = new HLog.Entry[BATCH_SIZE]; - for(int i = 0; i < BATCH_SIZE; i++) { - entries[i] = createEntry(TABLE_NAME1, i, - i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn); - } - - SINK.replicateEntries(entries); - Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); - assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length); - } - - /** - * Insert to 2 different tables - * @throws Exception - */ - @Ignore @Test - public void testMixedPutTables() throws Exception { - HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE]; - for(int i = 0; i < BATCH_SIZE; i++) { - entries[i] = - createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1, - i, KeyValue.Type.Put); - } - - SINK.replicateEntries(entries); - Scan scan = new Scan(); - ResultScanner scanRes = table2.getScanner(scan); - for(Result res : scanRes) { - assertTrue(Bytes.toInt(res.getRow()) % 2 == 0); - } - } - - /** - * Insert then do different types of deletes - * @throws Exception - */ - @Ignore @Test - public void testMixedDeletes() throws Exception { - HLog.Entry[] entries = new HLog.Entry[3]; - for(int i = 0; i < 3; i++) { - entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put); - } - SINK.replicateEntries(entries); - entries = new HLog.Entry[3]; - - entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn); - entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily); - entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn); - - SINK.replicateEntries(entries); - - Scan scan = new Scan(); - ResultScanner scanRes = table1.getScanner(scan); - assertEquals(0, scanRes.next(3).length); - } - - /** - * Puts are buffered, but this tests when a delete (not-buffered) is applied - * before the actual Put that creates it. 
- * @throws Exception - */ - @Ignore @Test - public void testApplyDeleteBeforePut() throws Exception { - HLog.Entry[] entries = new HLog.Entry[5]; - for(int i = 0; i < 2; i++) { - entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put); - } - entries[2] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily); - for(int i = 3; i < 5; i++) { - entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put); - } - SINK.replicateEntries(entries); - Get get = new Get(Bytes.toBytes(1)); - Result res = table1.get(get); - assertEquals(0, res.size()); - } - - private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) { - byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; - byte[] rowBytes = Bytes.toBytes(row); - // Just make sure we don't get the same ts for two consecutive rows with - // same key - try { - Thread.sleep(1); - } catch (InterruptedException e) { - LOG.info("Was interrupted while sleep, meh", e); - } - final long now = System.currentTimeMillis(); - KeyValue kv = null; - if(type.getCode() == KeyValue.Type.Put.getCode()) { - kv = new KeyValue(rowBytes, fam, fam, now, - KeyValue.Type.Put, Bytes.toBytes(row)); - } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) { - kv = new KeyValue(rowBytes, fam, fam, - now, KeyValue.Type.DeleteColumn); - } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) { - kv = new KeyValue(rowBytes, fam, null, - now, KeyValue.Type.DeleteFamily); - } - - HLogKey key = new HLogKey(table, table, now, now); - - WALEdit edit = new WALEdit(); - edit.add(kv); - - return new HLog.Entry(key, edit); - } -} Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSourceManager.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSourceManager.java (revision 1021408) +++ 
src/test/java/org/apache/hadoop/hbase/replication/regionserver/DISABLEDTestReplicationSourceManager.java (working copy) @@ -1,211 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.regionserver.wal.WALObserver; -import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; -import org.apache.hadoop.hbase.util.Bytes; -// REENABLE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.assertEquals; - -public class DISABLEDTestReplicationSourceManager { - - private static final Log LOG = - LogFactory.getLog(DISABLEDTestReplicationSourceManager.class); - - private static Configuration conf; - - private static HBaseTestingUtility utility; - - private static final AtomicBoolean REPLICATING = new AtomicBoolean(false); - - private static ReplicationSourceManager manager; - - // REENALBE private static ZooKeeperWrapper zkw; - - private static HTableDescriptor htd; - - private static HRegionInfo 
hri; - - private static final byte[] r1 = Bytes.toBytes("r1"); - - private static final byte[] r2 = Bytes.toBytes("r2"); - - private static final byte[] f1 = Bytes.toBytes("f1"); - - private static final byte[] f2 = Bytes.toBytes("f2"); - - private static final byte[] test = Bytes.toBytes("test"); - - private static FileSystem fs; - - private static Path oldLogDir; - - private static Path logDir; - - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - - conf = HBaseConfiguration.create(); - conf.set("replication.replicationsource.implementation", - ReplicationSourceDummy.class.getCanonicalName()); - conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); - utility = new HBaseTestingUtility(conf); - utility.startMiniZKCluster(); - - // REENABLE -// zkw = ZooKeeperWrapper.createInstance(conf, "test"); -// zkw.writeZNode("/hbase", "replication", ""); -// zkw.writeZNode("/hbase/replication", "master", -// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + -// conf.get("hbase.zookeeper.property.clientPort")+":/1"); -// zkw.writeZNode("/hbase/replication/peers", "1", -// conf.get(HConstants.ZOOKEEPER_QUORUM)+":" + -// conf.get("hbase.zookeeper.property.clientPort")+":/1"); - - HRegionServer server = new HRegionServer(conf); - ReplicationZookeeper helper = new ReplicationZookeeper(server, REPLICATING); - fs = FileSystem.get(conf); - oldLogDir = new Path(utility.getTestDir(), - HConstants.HREGION_OLDLOGDIR_NAME); - logDir = new Path(utility.getTestDir(), - HConstants.HREGION_LOGDIR_NAME); - - manager = new ReplicationSourceManager(helper, - conf, server, fs, REPLICATING, logDir, oldLogDir); - manager.addSource("1"); - - htd = new HTableDescriptor(test); - HColumnDescriptor col = new HColumnDescriptor("f1"); - col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); - htd.addFamily(col); - col = new HColumnDescriptor("f2"); - col.setScope(HConstants.REPLICATION_SCOPE_LOCAL); - htd.addFamily(col); - - hri = new HRegionInfo(htd, r1, r2); - - - } - - @AfterClass - 
public static void tearDownAfterClass() throws Exception { -// REENABLE manager.join(); - utility.shutdownMiniCluster(); - } - - @Before - public void setUp() throws Exception { - fs.delete(logDir, true); - fs.delete(oldLogDir, true); - } - - @After - public void tearDown() throws Exception { - setUp(); - } - - @Ignore @Test - public void testLogRoll() throws Exception { - long seq = 0; - long baseline = 1000; - long time = baseline; - KeyValue kv = new KeyValue(r1, f1, r1); - WALEdit edit = new WALEdit(); - edit.add(kv); - List listeners = new ArrayList(); -// REENABLE listeners.add(manager); - HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners, - URLEncoder.encode("regionserver:60020", "UTF8")); - - manager.init(); - - // Testing normal log rolling every 20 - for(long i = 1; i < 101; i++) { - if(i > 1 && i % 20 == 0) { - hlog.rollWriter(); - } - LOG.info(i); - HLogKey key = new HLogKey(hri.getRegionName(), - test, seq++, System.currentTimeMillis()); - hlog.append(hri, key, edit); - } - - // Simulate a rapid insert that's followed - // by a report that's still not totally complete (missing last one) - LOG.info(baseline + " and " + time); - baseline += 101; - time = baseline; - LOG.info(baseline + " and " + time); - - for (int i = 0; i < 3; i++) { - HLogKey key = new HLogKey(hri.getRegionName(), - test, seq++, System.currentTimeMillis()); - hlog.append(hri, key, edit); - } - - assertEquals(6, manager.getHLogs().size()); - - hlog.rollWriter(); - - // REENABLE manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - // REENABLE "1", 0, false); - - HLogKey key = new HLogKey(hri.getRegionName(), - test, seq++, System.currentTimeMillis()); - hlog.append(hri, key, edit); - - // REENABLE assertEquals(1, manager.getHLogs().size()); - - - // TODO Need a case with only 2 HLogs and we only want to delete the first one - } - -} Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java 
=================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java (revision 1003174) +++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java (working copy) @@ -47,10 +47,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class DISABLEDTestReplicationSink { +public class TestReplicationSink { private static final Log LOG = - LogFactory.getLog(DISABLEDTestReplicationSink.class); + LogFactory.getLog(TestReplicationSink.class); private static final int BATCH_SIZE = 10; @@ -128,7 +128,7 @@ * Insert a whole batch of entries * @throws Exception */ - @Ignore @Test + @Test public void testBatchSink() throws Exception { HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { @@ -144,7 +144,7 @@ * Insert a mix of puts and deletes * @throws Exception */ - @Ignore @Test + @Test public void testMixedPutDelete() throws Exception { HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2]; for(int i = 0; i < BATCH_SIZE/2; i++) { @@ -168,7 +168,7 @@ * Insert to 2 different tables * @throws Exception */ - @Ignore @Test + @Test public void testMixedPutTables() throws Exception { HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { @@ -189,7 +189,7 @@ * Insert then do different types of deletes * @throws Exception */ - @Ignore @Test + @Test public void testMixedDeletes() throws Exception { HLog.Entry[] entries = new HLog.Entry[3]; for(int i = 0; i < 3; i++) { @@ -214,7 +214,7 @@ * before the actual Put that creates it. 
* @throws Exception */ - @Ignore @Test + @Test public void testApplyDeleteBeforePut() throws Exception { HLog.Entry[] entries = new HLog.Entry[5]; for(int i = 0; i < 2; i++) { Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java (revision 1021408) +++ src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java (working copy) @@ -69,7 +69,7 @@ * time reading logs that are being archived. * @throws Exception */ - @Ignore @Test + @Test public void testLogMoving() throws Exception{ Path logPath = new Path(logDir, "log"); HLog.Writer writer = HLog.createWriter(fs, logPath, conf); Index: src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1021408) +++ src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -916,7 +916,7 @@ public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node) throws KeeperException { List children = ZKUtil.listChildrenNoWatch(zkw, node); - if(!children.isEmpty()) { + if(children != null && !children.isEmpty()) { for(String child : children) { deleteNodeRecursively(zkw, joinZNode(node, child)); } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1021408) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -36,8 +36,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; -import 
org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; /** * This class is responsible to manage all the replication @@ -104,8 +104,10 @@ this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; + this.zkHelper.registerRegionServerListener( + new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); List otherRSs = - this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher()); + this.zkHelper.getRegisteredRegionServers(); this.otherRegionServers = otherRSs == null ? new ArrayList() : otherRSs; } @@ -145,7 +147,10 @@ ReplicationSourceInterface src = addSource(id); src.startup(); } - List currentReplicators = this.zkHelper.getListOfReplicators(); + List currentReplicators = this.zkHelper.getRegisteredRegionServers(); + if (currentReplicators == null || currentReplicators.size() == 0) { + return; + } synchronized (otherRegionServers) { LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); @@ -322,29 +327,59 @@ * in the local cluster. It initiates the process to transfer the queues * if it is able to grab the lock. */ - public class OtherRegionServerWatcher implements Watcher { - @Override - public void process(WatchedEvent watchedEvent) { - LOG.info(" event " + watchedEvent); - if (watchedEvent.getType().equals(Event.KeeperState.Expired) || - watchedEvent.getType().equals(Event.KeeperState.Disconnected)) { + public class OtherRegionServerWatcher extends ZooKeeperListener { + + /** + * Construct a ZooKeeper event listener. + */ + public OtherRegionServerWatcher(ZooKeeperWatcher watcher) { + super(watcher); + } + + /** + * Called when a new node has been created. 
+ * @param path full path of the new node + */ + public void nodeCreated(String path) { + refreshRegionServersList(path); + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + boolean cont = refreshRegionServersList(path); + if (!cont) { return; } + LOG.info(path + " znode expired, trying to lock it"); + String[] rsZnodeParts = path.split("/"); + transferQueues(rsZnodeParts[rsZnodeParts.length-1]); + } - List newRsList = (zkHelper.getRegisteredRegionServers(this)); + /** + * Called when an existing node has a child node added or removed. + * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + refreshRegionServersList(path); + } + + private boolean refreshRegionServersList(String path) { + if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) { + return false; + } + List newRsList = (zkHelper.getRegisteredRegionServers()); if (newRsList == null) { - return; + return false; } else { synchronized (otherRegionServers) { otherRegionServers.clear(); otherRegionServers.addAll(newRsList); } } - if (watchedEvent.getType().equals(Event.EventType.NodeDeleted)) { - LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it"); - String[] rsZnodeParts = watchedEvent.getPath().split("/"); - transferQueues(rsZnodeParts[rsZnodeParts.length-1]); - } + return true; } } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1021408) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy) @@ -45,7 +45,6 @@ public class Replication implements WALObserver { private final boolean replication; private final ReplicationSourceManager replicationManager; - private boolean 
replicationMaster; private final AtomicBoolean replicating = new AtomicBoolean(true); private final ReplicationZookeeper zkHelper; private final Configuration conf; @@ -70,10 +69,8 @@ this.replication = isReplication(this.conf); if (replication) { this.zkHelper = new ReplicationZookeeper(server, this.replicating); - this.replicationMaster = zkHelper.isReplicationMaster(); - this.replicationManager = this.replicationMaster ? - new ReplicationSourceManager(zkHelper, conf, this.server, - fs, this.replicating, logDir, oldLogDir) : null; + this.replicationManager = new ReplicationSourceManager(zkHelper, conf, + this.server, fs, this.replicating, logDir, oldLogDir) ; } else { this.replicationManager = null; this.zkHelper = null; @@ -93,10 +90,8 @@ */ public void join() { if (this.replication) { - if (this.replicationMaster) { - this.replicationManager.join(); - } - this.zkHelper.deleteOwnRSZNode(); + this.replicationManager.join(); + this.zkHelper.deleteOwnRSZNode(); } } @@ -106,7 +101,7 @@ * @throws IOException */ public void replicateLogEntries(HLog.Entry[] entries) throws IOException { - if (this.replication && !this.replicationMaster) { + if (this.replication) { this.replicationSink.replicateEntries(entries); } } @@ -118,11 +113,8 @@ */ public void startReplicationServices() throws IOException { if (this.replication) { - if (this.replicationMaster) { - this.replicationManager.init(); - } else { - this.replicationSink = new ReplicationSink(this.conf, this.server); - } + this.replicationManager.init(); + this.replicationSink = new ReplicationSink(this.conf, this.server); } } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1021408) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -54,6 +54,7 @@ import 
org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.zookeeper.KeeperException; /** * Class that handles the source of a replication stream. @@ -195,7 +196,7 @@ /** * Select a number of peers at random using the ratio. Mininum 1. */ - private void chooseSinks() { + private void chooseSinks() throws KeeperException { this.currentPeers.clear(); List addresses = this.zkHelper.getPeersAddresses(peerClusterId); @@ -231,8 +232,14 @@ // If this is recovered, the queue is already full and the first log // normally has a position (unless the RS failed between 2 logs) if (this.queueRecovered) { -// this.position = this.zkHelper.getHLogRepPosition( -// this.peerClusterZnode, this.queue.peek().getName()); + try { + this.position = this.zkHelper.getHLogRepPosition( + this.peerClusterZnode, this.queue.peek().getName()); + } catch (KeeperException e) { + LOG.error("Couldn't get the position of this recovered queue " + + peerClusterZnode, e); + this.abort(); + } } int sleepMultiplier = 1; // Loop until we close down @@ -380,6 +387,8 @@ Thread.sleep(this.sleepForRetries); } catch (InterruptedException e) { LOG.error("Interrupted while trying to connect to sinks", e); + } catch (KeeperException e) { + LOG.error("Error talking to zookeeper, retrying", e); } } } @@ -553,6 +562,8 @@ } while (!this.stopper.isStopped() && down); } catch (InterruptedException e) { LOG.debug("Interrupted while trying to contact the peer cluster"); + } catch (KeeperException e) { + LOG.error("Error talking to zookeeper, retrying", e); } } @@ -589,7 +600,7 @@ } }; Threads.setDaemonThreadRunning( - this, n + ".replicationSource," + clusterId, handler); + this, n + ".replicationSource," + peerClusterZnode, handler); } /** Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java =================================================================== --- 
src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (revision 1021408) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (working copy) @@ -38,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; /** * This class is responsible for replicating the edits coming Index: src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (revision 1021408) +++ src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (working copy) @@ -23,24 +23,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; -// REENALBE import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; /** * Implementation of a log cleaner that checks if a log is still scheduled for * replication before deleting it when its TTL is over. 
*/ -public class ReplicationLogCleaner implements LogCleanerDelegate, Watcher { +public class ReplicationLogCleaner implements LogCleanerDelegate { private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); @@ -78,31 +75,30 @@ private boolean refreshHLogsAndSearch(String searchedLog) { this.hlogs.clear(); final boolean lookForLog = searchedLog != null; -// REENALBE -// List rss = zkHelper.getListOfReplicators(this); -// if (rss == null) { -// LOG.debug("Didn't find any region server that replicates, deleting: " + -// searchedLog); -// return false; -// } -// for (String rs: rss) { -// List listOfPeers = zkHelper.getListPeersForRS(rs, this); -// // if rs just died, this will be null -// if (listOfPeers == null) { -// continue; -// } -// for (String id : listOfPeers) { -// List peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id, this); -// if (peersHlogs != null) { -// this.hlogs.addAll(peersHlogs); -// } -// // early exit if we found the log -// if(lookForLog && this.hlogs.contains(searchedLog)) { -// LOG.debug("Found log in ZK, keeping: " + searchedLog); -// return true; -// } -// } -// } + List rss = zkHelper.getListOfReplicators(); + if (rss == null) { + LOG.debug("Didn't find any region server that replicates, deleting: " + + searchedLog); + return false; + } + for (String rs: rss) { + List listOfPeers = zkHelper.getListPeersForRS(rs); + // if rs just died, this will be null + if (listOfPeers == null) { + continue; + } + for (String id : listOfPeers) { + List peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id); + if (peersHlogs != null) { + this.hlogs.addAll(peersHlogs); + } + // early exit if we found the log + if(lookForLog && this.hlogs.contains(searchedLog)) { + LOG.debug("Found log in ZK, keeping: " + searchedLog); + return true; + } + } + } LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog); return false; } @@ -110,15 +106,15 @@ @Override public void setConf(Configuration conf) { this.conf = conf; -// try { - // 
REENABLE -// this.zkHelper = new ReplicationZookeeperWrapper( -// ZooKeeperWrapper.createInstance(this.conf, -// HMaster.class.getName()), -// this.conf, new AtomicBoolean(true), null); -// } catch (IOException e) { -// LOG.error(e); -// } + try { + ZooKeeperWatcher zkw = + new ZooKeeperWatcher(conf, this.getClass().getName(), null); + this.zkHelper = new ReplicationZookeeper(conf, zkw); + } catch (KeeperException e) { + LOG.error("Error while configuring " + this.getClass().getName(), e); + } catch (IOException e) { + LOG.error("Error while configuring " + this.getClass().getName(), e); + } refreshHLogsAndSearch(null); } @@ -126,7 +122,4 @@ public Configuration getConf() { return conf; } - - @Override - public void process(WatchedEvent watchedEvent) {} } Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1021408) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -39,11 +39,11 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; /** * This class serves as a helper for all things related to zookeeper @@ -81,31 +81,39 @@ // Our handle on zookeeper private final ZooKeeperWatcher zookeeper; // Map of addresses of peer clusters with their ZKW - private final Map peerClusters; + private Map peerClusters; // Path to the root replication znode - private final String replicationZNode; + private String replicationZNode; // Path to the peer clusters znode - 
private final String peersZNode; + private String peersZNode; // Path to the znode that contains all RS that replicates - private final String rsZNode; + private String rsZNode; // Path to this region server's name under rsZNode - private final String rsServerNameZnode; + private String rsServerNameZnode; // Name node if the replicationState znode - private final String replicationStateNodeName; + private String replicationStateNodeName; // If this RS is part of a master cluster - private final boolean replicationMaster; + private boolean replicationMaster; private final Configuration conf; // Is this cluster replicating at the moment? - private final AtomicBoolean replicating; + private AtomicBoolean replicating; // Byte (stored as string here) that identifies this cluster - private final String clusterId; + private String clusterId; // Abortable - private final Abortable abortable; + private Abortable abortable; + public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk) + throws KeeperException { + + this.conf = conf; + this.zookeeper = zk; + setZNodes(); + } + /** * Constructor used by region servers, connects to the peer cluster right away. 
* - * @param zookeeper + * @param server * @param replicating atomic boolean to start/stop replication * @throws IOException * @throws KeeperException @@ -115,6 +123,27 @@ this.abortable = server; this.zookeeper = server.getZooKeeper(); this.conf = server.getConfiguration(); + setZNodes(); + + this.peerClusters = new HashMap(); + this.replicating = replicating; + setReplicating(); + this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName()); + ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode); + // Set a tracker on replicationStateNodeNode + ReplicationStatusTracker tracker = + new ReplicationStatusTracker(this.zookeeper, server); + tracker.start(); + + List znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); + if (znodes != null) { + for (String z : znodes) { + connectToPeer(z); + } + } + } + + private void setZNodes() throws KeeperException { String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = @@ -130,15 +159,11 @@ String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + this.conf.get("hbase.zookeeper.property.clientPort") + ":" + this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT); - - this.peerClusters = new HashMap(); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName); - this.replicating = replicating; - setReplicating(); String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName); byte [] data = ZKUtil.getData(this.zookeeper, znode); String idResult = Bytes.toString(data); @@ -152,24 +177,6 @@ LOG.info("This cluster (" + thisCluster + ") is a " + (this.replicationMaster ? 
"master" : "slave") + " for replication" + ", compared with (" + address + ")"); - - if (server.getServerName() != null) { - this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName()); - // Set a tracker on replicationStateNodeNode - ReplicationStatusTracker tracker = - new ReplicationStatusTracker(this.zookeeper, getRepStateNode(), server); - tracker.start(); - - List znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); - if (znodes != null) { - for (String z : znodes) { - connectToPeer(z); - } - } - } else { - this.rsServerNameZnode = null; - } - } /** @@ -178,50 +185,19 @@ * @param peerClusterId (byte) the cluster to interrogate * @return addresses of all region servers */ - public List getPeersAddresses(String peerClusterId) { + public List getPeersAddresses(String peerClusterId) + throws KeeperException { if (this.peerClusters.size() == 0) { return new ArrayList(0); } - ReplicationZookeeper zkw = this.peerClusters.get(peerClusterId); + ZooKeeperWatcher zkw = this.peerClusters.get(peerClusterId); + return zkw == null? new ArrayList(0): - zkw.scanAddressDirectory(this.zookeeper.rsZNode); + ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode); } /** - * Scan a directory of address data. 
- * @param znode The parent node - * @return The directory contents as HServerAddresses - */ - public List scanAddressDirectory(String znode) { - List list = new ArrayList(); - List nodes = null; - try { - nodes = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Scanning " + znode, e); - } - if (nodes == null) { - return list; - } - for (String node : nodes) { - String path = ZKUtil.joinZNode(znode, node); - list.add(readAddress(path)); - } - return list; - } - - private HServerAddress readAddress(String znode) { - byte [] data = null; - try { - data = ZKUtil.getData(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Getting address", e); - } - return new HServerAddress(Bytes.toString(data)); - } - - /** * This method connects this cluster to another one and registers it * in this region server's replication znode * @param peerId id of the peer cluster @@ -239,15 +215,11 @@ otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]); otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]); otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]); - // REENABLE -- FIX!!!! 
- /* - ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(otherConf, - "connection to cluster: " + peerId); - zkw.registerListener(new ReplicationStatusWatcher()); + ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf, + "connection to cluster: " + peerId, this.abortable); this.peerClusters.put(peerId, zkw); - this.zookeeperWrapper.ensureExists(this.zookeeperWrapper.getZNode( + ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode( this.rsServerNameZnode, peerId)); - */ LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble)); } @@ -282,7 +254,7 @@ try { String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.createAndWatch(this.zookeeper, znode, Bytes.toBytes("")); + ZKUtil.createWithParents(this.zookeeper, znode); } catch (KeeperException e) { this.abortable.abort("Failed add log to list", e); } @@ -297,7 +269,7 @@ try { String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId); znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.deleteChildrenRecursively(this.zookeeper, znode); + ZKUtil.deleteNode(this.zookeeper, znode); } catch (KeeperException e) { this.abortable.abort("Failed remove from list", e); } @@ -316,7 +288,7 @@ String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); znode = ZKUtil.joinZNode(znode, filename); // Why serialize String of Long and note Long as bytes? 
- ZKUtil.createAndWatch(this.zookeeper, znode, + ZKUtil.setData(this.zookeeper, znode, Bytes.toBytes(Long.toString(position))); } catch (KeeperException e) { this.abortable.abort("Writing replication status", e); @@ -326,15 +298,18 @@ /** * Get a list of all the other region servers in this cluster * and set a watch - * @param watch the watch to set * @return a list of server nanes */ - public List getRegisteredRegionServers(Watcher watch) { + public List getRegisteredRegionServers() { List result = null; try { - // TODO: This is rsZNode from zk which is like getListOfReplicators - // but maybe these are from different zk instances? - result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode); + List nads = + ZKUtil.watchAndGetNewChildren(this.zookeeper, this.zookeeper.rsZNode); + result = new ArrayList(nads.size()); + for (ZKUtil.NodeAndData nad : nads) { + String[] fullPath = nad.getNode().split("/"); + result.add(fullPath[fullPath.length - 1]); + } } catch (KeeperException e) { this.abortable.abort("Get list of registered region servers", e); } @@ -344,7 +319,6 @@ /** * Get the list of the replicators that have queues, they can be alive, dead * or simply from a previous run - * @param watch the watche to set * @return a list of server names */ public List getListOfReplicators() { @@ -360,7 +334,6 @@ /** * Get the list of peer clusters for the specified server names * @param rs server names of the rs - * @param watch the watch to set * @return a list of peer cluster */ public List getListPeersForRS(String rs) { @@ -378,7 +351,6 @@ * Get the list of hlogs for the specified region server and peer cluster * @param rs server names of the rs * @param id peer cluster - * @param watch the watch to set * @return a list of hlogs */ public List getListHLogsForPeerForRS(String rs, String id) { @@ -401,10 +373,14 @@ public boolean lockOtherRS(String znode) { try { String parent = ZKUtil.joinZNode(this.rsZNode, znode); + if (parent.equals(rsServerNameZnode)) { + 
LOG.warn("Won't lock because this is us, we're dead!"); + return false; + } String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode)); } catch (KeeperException e) { - this.abortable.abort("Failed lock other rs", e); + LOG.info("Failed lock other rs", e); } return true; } @@ -468,7 +444,7 @@ */ public void deleteSource(String peerZnode) { try { - ZKUtil.deleteChildrenRecursively(this.zookeeper, + ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(rsServerNameZnode, peerZnode)); } catch (KeeperException e) { this.abortable.abort("Failed delete of " + peerZnode, e); @@ -481,7 +457,7 @@ */ public void deleteRsQueues(String znode) { try { - ZKUtil.deleteChildrenRecursively(this.zookeeper, + ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(rsZNode, znode)); } catch (KeeperException e) { this.abortable.abort("Failed delete of " + znode, e); @@ -492,7 +468,12 @@ * Delete this cluster's queues */ public void deleteOwnRSZNode() { - deleteRsQueues(this.rsServerNameZnode); + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, + this.rsServerNameZnode); + } catch (KeeperException e) { + this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e); + } } /** @@ -510,13 +491,8 @@ return data == null || data.length() == 0 ? 
0 : Long.parseLong(data); } - /** - * Tells if this cluster replicates or not - * - * @return if this is a master - */ - public boolean isReplicationMaster() { - return this.replicationMaster; + public void registerRegionServerListener(ZooKeeperListener listener) { + this.zookeeper.registerListener(listener); } /** @@ -532,17 +508,29 @@ * Get a map of all peer clusters * @return map of peer cluster, zk address to ZKW */ - public Map getPeerClusters() { + public Map getPeerClusters() { return this.peerClusters; } + public String getRSZNode() { + return rsZNode; + } + /** + * Get the ZooKeeper watcher held by this helper. + * @return the zookeeper watcher + */ + public ZooKeeperWatcher getZookeeperWatcher() { + return this.zookeeper; + } + + /** * Tracker for status of the replication */ public class ReplicationStatusTracker extends ZooKeeperNodeTracker { - public ReplicationStatusTracker(ZooKeeperWatcher watcher, String node, + public ReplicationStatusTracker(ZooKeeperWatcher watcher, Abortable abortable) { - super(watcher, node, abortable); + super(watcher, getRepStateNode(), abortable); } @Override