Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1424023)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -22,6 +22,7 @@
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -97,6 +98,8 @@
   private Stoppable stopper;
   // List of chosen sinks (region servers)
   private List<ServerName> currentPeers;
+  // number of bad peers we removed
+  private int removedPeers;
   // How long should we sleep for each retry
   private long sleepForRetries;
   // Max size in bytes of entriesArray
@@ -214,6 +217,7 @@
    */
   private void chooseSinks() {
     this.currentPeers.clear();
+    removedPeers = 0;
     List addresses = this.zkHelper.getSlavesAddresses(peerId);
     Set setOfAddr = new HashSet();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
@@ -644,8 +648,10 @@
         }
         continue;
       }
+      ServerName address = null;
       try {
-        HRegionInterface rrs = getRS();
+        address = getRSServerName();
+        HRegionInterface rrs = this.conn.getHRegionConnection(address.getHostname(), address.getPort());
         LOG.debug("Replicating " + currentNbEntries);
         rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
         if (this.lastLoggedPosition != this.position) {
@@ -677,6 +683,13 @@
               "call to the remote cluster timed out, which is usually " +
               "caused by a machine failure or a massive slowdown",
               this.socketTimeoutMultiplier);
+          } else if (address != null && ioe instanceof ConnectException) {
+            LOG.warn("Removing peer because of a local or network error: ", ioe);
+            currentPeers.remove(address);
+            if (++removedPeers > currentPeers.size()) {
+              LOG.info("Found "+removedPeers+" bad peers. Re-choosing all peers");
+              chooseSinks();
+            }
           } else {
             LOG.warn("Can't replicate because of a local or network error: ", ioe);
           }
@@ -771,12 +784,20 @@
    * @throws IOException
    */
   private HRegionInterface getRS() throws IOException {
+    ServerName address = getRSServerName();
+    return this.conn.getHRegionConnection(address.getHostname(), address.getPort());
+  }
+
+  /**
+   * Get a new region server at random from this peer
+   * @return the ServerName of a sink picked at random from currentPeers
+   * @throws IOException if currentPeers is empty
+   */
+  private ServerName getRSServerName() throws IOException {
     if (this.currentPeers.size() == 0) {
       throw new IOException(this.peerClusterZnode + " has 0 region servers");
     }
-    ServerName address =
-        currentPeers.get(random.nextInt(this.currentPeers.size()));
-    return this.conn.getHRegionConnection(address.getHostname(), address.getPort());
+    return currentPeers.get(random.nextInt(this.currentPeers.size()));
   }
 
   /**