Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java (revision 0)
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test handling of changes to the number of a peer's regionservers.
+ */
+@Category(LargeTests.class)
+public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    htable1.setAutoFlush(true);
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for (JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      r.getRegionServer().getWAL().rollWriter();
+    }
+    utility1.truncateTable(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call truncateTable on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
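+    // The poll loop below rescans the slave until it reports zero rows; an
+    // iteration that sees the row count shrink does not consume a retry,
+    // since the Deletes are evidently still arriving.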
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testReplication_KilledPeerSink() throws IOException, InterruptedException {
+
+    LOG.info("testReplication_KilledPeerSink");
+    MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
+
+    doPutTest(Bytes.toBytes(1));
+
+    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
+    peerCluster.stopRegionServer(rsToStop);
+    peerCluster.waitOnRegionServer(rsToStop);
+
+    // Sanity check
+    assertEquals(1, peerCluster.getRegionServerThreads().size());
+
+    doPutTest(Bytes.toBytes(2));
+
+    peerCluster.startRegionServer();
+
+    // Sanity check
+    assertEquals(2, peerCluster.getRegionServerThreads().size());
+
+    doPutTest(Bytes.toBytes(3));
+
+  }
+
+  private void doPutTest(byte[] row) throws IOException, InterruptedException {
+    Put put = new Put(row);
+    put.add(famName, row, row);
+
+    htable1 = new HTable(conf1, tableName);
+    htable1.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(row, res.value());
+        break;
+      }
+    }
+
+  }
+
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java (revision 0)
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(SmallTests.class)
+public class TestReplicationSinkManager {
+
+  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
+
+  private ReplicationPeers replicationPeers;
+  private ReplicationSinkManager sinkManager;
+
+  @Before
+  public void setUp() {
+    replicationPeers = mock(ReplicationPeers.class);
+    sinkManager = new ReplicationSinkManager(mock(HConnection.class), PEER_CLUSTER_ID, replicationPeers,
+        0.1f);
+  }
+
+  @Test
+  public void testChooseSinks() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+
+    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(serverNames);
+
+    sinkManager.chooseSinks();
+
+    assertEquals(2, sinkManager.getSinks().size());
+
+  }
+
+  @Test
+  public void testChooseSinks_LessThanRatioAvailable() {
+    List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
+        mock(ServerName.class));
+
+    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(serverNames);
+
+    sinkManager.chooseSinks();
+
+    assertEquals(1, sinkManager.getSinks().size());
+  }
+
+  @Test
+  public void testReportBadSink() {
+    ServerName serverNameA = mock(ServerName.class);
+    ServerName serverNameB = mock(ServerName.class);
+    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
+        Lists.newArrayList(serverNameA, serverNameB));
+
+    sinkManager.chooseSinks();
+    // Sanity check
+    assertEquals(1, sinkManager.getSinks().size());
+
+    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
+
+    sinkManager.reportBadSink(sinkPeer);
+
+    // Just reporting a bad sink once shouldn't have an effect
+    assertEquals(1, sinkManager.getSinks().size());
+
+  }
+
+  /**
+   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
+   * be replicated to anymore.
+   */
+  @Test
+  public void testReportBadSink_PastThreshold() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(serverNames);
+
+
+    sinkManager.chooseSinks();
+    // Sanity check
+    assertEquals(2, sinkManager.getSinks().size());
+
+    ServerName serverName = sinkManager.getSinks().get(0);
+
+    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+
+    for (int i = 0; i <= ReplicationSinkManager.BAD_SINK_THRESHOLD; i++) {
+      sinkManager.reportBadSink(sinkPeer);
+    }
+
+    // Reporting a bad sink more than the threshold count should remove it
+    // from the list of potential sinks
+    assertEquals(1, sinkManager.getSinks().size());
+  }
+
+  @Test
+  public void testReportBadSink_DownToZeroSinks() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(serverNames);
+
+
+    sinkManager.chooseSinks();
+    // Sanity check
+
+    List<ServerName> sinkList = sinkManager.getSinks();
+    assertEquals(2, sinkList.size());
+
+    ServerName serverNameA = sinkList.get(0);
+    ServerName serverNameB = sinkList.get(1);
+
+    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
+    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
+
+    for (int i = 0; i <= ReplicationSinkManager.BAD_SINK_THRESHOLD; i++) {
+      sinkManager.reportBadSink(sinkPeerA);
+      sinkManager.reportBadSink(sinkPeerB);
+    }
+
+    // We've gone down to 0 good sinks, so the replication sinks
+    // should have been refreshed now
+    assertEquals(2, sinkManager.getSinks().size());
+  }
+
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java (revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java (revision 0)
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Maintains a collection of peers to replicate to, and randomly selects a
+ * single peer to replicate to per set of data to replicate. Also handles
+ * keeping track of peer availability.
+ */
+public class ReplicationSinkManager {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);
+
+  /**
+   * Once a ReplicationSink has been reported as bad more than this many times,
+   * it will no longer be provided as a sink for replication until the pool of
+   * replication sinks is refreshed.
+   */
+  public static final int BAD_SINK_THRESHOLD = 3;
+
+  private final HConnection conn;
+
+  private final String peerClusterId;
+
+  private final ReplicationPeers replicationPeers;
+
+  // Count of "bad replication sink" reports per peer sink
+  private final Map<ServerName, Integer> badReportCounts;
+
+  // Ratio of total number of potential sinks to be used
+  private final float ratio;
+
+  private final Random random;
+
+  // A timestamp of the last time the list of replication peers changed
+  private long lastUpdateToPeers;
+
+  // The current pool of sinks to which replication can be performed
+  private List<ServerName> sinks = Lists.newArrayList();
+
+  public ReplicationSinkManager(HConnection conn, String peerClusterId,
+      ReplicationPeers replicationPeers, float ratio) {
+    this.conn = conn;
+    this.peerClusterId = peerClusterId;
+    this.replicationPeers = replicationPeers;
+    this.badReportCounts = Maps.newHashMap();
+    this.ratio = ratio;
+    this.random = new Random();
+  }
+
+  /**
+   * Get a randomly-chosen replication sink to replicate to.
+   *
+   * @return a replication sink to replicate to
+   */
+  public SinkPeer getReplicationSink() throws IOException {
+    if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
+        > this.lastUpdateToPeers) {
+      LOG.info("Current list of sinks is out of date, updating");
+      chooseSinks();
+    }
+
+    if (sinks.isEmpty()) {
+      throw new IOException("No replication sinks are available");
+    }
+    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
+    return new SinkPeer(serverName, conn.getAdmin(serverName));
+  }
+
+  /**
+   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
+   * failed). If a single SinkPeer is reported as bad more than
+   * {@link ReplicationSinkManager#BAD_SINK_THRESHOLD} times, it will be removed
+   * from the pool of potential replication targets.
+   *
+   * @param sinkPeer
+   *          The SinkPeer that had a failed replication attempt on it
+   */
+  public void reportBadSink(SinkPeer sinkPeer) {
+    ServerName serverName = sinkPeer.getServerName();
+    int badReportCount = (badReportCounts.containsKey(serverName)
+        ? badReportCounts.get(serverName) : 0) + 1;
+    badReportCounts.put(serverName, badReportCount);
+    if (badReportCount > BAD_SINK_THRESHOLD) {
+      this.sinks.remove(serverName);
+      if (sinks.isEmpty()) {
+        chooseSinks();
+      }
+    }
+  }
+
+  void chooseSinks() {
+    List<ServerName> slaveAddresses =
+        replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
+    Collections.shuffle(slaveAddresses, random);
+    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+    sinks = slaveAddresses.subList(0, numSinks);
+    lastUpdateToPeers = System.currentTimeMillis();
+    badReportCounts.clear();
+  }
+
+  List<ServerName> getSinks() {
+    return sinks;
+  }
+
+  /**
+   * Wraps a replication region server sink to provide the ability to identify
+   * it.
+   */
+  public static class SinkPeer {
+    private ServerName serverName;
+    private AdminService.BlockingInterface regionServer;
+
+    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
+      this.serverName = serverName;
+      this.regionServer = regionServer;
+    }
+
+    ServerName getServerName() {
+      return serverName;
+    }
+
+    public AdminService.BlockingInterface getRegionServer() {
+      return regionServer;
+    }
+
+  }
+
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1508338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -23,16 +23,11 @@
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -45,20 +40,19 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
@@ -89,9 +83,6 @@
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
   private Configuration conf;
-  // ratio of region servers to chose from a slave cluster
-  private float ratio;
-  private Random random;
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
@@ -99,8 +90,6 @@
   private ReplicationSourceManager manager;
   // Should we stop everything?
   private Stoppable stopper;
-  // List of chosen sinks (region servers)
-  private List<ServerName> currentPeers;
   // How long should we sleep for each retry
   private long sleepForRetries;
   // Max size in bytes of entriesArray
@@ -140,6 +129,8 @@
   private MetricsSource metrics;
   // Handle on the log reader helper
   private ReplicationHLogReaderManager repLogReader;
+  // Handles connecting to peer region servers
+  private ReplicationSinkManager replicationSinkMgr;
 
   /**
    * Instantiation method used by region servers
@@ -178,9 +169,6 @@
     this.conn = HConnectionManager.getConnection(conf);
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
-    this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
-    this.currentPeers = new ArrayList<ServerName>();
-    this.random = new Random();
     this.manager = manager;
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
@@ -193,29 +181,10 @@
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers,
+        this.conf.getFloat("replication.source.ratio", 0.1f));
   }
 
-  /**
-   * Select a number of peers at random using the ratio. Mininum 1.
-   */
-  private void chooseSinks() {
-    this.currentPeers.clear();
-    List<ServerName> addresses = this.replicationPeers.getRegionServersOfConnectedPeer(this.peerId);
-    Set<ServerName> setOfAddr = new HashSet<ServerName>();
-    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
-    LOG.debug("Getting " + nbPeers +
-        " rs from peer cluster # " + this.peerId);
-    for (int i = 0; i < nbPeers; i++) {
-      ServerName sn;
-      // Make sure we get one address that we don't already have
-      do {
-        sn = addresses.get(this.random.nextInt(addresses.size()));
-      } while (setOfAddr.contains(sn));
-      LOG.info("Choosing peer " + sn);
-      setOfAddr.add(sn);
-    }
-    this.currentPeers.addAll(setOfAddr);
-  }
 
   @Override
   public void enqueueLog(Path log) {
@@ -457,9 +426,9 @@
     int sleepMultiplier = 1;
 
     // Connect to peer cluster first, unless we have to stop
-    while (this.isActive() && this.currentPeers.size() == 0) {
-      chooseSinks();
-      if (this.isActive() && this.currentPeers.size() == 0) {
+    while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
+      replicationSinkMgr.chooseSinks();
+      if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -653,8 +622,10 @@
         }
         continue;
       }
+      SinkPeer sinkPeer = null;
       try {
-        AdminService.BlockingInterface rrs = getRS();
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicating " + this.currentNbEntries + " entries");
         }
@@ -700,28 +671,18 @@
             this.socketTimeoutMultiplier);
         } else if (ioe instanceof ConnectException) {
           LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
-          chooseSinks();
+          replicationSinkMgr.chooseSinks();
         } else {
           LOG.warn("Can't replicate because of a local or network error: ", ioe);
         }
       }
-
-      try {
-        boolean down;
-        // Spin while the slave is down and we're not asked to shutdown/close
-        do {
-          down = isSlaveDown();
-          if (down) {
-            if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
-              sleepMultiplier++;
-            } else {
-              chooseSinks();
-            }
-          }
-        } while (this.isActive() && down );
-      } catch (InterruptedException e) {
-        LOG.debug("Interrupted while trying to contact the peer cluster");
+
+      if (sinkPeer != null) {
+        replicationSinkMgr.reportBadSink(sinkPeer);
       }
+      if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+        sleepMultiplier++;
+      }
     }
   }
 }
@@ -797,49 +758,6 @@
     Threads.shutdown(this, this.sleepForRetries);
   }
 
-  /**
-   * Get a new region server at random from this peer
-   * @return
-   * @throws IOException
-   */
-  private AdminService.BlockingInterface getRS() throws IOException {
-    if (this.currentPeers.size() == 0) {
-      throw new IOException(this.peerClusterZnode + " has 0 region servers");
-    }
-    ServerName address =
-        currentPeers.get(random.nextInt(this.currentPeers.size()));
-    return this.conn.getAdmin(address);
-  }
-
-  /**
-   * Check if the slave is down by trying to establish a connection
-   * @return true if down, false if up
-   * @throws InterruptedException
-   */
-  public boolean isSlaveDown() throws InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-    Thread pingThread = new Thread() {
-      public void run() {
-        try {
-          AdminService.BlockingInterface rrs = getRS();
-          // Dummy call which should fail
-          ProtobufUtil.getServerInfo(rrs);
-          latch.countDown();
-        } catch (IOException ex) {
-          if (ex instanceof RemoteException) {
-            ex = ((RemoteException) ex).unwrapRemoteException();
-          }
-          LOG.info("Slave cluster looks down: " + ex.getMessage(), ex);
-        }
-      }
-    };
-    pingThread.start();
-    // awaits returns true if countDown happened
-    boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
-    pingThread.interrupt();
-    return down;
-  }
-
   public String getPeerClusterZnode() {
     return this.peerClusterZnode;
   }
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (revision 1508338)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (working copy)
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -265,6 +266,14 @@
     }
     return ids;
   }
+
+  @Override
+  public long getTimestampOfLastChangeToPeer(String peerId) {
+    if (!peerClusters.containsKey(peerId)) {
+      throw new IllegalArgumentException("Unknown peer id: " + peerId);
+    }
+    return peerClusters.get(peerId).getLastRegionserverUpdate();
+  }
 
   /**
    * A private method used during initialization. This method attempts to connect to all registered
@@ -291,6 +300,7 @@
       LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
       try {
         peer.reloadZkWatcher();
+        peer.getZkw().registerListener(new PeerRegionServerListener(peer));
       } catch (IOException io) {
         LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
       }
@@ -304,7 +314,7 @@
    */
   private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
       throws KeeperException {
-    List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
+    List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
     if (children == null) {
       return Collections.emptyList();
     }
@@ -314,6 +324,7 @@
     }
     return addresses;
   }
+
   private String getPeerStateNode(String id) {
     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
   }
@@ -366,6 +377,7 @@
     ReplicationPeer peer =
         new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
     peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+    peer.getZkw().registerListener(new PeerRegionServerListener(peer));
     return peer;
   }
 
@@ -406,4 +418,37 @@
         .toByteArray();
     return ProtobufUtil.prependPBMagic(bytes);
   }
+
+  /**
+   * Tracks changes to the list of region servers in a peer's cluster.
+   */
+  public static class PeerRegionServerListener extends ZooKeeperListener {
+
+    private ReplicationPeer peer;
+    private String regionServerListNode;
+
+    public PeerRegionServerListener(ReplicationPeer replicationPeer) {
+      super(replicationPeer.getZkw());
+      this.peer = replicationPeer;
+      this.regionServerListNode = peer.getZkw().rsZNode;
+    }
+
+    public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
+      super(zkw);
+      this.regionServerListNode = regionServerListNode;
+    }
+
+    @Override
+    public synchronized void nodeChildrenChanged(String path) {
+      if (path.equals(regionServerListNode)) {
+        try {
+          LOG.info("Detected change to peer regionservers, fetching updated list");
+          peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
+        } catch (KeeperException e) {
+          LOG.fatal("Error reading slave addresses", e);
+        }
+      }
+    }
+
+  }
 }
\ No newline at end of file
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (revision 1508338)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (working copy)
@@ -18,6 +18,13 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -35,12 +42,6 @@
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * This class acts as a wrapper for all the objects used to identify and
  * communicate with remote peers and is responsible for answering to expired
@@ -57,9 +58,11 @@
   // Cannot be final since a new object needs to be recreated when session fails
   private ZooKeeperWatcher zkw;
   private final Configuration conf;
+  private long lastRegionserverUpdate;
 
   private PeerStateTracker peerStateTracker;
-  
+
+
   /**
    * Constructor that takes all the objects required to communicate with the
    * specified peer, except for the region server addresses.
@@ -130,6 +133,7 @@
    */
   public void setRegionServers(List<ServerName> regionServers) {
     this.regionServers = regionServers;
+    lastRegionserverUpdate = System.currentTimeMillis();
   }
 
   /**
@@ -139,6 +143,15 @@
   public ZooKeeperWatcher getZkw() {
     return zkw;
   }
+
+  /**
+   * Get the timestamp of the last change to the list of region servers to
+   * replicate to.
+   * @return the timestamp (from System.currentTimeMillis()) of the last time
+   *         the list of peer region servers changed
+   */
+  public long getLastRegionserverUpdate() {
+    return lastRegionserverUpdate;
+  }
 
   /**
    * Get the identifier of this peer
@@ -171,7 +184,7 @@
     zkw = new ZooKeeperWatcher(conf, "connection to cluster: " + id, this);
   }
-  
+
   @Override
   public boolean isAborted() {
     // Currently the replication peer is never "Aborted", we just log when the
@@ -185,6 +198,8 @@
       zkw.close();
     }
   }
+
+
   /**
    * @param bytes
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (revision 1508338)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (working copy)
@@ -121,6 +121,14 @@
    * cluster is unavailable or there are no region servers in the cluster.
    */
   List<ServerName> getRegionServersOfConnectedPeer(String peerId);
+
+  /**
+   * Get the timestamp of the last change in composition of a given peer cluster.
+   * @param peerId identifier of the peer cluster for which the timestamp is requested
+   * @return the timestamp (in milliseconds) of the last change to the composition of
+   *         the peer cluster
+   */
+  long getTimestampOfLastChangeToPeer(String peerId);
 
   /**
    * Returns the UUID of the provided peer id.
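
For reviewers, a minimal sketch of the call pattern the ReplicationSource hunks above establish, shown as a self-contained class. This is illustrative only: the SinkUsageSketch class and shipBatch method are hypothetical (not part of the patch), the shipping call is assumed to be ReplicationProtbufUtil.replicateWALEntry as imported by ReplicationSource, and the retry, sleep, and metrics plumbing of ReplicationSource is elided.

    import java.io.IOException;

    import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager;
    import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;

    // Hypothetical illustration only; not part of this patch.
    public class SinkUsageSketch {

      /**
       * Ship one batch of WAL entries to a randomly chosen sink, reporting the
       * sink as bad on failure. Returns true if the batch was shipped.
       */
      static boolean shipBatch(ReplicationSinkManager sinkMgr, HLog.Entry[] entries) {
        SinkPeer sinkPeer = null;
        try {
          // Refreshes the sink pool first if the peer's region server list has
          // changed since the pool was last chosen, then picks a sink at random.
          sinkPeer = sinkMgr.getReplicationSink();
          BlockingInterface rrs = sinkPeer.getRegionServer();
          ReplicationProtbufUtil.replicateWALEntry(rrs, entries);
          return true;
        } catch (IOException e) {
          if (sinkPeer != null) {
            // More than BAD_SINK_THRESHOLD reports drop the sink from the pool;
            // an emptied pool triggers chooseSinks() to repopulate it.
            sinkMgr.reportBadSink(sinkPeer);
          }
          return false;
        }
      }
    }

The net effect of the ReplicationSource changes is that failure handling moves from the old ping-based isSlaveDown() spin loop to this per-sink bad-report accounting.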