diff --git src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
index 0e73431..93331c1 100644
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
@@ -43,12 +43,17 @@
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.Op;
 
 /**
  * This class serves as a helper for all things related to zookeeper in
@@ -659,6 +664,57 @@
   }
 
   /**
+   * "Atomically" copies all the hlog queues from another (dead) region server
+   * into this region server's znode using a single ZooKeeper multi call, and
+   * returns them sorted per peer cluster (each queue name is appended with the
+   * dead server's znode name).
+   * @param znode the znode of the dead region server
+   * @return HLog queues sorted per peer cluster; {@code null} if the dead
+   *         server's znode was already processed by another region server; an
+   *         empty map if the multi call failed (another region server likely
+   *         claimed the queues first)
+   */
+  public SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
+    SortedMap<String, SortedSet<String>> queues =
+        new TreeMap<String, SortedSet<String>>();
+    // hbase/replication/rs/deadrs
+    String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);
+    List<String> peerIdsToProcess = null;
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtilOp>();
+    try {
+      peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
+      if (peerIdsToProcess == null) return null; // node already processed
+      for (String peerId : peerIdsToProcess) {
+        String newPeerId = peerId + "-" + znode;
+        String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId);
+        // check the logs queue for the old peer cluster
+        String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
+        List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
+        if (hlogs == null || hlogs.size() == 0) continue; // empty log queue.
+        // create the new cluster znode
+        SortedSet<String> logQueue = new TreeSet<String>();
+        queues.put(newPeerId, logQueue);
+        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
+        listOfOps.add(op);
+        // get the offset of the logs and set it to new znodes
+        for (String hlog : hlogs) {
+          String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog);
+          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode);
+          LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset));
+          String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog);
+          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
+          // add ops for deleting
+          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode));
+          logQueue.add(hlog);
+        }
+        // add delete op for peer
+        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
+      }
+      // add delete op for dead rs
+      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
+      LOG.debug(" The multi list is: " + listOfOps + ", size: " + listOfOps.size());
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
+      LOG.info("Atomically moved the dead regionserver logs. ");
+    } catch (KeeperException e) {
+      // Multi call failed; it looks like some other regionserver took away the logs.
+      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
+      // Discard the partially-built map: the multi never committed, so none of
+      // these queues actually belong to this region server. Returning them
+      // would make the caller start replication sources for znodes that were
+      // never created.
+      queues.clear();
+    }
+    return queues;
+  }
+
+  /**
    * This methods copies all the hlogs queues from another region server
    * and returns them all sorted per peer cluster (appended with the dead
    * server's znode)
diff --git src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index c89de9a..876e418 100644
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -40,6 +40,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@@ -576,17 +577,24 @@
       LOG.info("Not transferring queue since we are shutting down");
       return;
     }
-    if (!zkHelper.lockOtherRS(rsZnode)) {
-      return;
-    }
+    SortedMap<String, SortedSet<String>> newQueues = null;
+
+    // check whether there is multi support. If yes, use it.
LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); - SortedMap> newQueues = - zkHelper.copyQueuesFromRS(rsZnode); - zkHelper.deleteRsQueues(rsZnode); - if (newQueues == null || newQueues.size() == 0) { + if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, false)) { + newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode); + } else { + if (!zkHelper.lockOtherRS(rsZnode)) { + return; + } + newQueues = zkHelper.copyQueuesFromRS(rsZnode); + zkHelper.deleteRsQueues(rsZnode); + } + // process of copying over the failed queue is completed. + if (newQueues.size() == 0) { return; } - + // change till here for (Map.Entry> entry : newQueues.entrySet()) { String peerId = entry.getKey(); try {