Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java	(revision 0)
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test handling of changes to the number of a peer's regionservers.
+ */
+@Category(LargeTests.class)
+public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    htable1.setAutoFlush(true);
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for (JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      r.getRegionServer().getWAL().rollWriter();
+    }
+    utility1.truncateTable(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call truncateTable on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testReplication_KilledPeerSink() throws IOException, InterruptedException {
+
+    LOG.info("testReplication_KilledPeerSink");
+    MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
+
+    doPutTest(Bytes.toBytes(1));
+
+    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
+    peerCluster.stopRegionServer(rsToStop);
+    peerCluster.waitOnRegionServer(rsToStop);
+
+    // Sanity check
+    assertEquals(1, peerCluster.getRegionServerThreads().size());
+
+    doPutTest(Bytes.toBytes(2));
+
+    peerCluster.startRegionServer();
+
+    // Sanity check
+    assertEquals(2, peerCluster.getRegionServerThreads().size());
+
+    doPutTest(Bytes.toBytes(3));
+
+  }
+
+  private void doPutTest(byte[] row) throws IOException, InterruptedException {
+    Put put = new Put(row);
+    put.add(famName, row, row);
+
+    htable1 = new HTable(conf1, tableName);
+    htable1.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+
+  }
+
+}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java	(revision 0)
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(SmallTests.class)
+public class TestReplicationSinkManager {
+
+  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
+
+  private ReplicationZookeeper zkHelper;
+  private ReplicationSinkManager sinkManager;
+
+  @Before
+  public void setUp() {
+    zkHelper = mock(ReplicationZookeeper.class);
+    sinkManager = new ReplicationSinkManager(mock(HConnection.class), PEER_CLUSTER_ID, zkHelper,
+        0.1f);
+  }
+
+  @Test
+  public void testChooseSinks() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+
+    when(zkHelper.getSlavesAddresses(PEER_CLUSTER_ID)).thenReturn(serverNames);
+
+    sinkManager.chooseSinks();
+
+    assertEquals(2, sinkManager.getSinks().size());
+
+  }
+
+  @Test
+  public void testChooseSinks_LessThanRatioAvailable() {
+    List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
+        mock(ServerName.class));
+
+    when(zkHelper.getSlavesAddresses(PEER_CLUSTER_ID)).thenReturn(serverNames);
+
+    sinkManager.chooseSinks();
+
+    assertEquals(1, sinkManager.getSinks().size());
+  }
+
+  @Test
+  public void testReportBadSink() {
+    ServerName serverNameA = mock(ServerName.class);
+    ServerName serverNameB = mock(ServerName.class);
+    when(zkHelper.getSlavesAddresses(PEER_CLUSTER_ID)).thenReturn(
+        Lists.newArrayList(serverNameA, serverNameB));
+
+    sinkManager.chooseSinks();
+    // Sanity check
+    assertEquals(1, sinkManager.getSinks().size());
+
+    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminProtocol.class));
+
+    sinkManager.reportBadSink(sinkPeer);
+
+    // Just reporting a bad sink once shouldn't have an effect
+    assertEquals(1, sinkManager.getSinks().size());
+
+  }
+
+  /**
+   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
+   * be replicated to anymore.
+   */
+  @Test
+  public void testReportBadSink_PastThreshold() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+    when(zkHelper.getSlavesAddresses(PEER_CLUSTER_ID)).thenReturn(serverNames);
+
+
+    sinkManager.chooseSinks();
+    // Sanity check
+    assertEquals(2, sinkManager.getSinks().size());
+
+    ServerName serverName = sinkManager.getSinks().get(0);
+
+    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminProtocol.class));
+
+    for (int i = 0; i <= ReplicationSinkManager.BAD_SINK_THRESHOLD; i++) {
+      sinkManager.reportBadSink(sinkPeer);
+    }
+
+    // Reporting a bad sink more than the threshold count should remove it
+    // from the list of potential sinks
+    assertEquals(1, sinkManager.getSinks().size());
+  }
+
+  @Test
+  public void testReportBadSink_DownToZeroSinks() {
+    List<ServerName> serverNames = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      serverNames.add(mock(ServerName.class));
+    }
+    when(zkHelper.getSlavesAddresses(PEER_CLUSTER_ID)).thenReturn(serverNames);
+
+
+    sinkManager.chooseSinks();
+    // Sanity check
+    assertEquals(2, sinkManager.getSinks().size());
+
+    ServerName serverNameA = sinkManager.getSinks().get(0);
+    ServerName serverNameB = sinkManager.getSinks().get(1);
+
+    sinkManager.chooseSinks();
+    // Sanity check
+    assertEquals(2, sinkManager.getSinks().size());
+
+    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminProtocol.class));
+    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminProtocol.class));
+
+    for (int i = 0; i <= ReplicationSinkManager.BAD_SINK_THRESHOLD; i++) {
+      sinkManager.reportBadSink(sinkPeerA);
+      sinkManager.reportBadSink(sinkPeerB);
+    }
+
+    // We've gone down to 0 good sinks, so the replication sinks
+    // should have been refreshed now
+    assertEquals(2, sinkManager.getSinks().size());
+  }
+
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java	(revision 1437240)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java	(working copy)
@@ -21,6 +21,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -32,6 +33,7 @@
 import org.apache.hadoop.hbase.DeserializationException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -52,9 +54,11 @@
   // Cannot be final since a new object needs to be recreated when session fails
   private ZooKeeperWatcher zkw;
   private final Configuration conf;
+  private long lastRegionserverUpdate;
 
   private PeerStateTracker peerStateTracker;
-
+
+
   /**
    * Constructor that takes all the objects required to communicate with the
    * specified peer, except for the region server addresses.
@@ -125,6 +129,7 @@
    */
   public void setRegionServers(List<ServerName> regionServers) {
     this.regionServers = regionServers;
+    lastRegionserverUpdate = System.currentTimeMillis();
   }
 
   /**
@@ -134,6 +139,15 @@
   public ZooKeeperWatcher getZkw() {
     return zkw;
   }
+
+  /**
+   * Get the timestamp at which the last change occurred to the list of region servers to replicate
+   * to.
+   * @return the System.currentTimeMillis() value recorded the last time the list of peer
+   * region servers changed
+   */
+  public long getLastRegionserverUpdate() {
+    return lastRegionserverUpdate;
+  }
 
   /**
    * Get the identifier of this peer
@@ -165,8 +179,51 @@
     if (zkw != null) zkw.close();
     zkw = new ZooKeeperWatcher(conf, "connection to cluster: " + id, this);
+    zkw.registerListener(new PeerRegionServerListener(zkw.rsZNode, zkw));
   }
+
+  /**
+   * Get the list of all the region servers for this peer.
+   * @return list of region server addresses or an empty list if the slave is unavailable
+   */
+  public synchronized List<ServerName> fetchSlaveAddresses()
+      throws KeeperException {
+    try {
+      setRegionServers(listChildrenAndGetAsServerNames());
+      return getRegionServers();
+    } catch (KeeperException ke) {
+      setRegionServers(Collections.<ServerName>emptyList());
+      throw ke;
+    }
+  }
+
+  /**
+   * Lists the children of the peer's rs znode, retrieving the data of each
+   * child as a server address.
+   *
+   * Used to list the currently online regionservers and their addresses.
+   *
+   * Sets a watch for new children on the znode; this method is best effort.
+   *
+   * Returns an empty list if the node has no children or does not exist.
+   *
+   * @return list of data of children of the rs znode, empty if no children
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  List<ServerName> listChildrenAndGetAsServerNames() throws KeeperException {
+    List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
+    if (children == null) {
+      return Collections.emptyList();
+    }
+    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
+    for (String child : children) {
+      addresses.add(ServerName.parseServerName(child));
+    }
+    return addresses;
+  }
+
   @Override
   public boolean isAborted() {
     // Currently the replication peer is never "Aborted", we just log when the
@@ -180,6 +237,8 @@
       zkw.close();
     }
   }
+
+
 
   /**
    * Tracker for state of this peer
@@ -203,4 +262,30 @@
       }
     }
   }
+
+  /**
+   * Tracks changes to the list of region servers in this peer's cluster.
+   */
+  public class PeerRegionServerListener extends ZooKeeperListener {
+
+    private String regionServerListNode;
+
+    public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
+      super(zkw);
+      this.regionServerListNode = regionServerListNode;
+    }
+
+    @Override
+    public synchronized void nodeChildrenChanged(String path) {
+      if (path.equals(regionServerListNode)) {
+        try {
+          LOG.info("Detected change to peer regionservers, fetching updated list");
+          fetchSlaveAddresses();
+        } catch (KeeperException e) {
+          abort("Error reading slave addresses", e);
+        }
+      }
+    }
+
+  }
 }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java	(revision 0)
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AdminProtocol;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Maintains a collection of peers to replicate to, and randomly selects a single peer to replicate
+ * to per set of data to replicate. Also handles keeping track of peer availability.
+ */
+public class ReplicationSinkManager {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);
+
+  /**
+   * Once a ReplicationSink has been reported as bad more than this many times, it will no longer
+   * be provided as a sink for replication until the pool of replication sinks is refreshed.
+   */
+  public static final int BAD_SINK_THRESHOLD = 3;
+
+  private HConnection conn;
+
+  private String peerClusterId;
+
+  private ReplicationZookeeper zkHelper;
+
+  // Count of "bad replication sink" reports per peer sink
+  private Map<ServerName, Integer> badReportCounts;
+
+  // Ratio of total number of potential sinks to be used
+  private float ratio;
+
+  private Random random;
+
+  // A timestamp of the last time the list of replication peers changed
+  private long lastUpdateToPeers;
+
+  // The current pool of sinks to which replication can be performed
+  private List<ServerName> sinks = Lists.newArrayList();
+
+  public ReplicationSinkManager(HConnection conn, String peerClusterId,
+      ReplicationZookeeper zkHelper, float ratio) {
+    this.conn = conn;
+    this.peerClusterId = peerClusterId;
+    this.zkHelper = zkHelper;
+    this.badReportCounts = Maps.newHashMap();
+    this.ratio = ratio;
+    this.random = new Random();
+  }
+
+  /**
+   * Get a randomly-chosen replication sink to replicate to.
+   * @return a replication sink to replicate to
+   */
+  public SinkPeer getReplicationSink() throws IOException {
+    if (zkHelper.getPeerClusters().get(peerClusterId).getLastRegionserverUpdate() > this.lastUpdateToPeers) {
+      LOG.info("Current list of sinks is out of date, updating");
+      chooseSinks();
+    }
+
+    if (sinks.isEmpty()) {
+      throw new IOException("No replication sinks are available");
+    }
+    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
+    return new SinkPeer(serverName, this.conn.getAdmin(serverName.getHostname(),
+        serverName.getPort()));
+  }
+
+  /**
+   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single
+   * SinkPeer is reported as bad more than {@link ReplicationSinkManager#BAD_SINK_THRESHOLD} times,
+   * it will be removed from the pool of potential replication targets.
+   * @param sinkPeer The SinkPeer that had a failed replication attempt on it
+   */
+  public void reportBadSink(SinkPeer sinkPeer) {
+    ServerName serverName = sinkPeer.getServerName();
+    int badReportCount = (badReportCounts.containsKey(serverName) ? badReportCounts.get(serverName)
+        : 0) + 1;
+    badReportCounts.put(serverName, badReportCount);
+    if (badReportCount > BAD_SINK_THRESHOLD) {
+      this.sinks.remove(serverName);
+      if (sinks.isEmpty()) {
+        chooseSinks();
+      }
+    }
+  }
+
+  void chooseSinks() {
+    List<ServerName> slaveAddresses = zkHelper.getSlavesAddresses(peerClusterId);
+    Collections.shuffle(slaveAddresses, random);
+    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+    sinks = slaveAddresses.subList(0, numSinks);
+    lastUpdateToPeers = System.currentTimeMillis();
+    badReportCounts.clear();
+  }
+
+  List<ServerName> getSinks() {
+    return sinks;
+  }
+
+  /**
+   * Wraps a replication region server sink to provide the ability to identify it.
+   */
+  public static class SinkPeer {
+    private ServerName serverName;
+    private AdminProtocol regionServer;
+
+    public SinkPeer(ServerName serverName, AdminProtocol regionServer) {
+      this.serverName = serverName;
+      this.regionServer = regionServer;
+    }
+
+    ServerName getServerName() {
+      return serverName;
+    }
+
+    public AdminProtocol getRegionServer() {
+      return regionServer;
+    }
+
+  }
+
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 1437240)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -23,16 +23,11 @@
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,7 +41,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.AdminProtocol;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -56,6 +50,7 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
@@ -86,9 +81,6 @@
   // Helper class for zookeeper
   private ReplicationZookeeper zkHelper;
   private Configuration conf;
-  // ratio of region servers to chose from a slave cluster
-  private float ratio;
-  private Random random;
   // should we replicate or not?
   private AtomicBoolean replicating;
   // id of the peer cluster this source replicates to
@@ -97,8 +89,6 @@
   private ReplicationSourceManager manager;
   // Should we stop everything?
   private Stoppable stopper;
-  // List of chosen sinks (region servers)
-  private List<ServerName> currentPeers;
   // How long should we sleep for each retry
   private long sleepForRetries;
   // Max size in bytes of entriesArray
@@ -140,6 +130,8 @@
   private MetricsSource metrics;
   // Handle on the log reader helper
   private ReplicationHLogReaderManager repLogReader;
+  // Handles connecting to peer region servers
+  private ReplicationSinkManager replicationSinkMgr;
 
   /**
    * Instantiation method used by region servers
@@ -178,9 +170,6 @@
         new LogsComparator());
     this.conn = HConnectionManager.getConnection(conf);
     this.zkHelper = manager.getRepZkWrapper();
-    this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
-    this.currentPeers = new ArrayList<ServerName>();
-    this.random = new Random();
     this.replicating = replicating;
     this.manager = manager;
     this.sleepForRetries =
@@ -193,9 +182,11 @@
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }
-
-    // Finally look if this is a recovered queue
+    // Look if this is a recovered queue
     this.checkIfQueueRecovered(peerClusterZnode);
+
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, zkHelper,
+        this.conf.getFloat("replication.source.ratio", 0.1f));
   }
 
   // The passed znode will be either the id of the peer cluster or
@@ -213,28 +204,6 @@
     }
   }
 
-  /**
-   * Select a number of peers at random using the ratio. Mininum 1.
-   */
-  private void chooseSinks() {
-    this.currentPeers.clear();
-    List<ServerName> addresses = this.zkHelper.getSlavesAddresses(peerId);
-    Set<ServerName> setOfAddr = new HashSet<ServerName>();
-    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
-    LOG.info("Getting " + nbPeers +
-        " rs from peer cluster # " + peerId);
-    for (int i = 0; i < nbPeers; i++) {
-      ServerName sn;
-      // Make sure we get one address that we don't already have
-      do {
-        sn = addresses.get(this.random.nextInt(addresses.size()));
-      } while (setOfAddr.contains(sn));
-      LOG.info("Choosing peer " + sn);
-      setOfAddr.add(sn);
-    }
-    this.currentPeers.addAll(setOfAddr);
-  }
-
   @Override
   public void enqueueLog(Path log) {
     this.queue.put(log);
@@ -464,10 +433,9 @@
   private void connectToPeers() {
     // Connect to peer cluster first, unless we have to stop
-    while (this.isActive() && this.currentPeers.size() == 0) {
-
+    while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
       try {
-        chooseSinks();
+        replicationSinkMgr.chooseSinks();
         Thread.sleep(this.sleepForRetries);
       } catch (InterruptedException e) {
         LOG.error("Interrupted while trying to connect to sinks", e);
@@ -633,8 +601,10 @@
         }
         continue;
       }
+      SinkPeer sinkPeer = null;
       try {
-        AdminProtocol rrs = getRS();
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        AdminProtocol rrs = sinkPeer.getRegionServer();
         ProtobufUtil.replicateWALEntry(rrs,
             Arrays.copyOf(this.entriesArray, currentNbEntries));
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
@@ -666,28 +636,18 @@
             this.socketTimeoutMultiplier);
       } else if (ioe instanceof ConnectException) {
         LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
-        chooseSinks();
+        replicationSinkMgr.chooseSinks();
       } else {
         LOG.warn("Can't replicate because of a local or network error: ", ioe);
       }
     }
-
-    try {
-      boolean down;
-      // Spin while the slave is down and we're not asked to shutdown/close
-      do {
-        down = isSlaveDown();
-        if (down) {
-          if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
-            sleepMultiplier++;
-          } else {
-            chooseSinks();
-          }
-        }
-      } while (this.isActive() && down );
-    } catch (InterruptedException e) {
LOG.debug("Interrupted while trying to contact the peer cluster"); + + if (sinkPeer != null) { + replicationSinkMgr.reportBadSink(sinkPeer); } + if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) { + sleepMultiplier++; + } } } } @@ -753,49 +713,6 @@ Threads.shutdown(this, this.sleepForRetries); } - /** - * Get a new region server at random from this peer - * @return - * @throws IOException - */ - private AdminProtocol getRS() throws IOException { - if (this.currentPeers.size() == 0) { - throw new IOException(this.peerClusterZnode + " has 0 region servers"); - } - ServerName address = - currentPeers.get(random.nextInt(this.currentPeers.size())); - return this.conn.getAdmin(address.getHostname(), address.getPort()); - } - - /** - * Check if the slave is down by trying to establish a connection - * @return true if down, false if up - * @throws InterruptedException - */ - public boolean isSlaveDown() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - Thread pingThread = new Thread() { - public void run() { - try { - AdminProtocol rrs = getRS(); - // Dummy call which should fail - ProtobufUtil.getServerInfo(rrs); - latch.countDown(); - } catch (IOException ex) { - if (ex instanceof RemoteException) { - ex = ((RemoteException) ex).unwrapRemoteException(); - } - LOG.info("Slave cluster looks down: " + ex.getMessage()); - } - } - }; - pingThread.start(); - // awaits returns true if countDown happened - boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS); - pingThread.interrupt(); - return down; - } - public String getPeerClusterZnode() { return this.peerClusterZnode; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1437240) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -246,60 +246,17 @@ return Collections.emptyList(); } - List addresses; try { - addresses = fetchSlavesAddresses(peer.getZkw()); + return peer.fetchSlaveAddresses(); } catch (KeeperException ke) { reconnectPeer(ke, peer); - addresses = Collections.emptyList(); + return Collections.emptyList(); } - peer.setRegionServers(addresses); - return peer.getRegionServers(); } - /** - * Get the list of all the region servers from the specified peer - * @param zkw zk connection to use - * @return list of region server addresses or an empty list if the slave - * is unavailable - */ - private List fetchSlavesAddresses(ZooKeeperWatcher zkw) - throws KeeperException { - return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode); - } + /** - * Lists the children of the specified znode, retrieving the data of each - * child as a server address. - * - * Used to list the currently online regionservers and their addresses. - * - * Sets no watches at all, this method is best effort. - * - * Returns an empty list if the node has no children. Returns null if the - * parent node itself does not exist. 
-   *
-   * @param zkw zookeeper reference
-   * @param znode node to get children of as addresses
-   * @return list of data of children of specified znode, empty if no children,
-   * null if parent does not exist
-   * @throws KeeperException if unexpected zookeeper exception
-   */
-  public static List<ServerName> listChildrenAndGetAsServerNames(
-      ZooKeeperWatcher zkw, String znode)
-    throws KeeperException {
-    List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
-    if (children == null) {
-      return Collections.emptyList();
-    }
-    List<ServerName> addresses = new ArrayList<ServerName>(children.size());
-    for (String child : children) {
-      addresses.add(ServerName.parseServerName(child));
-    }
-    return addresses;
-  }
-
-  /**
    * This method connects this cluster to another one and registers it
    * in this region server's replication znode
    * @param peerId id of the peer cluster
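
Note (not part of the patch): the following is a minimal, illustrative sketch of how a source-side caller is expected to drive the ReplicationSinkManager API added above: pick a random sink, attempt to ship a batch, and report failures so that a sink reported bad more than BAD_SINK_THRESHOLD times drops out of the pool, which is re-chosen once it empties or the peer's region server list changes. The wrapper class and the shipOnce helper are hypothetical names introduced here for illustration; only ReplicationSinkManager, SinkPeer, AdminProtocol, and ProtobufUtil.replicateWALEntry come from the patch and the surrounding HBase code.

import java.io.IOException;

import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;

/** Illustrative only: a simplified version of the retry loop in ReplicationSource.shipEdits(). */
public class SinkManagerUsageSketch {

  private final ReplicationSinkManager sinkMgr;

  public SinkManagerUsageSketch(ReplicationSinkManager sinkMgr) {
    this.sinkMgr = sinkMgr;
  }

  /**
   * Attempt to ship one batch of WAL entries to a randomly chosen sink, reporting the sink as
   * bad if the attempt fails.
   * @return true if the batch was shipped, false otherwise
   */
  public boolean shipOnce(HLog.Entry[] entries) {
    SinkPeer sinkPeer = null;
    try {
      // Throws IOException if no sinks are currently available
      sinkPeer = sinkMgr.getReplicationSink();
      AdminProtocol rrs = sinkPeer.getRegionServer();
      // Same RPC the patched ReplicationSource uses to ship edits
      ProtobufUtil.replicateWALEntry(rrs, entries);
      return true;
    } catch (IOException ioe) {
      if (sinkPeer != null) {
        // After BAD_SINK_THRESHOLD reports this sink is dropped from the pool;
        // if the pool empties, chooseSinks() is invoked internally to refresh it
        sinkMgr.reportBadSink(sinkPeer);
      }
      return false;
    }
  }
}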