Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (revision 1482157) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (working copy) @@ -45,6 +45,7 @@ private static Configuration conf; private static HBaseTestingUtility utility; private static ZooKeeperWatcher zkw; + private static String replicationZNode; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -52,6 +53,8 @@ utility.startMiniZKCluster(); conf = utility.getConfiguration(); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); } @Before @@ -63,12 +66,14 @@ rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2); rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3); rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1); + String peersZnode = ZKUtil.joinZNode(replicationZNode, "peers"); + for (int i = 1; i < 6; i++) { + ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(peersZnode, "qId"+i)); + } } @After public void tearDown() throws KeeperException, IOException { - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName); ZKUtil.deleteNodeRecursively(zkw, replicationZNode); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1482157) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; @@ -293,9 +294,8 @@ testMap = rz3.claimQueues(s2.getServerName().getServerName()); rz3.close(); - ReplicationSource s = new ReplicationSource(); - s.checkIfQueueRecovered(testMap.firstKey()); - List result = s.getDeadRegionServers(); + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey()); + List result = replicationQueueInfo.getDeadRegionServers(); // verify assertTrue(result.contains(server.getServerName().getServerName())); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1482157) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -25,7 +25,6 @@ import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; @@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; @@ -94,6 +94,7 @@ private Random random; // should we replicate or not? private AtomicBoolean replicating; + private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source replicates to private String peerId; // The manager of all sources to which we ping back our progress @@ -123,10 +124,6 @@ private long totalReplicatedEdits = 0; // The znode we currently play with private String peerClusterZnode; - // Indicates if this queue is recovered (and will be deleted when depleted) - private boolean queueRecovered; - // List of all the dead region servers that had this queue (if recovered) - private List deadRegionServers = new ArrayList(); // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS @@ -196,100 +193,22 @@ } catch (KeeperException ke) { throw new IOException("Could not read cluster id", ke); } - - // Finally look if this is a recovered queue - this.checkIfQueueRecovered(peerClusterZnode); - } - - // The passed znode will be either the id of the peer cluster or - // the handling story of that queue in the form of id-servername-* - // - // package access for testing - void checkIfQueueRecovered(String peerClusterZnode) { - String[] parts = peerClusterZnode.split("-", 2); - this.queueRecovered = parts.length != 1; - this.peerId = this.queueRecovered ? - parts[0] : peerClusterZnode; this.peerClusterZnode = peerClusterZnode; - - if (parts.length < 2) { - // not queue recovered situation - return; - } - - // extract dead servers - extractDeadServersFromZNodeString(parts[1], this.deadRegionServers); + this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode); + // ReplicationQueueInfo parses the peerId out of the znode for us + this.peerId = this.replicationQueueInfo.getPeerId(); } - - /** - * for tests only - */ - List getDeadRegionServers() { - return Collections.unmodifiableList(this.deadRegionServers); - } /** - * Parse dead server names from znode string servername can contain "-" such as - * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following - * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125--... - */ - private static void - extractDeadServersFromZNodeString(String deadServerListStr, List result) { - - if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return; - - // valid server name delimiter "-" has to be after "," in a server name - int seenCommaCnt = 0; - int startIndex = 0; - int len = deadServerListStr.length(); - - for (int i = 0; i < len; i++) { - switch (deadServerListStr.charAt(i)) { - case ',': - seenCommaCnt += 1; - break; - case '-': - if(seenCommaCnt>=2) { - if (i > startIndex) { - String serverName = deadServerListStr.substring(startIndex, i); - if(ServerName.isFullServerName(serverName)){ - result.add(serverName); - } else { - LOG.error("Found invalid server name:" + serverName); - } - startIndex = i + 1; - } - seenCommaCnt = 0; - } - break; - default: - break; - } - } - - // add tail - if(startIndex < len - 1){ - String serverName = deadServerListStr.substring(startIndex, len); - if(ServerName.isFullServerName(serverName)){ - result.add(serverName); - } else { - LOG.error("Found invalid server name at the end:" + serverName); - } - } - - LOG.debug("Found dead servers:" + result); - } - - /** * Select a number of peers at random using the ratio. Mininum 1. */ private void chooseSinks() { this.currentPeers.clear(); - List addresses = this.zkHelper.getSlavesAddresses(peerId); + List addresses = this.zkHelper.getSlavesAddresses(this.peerId); Set setOfAddr = new HashSet(); int nbPeers = (int) (Math.ceil(addresses.size() * ratio)); LOG.info("Getting " + nbPeers + - " rs from peer cluster # " + peerId); + " rs from peer cluster # " + this.peerId); for (int i = 0; i < nbPeers; i++) { ServerName sn; // Make sure we get one address that we don't already have @@ -333,13 +252,13 @@ // If this is recovered, the queue is already full and the first log // normally has a position (unless the RS failed between 2 logs) - if (this.queueRecovered) { + if (this.replicationQueueInfo.isQueueRecovered()) { try { this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition( this.peerClusterZnode, this.queue.peek().getName())); } catch (KeeperException e) { this.terminate("Couldn't get the position of this recovered queue " + - peerClusterZnode, e); + this.peerClusterZnode, e); } } // Loop until we close down @@ -374,7 +293,7 @@ //We take the snapshot now so that we are protected against races //where a new file gets enqueued while the current file is being processed //(and where we just finished reading the current file). - if (!this.queueRecovered && queue.size() == 0) { + if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) { currentWALisBeingWrittenTo = true; } // Open a reader on it @@ -401,24 +320,24 @@ continue; } } catch (IOException ioe) { - LOG.warn(peerClusterZnode + " Got: ", ioe); + LOG.warn(this.peerClusterZnode + " Got: ", ioe); gotIOE = true; if (ioe.getCause() instanceof EOFException) { boolean considerDumping = false; - if (this.queueRecovered) { + if (this.replicationQueueInfo.isQueueRecovered()) { try { FileStatus stat = this.fs.getFileStatus(this.currentPath); if (stat.getLen() == 0) { - LOG.warn(peerClusterZnode + " Got EOF and the file was empty"); + LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty"); } considerDumping = true; } catch (IOException e) { - LOG.warn(peerClusterZnode + " Got while getting file size: ", e); + LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e); } } else if (currentNbEntries != 0) { - LOG.warn(peerClusterZnode + " Got EOF while reading, " + - "looks like this file is broken? " + currentPath); + LOG.warn(this.peerClusterZnode + + " Got EOF while reading, " + "looks like this file is broken? " + currentPath); considerDumping = true; currentNbEntries = 0; } @@ -446,7 +365,7 @@ if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), - queueRecovered, currentWALisBeingWrittenTo); + this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); this.lastLoggedPosition = this.repLogReader.getPosition(); } if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { @@ -465,7 +384,7 @@ LOG.debug("Attempt to close connection failed", e); } } - LOG.debug("Source exiting " + peerId); + LOG.debug("Source exiting " + this.peerId); metrics.clear(); } @@ -571,10 +490,11 @@ try { this.reader = repLogReader.openReader(this.currentPath); } catch (FileNotFoundException fnfe) { - if (this.queueRecovered) { + if (this.replicationQueueInfo.isQueueRecovered()) { // We didn't find the log in the archive directory, look if it still // exists in the dead RS folder (there could be a chain of failures // to look at) + List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); LOG.info("NB dead servers : " + deadRegionServers.size()); for (String curDeadServerName : deadRegionServers) { Path deadRsDirectory = @@ -621,7 +541,7 @@ } } catch (IOException ioe) { if (ioe instanceof EOFException && isCurrentLogEmpty()) return true; - LOG.warn(peerClusterZnode + " Got: ", ioe); + LOG.warn(this.peerClusterZnode + " Got: ", ioe); this.reader = null; if (ioe.getCause() instanceof NullPointerException) { // Workaround for race condition in HDFS-4380 @@ -645,7 +565,8 @@ * may be empty, and we don't want to retry that. */ private boolean isCurrentLogEmpty() { - return (this.repLogReader.getPosition() == 0 && !queueRecovered && queue.size() == 0); + return (this.repLogReader.getPosition() == 0 && + !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0); } /** @@ -724,7 +645,7 @@ if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), - queueRecovered, currentWALisBeingWrittenTo); + this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo); this.lastLoggedPosition = this.repLogReader.getPosition(); } this.totalReplicatedEdits += currentNbEntries; @@ -788,7 +709,8 @@ * @return true if the peer is enabled, otherwise false */ protected boolean isPeerEnabled() { - return this.replicating.get() && this.zkHelper.getPeerEnabled(peerId); + return this.replicating.get() && + this.zkHelper.getPeerEnabled(this.peerId); } /** @@ -804,7 +726,7 @@ this.repLogReader.finishCurrentFile(); this.reader = null; return true; - } else if (this.queueRecovered) { + } else if (this.replicationQueueInfo.isQueueRecovered()) { this.manager.closeRecoveredQueue(this); LOG.info("Finished recovering the queue"); this.running = false; @@ -823,7 +745,8 @@ } }; Threads.setDaemonThreadRunning( - this, n + ".replicationSource," + peerClusterZnode, handler); + this, n + ".replicationSource," + + this.peerClusterZnode, handler); } public void terminate(String reason) { Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (revision 1482157) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (working copy) @@ -258,6 +258,12 @@ peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath); if (peerIdsToProcess == null) return queues; // node already processed for (String peerId : peerIdsToProcess) { + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); + if (!peerExists(replicationQueueInfo.getPeerId())) { + LOG.warn("Peer " + peerId + " didn't exist, skipping the replay"); + // Protection against moving orphaned queues + continue; + } String newPeerId = peerId + "-" + znode; String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); // check the logs queue for the old peer cluster @@ -319,6 +325,12 @@ // The lock isn't a peer cluster, remove it clusters.remove(RS_LOCK_ZNODE); for (String cluster : clusters) { + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster); + if (!peerExists(replicationQueueInfo.getPeerId())) { + LOG.warn("Peer " + cluster + " didn't exist, skipping the replay"); + // Protection against moving orphaned queues + continue; + } // We add the name of the recovered RS to the new znode, we can even // do that for queues that were recovered 10 times giving a znode like // number-startcode-number-otherstartcode-number-anotherstartcode-etc Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (revision 1482157) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (working copy) @@ -78,4 +78,9 @@ } return result; } + + public boolean peerExists(String id) throws KeeperException { + return ZKUtil.checkExists(this.zookeeper, + ZKUtil.joinZNode(this.peersZNode, id)) >= 0; + } } \ No newline at end of file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java (revision 0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java (revision 0) @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ServerName; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This class is responsible for the parsing logic for a znode representing a queue. + * It will extract the peerId if it's recovered as well as the dead region servers + * that were part of the queue's history. + */ +public class ReplicationQueueInfo { + private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class); + + private final String peerId; + private final String peerClusterZnode; + private boolean queueRecovered; + // List of all the dead region servers that had this queue (if recovered) + private List deadRegionServers = new ArrayList(); + + /** + * The passed znode will be either the id of the peer cluster or + * the handling story of that queue in the form of id-servername-* + */ + public ReplicationQueueInfo(String znode) { + this.peerClusterZnode = znode; + String[] parts = znode.split("-", 2); + this.queueRecovered = parts.length != 1; + this.peerId = this.queueRecovered ? + parts[0] : peerClusterZnode; + if (parts.length >= 2) { + // extract dead servers + extractDeadServersFromZNodeString(parts[1], this.deadRegionServers); + } + } + + /** + * Parse dead server names from znode string servername can contain "-" such as + * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following + * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125--... + */ + private static void + extractDeadServersFromZNodeString(String deadServerListStr, List result) { + + if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return; + + // valid server name delimiter "-" has to be after "," in a server name + int seenCommaCnt = 0; + int startIndex = 0; + int len = deadServerListStr.length(); + + for (int i = 0; i < len; i++) { + switch (deadServerListStr.charAt(i)) { + case ',': + seenCommaCnt += 1; + break; + case '-': + if(seenCommaCnt>=2) { + if (i > startIndex) { + String serverName = deadServerListStr.substring(startIndex, i); + if(ServerName.isFullServerName(serverName)){ + result.add(serverName); + } else { + LOG.error("Found invalid server name:" + serverName); + } + startIndex = i + 1; + } + seenCommaCnt = 0; + } + break; + default: + break; + } + } + + // add tail + if(startIndex < len - 1){ + String serverName = deadServerListStr.substring(startIndex, len); + if(ServerName.isFullServerName(serverName)){ + result.add(serverName); + } else { + LOG.error("Found invalid server name at the end:" + serverName); + } + } + + LOG.debug("Found dead servers:" + result); + } + + public List getDeadRegionServers() { + return Collections.unmodifiableList(this.deadRegionServers); + } + + public String getPeerId() { + return this.peerId; + } + + public String getPeerClusterZnode() { + return this.peerClusterZnode; + } + + public boolean isQueueRecovered() { + return queueRecovered; + } +} Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1482157) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -82,7 +82,7 @@ * */ @InterfaceAudience.Private -public class ReplicationZookeeper implements Closeable { +public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable { private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class); @@ -132,6 +132,7 @@ */ public ReplicationZookeeper(final Abortable abortable, final Configuration conf, final ZooKeeperWatcher zk) throws KeeperException { + super(zk, conf, abortable); this.conf = conf; this.zookeeper = zk; setZNodes(abortable); @@ -151,6 +152,7 @@ */ public ReplicationZookeeper(final Server server, final AtomicBoolean replicating) throws IOException, KeeperException { + super(server.getZooKeeper(), server.getConfiguration(), server); this.abortable = server; this.zookeeper = server.getZooKeeper(); this.conf = server.getConfiguration(); @@ -476,11 +478,6 @@ } } - private boolean peerExists(String id) throws KeeperException { - return ZKUtil.checkExists(this.zookeeper, - ZKUtil.joinZNode(this.peersZNode, id)) >= 0; - } - /** * Enable replication to the peer *