Index: hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (revision 1458068) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (working copy) @@ -17,15 +17,10 @@ */ package org.apache.hadoop.hbase.client.replication; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -49,9 +44,7 @@ private final String ID_SECOND = "2"; private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; - private static ReplicationSourceManager manager; private static ReplicationAdmin admin; - private static AtomicBoolean replicating = new AtomicBoolean(true); /** * @throws java.lang.Exception @@ -62,19 +55,6 @@ Configuration conf = TEST_UTIL.getConfiguration(); conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); admin = new ReplicationAdmin(conf); - Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), - HConstants.HREGION_OLDLOGDIR_NAME); - Path logDir = new Path(TEST_UTIL.getDataTestDir(), - HConstants.HREGION_LOGDIR_NAME); - manager = new ReplicationSourceManager(admin.getReplicationZk(), conf, - // The following stopper never stops so that we can respond - // to zk notification - new Stoppable() { - @Override - public void stop(String why) {} - @Override - public boolean isStopped() {return false;} - }, FileSystem.get(conf), replicating, logDir, oldLogDir); } /** @@ -84,7 +64,6 @@ */ @Test public 
void testAddRemovePeer() throws Exception { - assertEquals(0, manager.getSources().size()); // Add a valid peer admin.addPeer(ID_ONE, KEY_ONE); // try adding the same (fails) Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1458068) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -224,7 +224,7 @@ } @Test - public void testNodeFailoverWorkerCopyQueuesFromRSUsingMulti() throws Exception { + public void testClaimQueues() throws Exception { LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); @@ -281,7 +281,7 @@ @Override public void run() { try { - logZnodesMap = rz.copyQueuesFromRSUsingMulti(deadRsZnode); + logZnodesMap = rz.claimQueues(deadRsZnode); rz.close(); server.abort("Done with testing", null); } catch (Exception e) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1458068) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -41,8 +41,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import 
org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -73,6 +73,7 @@ private final AtomicBoolean replicating; // Helper for zookeeper private final ReplicationZookeeper zkHelper; + private final ReplicationQueues replicationQueues; // All about stopping private final Stoppable stopper; // All logs we are currently tracking @@ -91,14 +92,14 @@ private final long sleepBeforeFailover; // Homemade executer service for replication private final ThreadPoolExecutor executor; - + private final Random rand; /** - * Creates a replication manager and sets the watch on all the other - * registered region servers + * Creates a replication manager and sets the watch on all the other registered region servers * @param zkHelper the zk helper for replication + * @param replicationQueues the interface for manipulating replication queues * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use @@ -107,15 +108,13 @@ * @param oldLogDir the directory where old logs are archived */ public ReplicationSourceManager(final ReplicationZookeeper zkHelper, - final Configuration conf, - final Stoppable stopper, - final FileSystem fs, - final AtomicBoolean replicating, - final Path logDir, - final Path oldLogDir) { + final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper, + final FileSystem fs, final AtomicBoolean replicating, final Path logDir, + final Path oldLogDir) { this.sources = new ArrayList(); this.replicating = replicating; this.zkHelper = zkHelper; + this.replicationQueues = replicationQueues; this.stopper = stopper; this.hlogsById = new HashMap>(); this.oldsources = new ArrayList(); @@ -181,7 +180,7 @@ for (String id : this.zkHelper.getPeerClusters().keySet()) { addSource(id); } - List currentReplicators = this.zkHelper.getListOfReplicators(); + List currentReplicators = 
this.replicationQueues.getListOfReplicators(); if (currentReplicators == null || currentReplicators.size() == 0) { return; } @@ -350,13 +349,12 @@ * It creates one old source for any type of source of the old rs. * @param rsZnode */ - public void transferQueues(String rsZnode) { + private void transferQueues(String rsZnode) { NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode); try { this.executor.execute(transfer); } catch (RejectedExecutionException ex) { - LOG.info("Cancelling the transfer of " + rsZnode + - " because of " + ex.getMessage()); + LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage()); } } @@ -589,20 +587,12 @@ } SortedMap> newQueues = null; - // check whether there is multi support. If yes, use it. - if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { - LOG.info("Atomically moving " + rsZnode + "'s hlogs to my queue"); - newQueues = zkHelper.copyQueuesFromRSUsingMulti(rsZnode); - } else { - LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); - if (!zkHelper.lockOtherRS(rsZnode)) { - return; - } - newQueues = zkHelper.copyQueuesFromRS(rsZnode); - zkHelper.deleteRsQueues(rsZnode); - } - // process of copying over the failed queue is completed. + newQueues = zkHelper.claimQueues(rsZnode); + + // Copying over the failed queue is completed. if (newQueues.isEmpty()) { + // We either didn't get the lock or the failed region server didn't have any outstanding + // HLogs to replicate, so we are done. FIXME(review): claimQueues returns null (not empty) when the lock is not acquired, so the isEmpty() check above would NPE in that case -- guard for null or make claimQueues return an empty map. 
return; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1458068) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy) @@ -43,6 +43,8 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesImpl; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.util.Bytes; @@ -64,6 +66,7 @@ private ReplicationSourceManager replicationManager; private final AtomicBoolean replicating = new AtomicBoolean(true); private ReplicationZookeeper zkHelper; + private ReplicationQueues replicationQueues; private Configuration conf; private ReplicationSink replicationSink; // Hosting server @@ -104,18 +107,23 @@ if (replication) { try { this.zkHelper = new ReplicationZookeeper(server, this.replicating); + this.replicationQueues = + new ReplicationQueuesImpl(server.getZooKeeper(), this.conf, this.server); + this.replicationQueues.init(); } catch (KeeperException ke) { throw new IOException("Failed replication handler create " + "(replicating=" + this.replicating, ke); } - this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs, - this.replicating, logDir, oldLogDir); + this.replicationManager = + new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, + this.replicating, logDir, oldLogDir); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); 
LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); } else { this.replicationManager = null; this.zkHelper = null; + this.replicationQueues = null; } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (revision 1458068) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (working copy) @@ -27,7 +27,10 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; -import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientImpl; +import org.apache.hadoop.hbase.replication.ReplicationStateImpl; +import org.apache.hadoop.hbase.replication.ReplicationStateInterface; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -43,8 +46,10 @@ @InterfaceAudience.Private public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable { private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); - private ReplicationZookeeper zkHelper; - private Set hlogs = new HashSet(); + private ZooKeeperWatcher zkw; + private ReplicationQueuesClient replicationQueues; + private ReplicationStateInterface replicationState; + private final Set hlogs = new HashSet(); private boolean stopped = false; private boolean aborted; @@ -53,7 +58,7 @@ public boolean isLogDeletable(Path filePath) { try { - if (!zkHelper.getReplication()) { + if (!replicationState.getState()) { return false; } } catch (KeeperException e) { @@ -89,20 +94,20 @@ private boolean 
refreshHLogsAndSearch(String searchedLog) { this.hlogs.clear(); final boolean lookForLog = searchedLog != null; - List rss = zkHelper.getListOfReplicators(); + List rss = replicationQueues.getListOfReplicators(); if (rss == null) { LOG.debug("Didn't find any region server that replicates, deleting: " + searchedLog); return false; } for (String rs: rss) { - List listOfPeers = zkHelper.getListPeersForRS(rs); + List listOfPeers = replicationQueues.getAllQueues(rs); // if rs just died, this will be null if (listOfPeers == null) { continue; } for (String id : listOfPeers) { - List peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id); + List peersHlogs = replicationQueues.getLogsInQueue(rs, id); if (peersHlogs != null) { this.hlogs.addAll(peersHlogs); } @@ -128,8 +133,10 @@ Configuration conf = new Configuration(config); super.setConf(conf); try { - ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); - this.zkHelper = new ReplicationZookeeper(this, conf, zkw); + this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); + this.replicationQueues = new ReplicationQueuesClientImpl(zkw, conf, this); + this.replicationQueues.init(); + this.replicationState = new ReplicationStateImpl(zkw, conf, this); } catch (KeeperException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } catch (IOException e) { @@ -143,10 +150,18 @@ public void stop(String why) { if (this.stopped) return; this.stopped = true; - if (this.zkHelper != null) { - LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher()); - this.zkHelper.getZookeeperWatcher().close(); + if (this.zkw != null) { + LOG.info("Stopping " + this.zkw); + this.zkw.close(); } + if (this.replicationState != null) { + LOG.info("Stopping " + this.replicationState); + try { + this.replicationState.close(); + } catch (IOException e) { + LOG.error("Error while stopping " + this.replicationState, e); + } + } // Not sure why we're deleting a connection that we never acquired 
or used HConnectionManager.deleteConnection(this.getConf()); } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java (revision 0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java (revision 0) @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +import org.apache.zookeeper.KeeperException; + +/** + * This provides an interface for clients of replication to view replication queues. These queues + * keep track of the HLogs that still need to be replicated to remote clusters. + */ +public interface ReplicationQueuesClient { + + /** + * Initialize the cluster replication queues interface. + * @throws KeeperException + */ + public void init() throws KeeperException; + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster. 
+ * @return a list of server names + */ + public List getListOfReplicators(); + + /** + * Get a list of all HLogs in the given queue. + * @param serverName the server name of the region server that owns the queue + * @param queueId a String that identifies the queue + * @return a list of HLogs, null if this region server is dead and has no outstanding queues + */ + public List getLogsInQueue(String serverName, String queueId); + + /** + * Get a list of all queues for this region server. + * @param serverName the server name of the region server that owns the set of queues + * @return a list of queueIds, null if this region server is dead and has no outstanding queues + */ + public List getAllQueues(String serverName); +} \ No newline at end of file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java (revision 0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java (revision 0) @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.SortedMap; +import java.util.SortedSet; + +import org.apache.zookeeper.KeeperException; + +/** + * This provides an interface for maintaining a region server's replication queues. These queues + * keep track of the HLogs that still need to be replicated to remote clusters. + */ +public interface ReplicationQueues extends ReplicationQueuesClient { + + /** + * Remove a replication queue. + * @param queueId a String that identifies the queue. + */ + public void removeQueue(String queueId); + + /** + * Add a new HLog file to the given queue. If the queue does not exist it is created. + * @param queueId a String that identifies the queue. + * @param filename name of the HLog + * @throws KeeperException + */ + public void addLog(String queueId, String filename) throws KeeperException; + + /** + * Remove an HLog file from the given queue. + * @param queueId a String that identifies the queue. + * @param filename name of the HLog + */ + public void removeLog(String queueId, String filename); + + /** + * Set the current position for a specific HLog in a given queue. + * @param queueId a String that identifies the queue + * @param filename name of the HLog + * @param position the current position in the file + */ + public void setLogPosition(String queueId, String filename, long position); + + /** + * Get the current position for a specific HLog in a given queue. + * @param queueId a String that identifies the queue + * @param filename name of the HLog + * @return the current position in the file + */ + public long getLogPosition(String queueId, String filename) throws KeeperException; + + /** + * Remove all replication queues for this region server. + */ + public void removeAllQueues(); + + /** + * Get a list of all HLogs in the given queue. 
+ * @param queueId a String that identifies the queue + * @return a list of HLogs, null if this region server is dead and has no outstanding queues + */ + public List getLogsInQueue(String queueId); + + /** + * Get a list of all queues for this region server. + * @return a list of queueIds, null if this region server is dead and has no outstanding queues + */ + public List getAllQueues(); + + /** + * Take ownership for the set of queues belonging to a dead region server. + * @param regionserver the id of the dead region server + * @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in + * each queue. Returns null if the lock could not be acquired; an empty map if no queues were claimed. + */ + public SortedMap> claimQueues(String regionserver); +} \ No newline at end of file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientImpl.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientImpl.java (revision 0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientImpl.java (revision 0) @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +public class ReplicationQueuesClientImpl extends ReplicationZKStateBase implements + ReplicationQueuesClient { + + protected final Abortable abortable; + + public ReplicationQueuesClientImpl(final ZooKeeperWatcher zk, Configuration conf, + Abortable abortable) { + super(zk, conf); + this.abortable = abortable; + } + + @Override + public void init() throws KeeperException { + ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); + } + + @Override + public List getListOfReplicators() { + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of replicators", e); + } + return result; + } + + @Override + public List getLogsInQueue(String serverName, String queueId) { + String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); + znode = ZKUtil.joinZNode(znode, queueId); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId + + " and serverName=" + serverName, e); + } + return result; + } + + @Override + public List getAllQueues(String serverName) { + String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e); + } + return result; + } + +} \ No newline at end of 
file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesImpl.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesImpl.java (revision 0) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesImpl.java (revision 0) @@ -0,0 +1,396 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.InvalidProtocolBufferException; + +public class ReplicationQueuesImpl extends ReplicationQueuesClientImpl implements + ReplicationQueues { + + /** Znode containing all replication queues for this region server. 
*/ + private final String myQueuesZnode; + /** Name of znode we use to lock during failover */ + private final static String RS_LOCK_ZNODE = "lock"; + + private static final Log LOG = LogFactory.getLog(ReplicationQueuesImpl.class); + + public ReplicationQueuesImpl(final ZooKeeperWatcher zk, Configuration conf, Server server) { + super(zk, conf, server); + this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, server.getServerName().toString()); + } + + @Override + public void removeQueue(String queueId) { + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId)); + } catch (KeeperException e) { + this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e); + } + } + + @Override + public void addLog(String queueId, String filename) throws KeeperException { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + ZKUtil.createWithParents(this.zookeeper, znode); + } + + @Override + public void removeLog(String queueId, String filename) { + try { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + ZKUtil.deleteNode(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to remove hlog from queue (queueId=" + queueId + ", filename=" + + filename + ")", e); + } + } + + @Override + public void setLogPosition(String queueId, String filename, long position) { + try { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + // Why serialize String of Long and not Long as bytes? 
+ ZKUtil.setData(this.zookeeper, znode, toByteArray(position)); + } catch (KeeperException e) { + this.abortable.abort("Failed to write replication hlog position (filename=" + filename + + ", position=" + position + ")", e); + } + } + + @Override + public long getLogPosition(String queueId, String filename) throws KeeperException { + String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + String znode = ZKUtil.joinZNode(clusterZnode, filename); + byte[] bytes = ZKUtil.getData(this.zookeeper, znode); + try { + return parseHLogPositionFrom(bytes); + } catch (DeserializationException de) { + LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename + + " znode content, continuing."); + } + // if we can not parse the position, start at the beginning of the hlog file + // again + return 0; + } + + @Override + public SortedMap> claimQueues(String regionserverZnode) { + SortedMap> newQueues = null; + // check whether there is multi support. If yes, use it. 
+ if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) { + LOG.info("Atomically moving " + regionserverZnode + "'s hlogs to my queue"); + newQueues = copyQueuesFromRSUsingMulti(regionserverZnode); + } else { + LOG.info("Moving " + regionserverZnode + "'s hlogs to my queue"); + if (!lockOtherRS(regionserverZnode)) { + return null; + } + newQueues = copyQueuesFromRS(regionserverZnode); + deleteAnotherRSQueues(regionserverZnode); + } + return newQueues; + } + + @Override + public void removeAllQueues() { + try { + ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode); + } catch (KeeperException e) { + // if the znode is already expired, don't bother going further + if (e instanceof KeeperException.SessionExpiredException) { + return; + } + this.abortable.abort("Failed to delete replication queues for region server: " + + this.myQueuesZnode, e); + } + } + + @Override + public List getLogsInQueue(String queueId) { + String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); + List result = null; + try { + result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get list of hlogs for queueId=" + queueId, e); + } + return result; + } + + @Override + public List getAllQueues() { + List listOfQueues = null; + try { + listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode); + } catch (KeeperException e) { + this.abortable.abort("Failed to get a list of queues for region server: " + + this.myQueuesZnode, e); + } + return listOfQueues; + } + + /** + * Try to set a lock in another region server's znode. 
+ * @param znode the server names of the other server + * @return true if the lock was acquired, false in every other cases + */ + private boolean lockOtherRS(String znode) { + try { + String parent = ZKUtil.joinZNode(this.queuesZNode, znode); + if (parent.equals(this.myQueuesZnode)) { + LOG.warn("Won't lock because this is us, we're dead!"); + return false; + } + String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); + ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode)); + } catch (KeeperException e) { + // This exception will pop up if the znode under which we're trying to + // create the lock is already deleted by another region server, meaning + // that the transfer already occurred. + // NoNode => transfer is done and znodes are already deleted + // NodeExists => lock znode already created by another RS + if (e instanceof KeeperException.NoNodeException + || e instanceof KeeperException.NodeExistsException) { + LOG.info("Won't transfer the queue," + " another RS took care of it because of: " + + e.getMessage()); + } else { + LOG.info("Failed lock other rs", e); + } + return false; + } + return true; + } + + /** + * Delete all the replication queues for a given region server. + * @param regionserverZnode The znode of the region server to delete. + */ + private void deleteAnotherRSQueues(String regionserverZnode) { + String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode); + try { + List clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath); + for (String cluster : clusters) { + // No need to delete, it will be deleted later. 
+ if (cluster.equals(RS_LOCK_ZNODE)) { + continue; + } + String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster); + ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath); + } + // Finish cleaning up + ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath); + } catch (KeeperException e) { + if (e instanceof KeeperException.NoNodeException + || e instanceof KeeperException.NotEmptyException) { + // Testing a special case where another region server was able to + // create a lock just after we deleted it, but then was also able to + // delete the RS znode before us or its lock znode is still there. + if (e.getPath().equals(fullpath)) { + return; + } + } + this.abortable.abort("Failed to delete replication queues for region server: " + + regionserverZnode, e); + } + } + + /** + * It "atomically" copies all the hlogs queues from another region server and returns them all + * sorted per peer cluster (appended with the dead server's znode). + * @param znode pertaining to the region server to copy the queues from + * @return HLog queues sorted per peer cluster + */ + private SortedMap> copyQueuesFromRSUsingMulti(String znode) { + SortedMap> queues = new TreeMap>(); + // hbase/replication/rs/deadrs + String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode); + List peerIdsToProcess = null; + List listOfOps = new ArrayList(); + try { + peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath); + if (peerIdsToProcess == null) return queues; // node already processed + for (String peerId : peerIdsToProcess) { + String newPeerId = peerId + "-" + znode; + String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId); + // check the logs queue for the old peer cluster + String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); + List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); + if (hlogs == null || hlogs.size() == 0) continue; // empty log queue. 
+ // create the new cluster znode + SortedSet logQueue = new TreeSet(); + queues.put(newPeerId, logQueue); + ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); + listOfOps.add(op); + // get the offset of the logs and set it to new znodes + for (String hlog : hlogs) { + String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog); + byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode); + LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset)); + String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog); + listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); + // add ops for deleting + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode)); + logQueue.add(hlog); + } + // add delete op for peer + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); + } + // add delete op for dead rs + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); + LOG.debug(" The multi list size is: " + listOfOps.size()); + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); + LOG.info("Atomically moved the dead regionserver logs. "); + } catch (KeeperException e) { + // Multi call failed; it looks like some other regionserver took away the logs. + LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); + queues.clear(); + } + return queues; + } + + /** + * This methods copies all the hlogs queues from another region server and returns them all sorted + * per peer cluster (appended with the dead server's znode) + * @param znode server names to copy + * @return all hlogs for all peers of that cluster, null if an error occurred + */ + private SortedMap> copyQueuesFromRS(String znode) { + // TODO this method isn't atomic enough, we could start copying and then + // TODO fail for some reason and we would end up with znodes we don't want. 
+ SortedMap> queues = new TreeMap>(); + try { + String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode); + List clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath); + // We have a lock znode in there, it will count as one. + if (clusters == null || clusters.size() <= 1) { + return queues; + } + // The lock isn't a peer cluster, remove it + clusters.remove(RS_LOCK_ZNODE); + for (String cluster : clusters) { + // We add the name of the recovered RS to the new znode, we can even + // do that for queues that were recovered 10 times giving a znode like + // number-startcode-number-otherstartcode-number-anotherstartcode-etc + String newCluster = cluster + "-" + znode; + String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster); + String clusterPath = ZKUtil.joinZNode(nodePath, cluster); + List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); + // That region server didn't have anything to replicate for this cluster + if (hlogs == null || hlogs.size() == 0) { + continue; + } + ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, + HConstants.EMPTY_BYTE_ARRAY); + SortedSet logQueue = new TreeSet(); + queues.put(newCluster, logQueue); + for (String hlog : hlogs) { + String z = ZKUtil.joinZNode(clusterPath, hlog); + byte[] positionBytes = ZKUtil.getData(this.zookeeper, z); + long position = 0; + try { + position = parseHLogPositionFrom(positionBytes); + } catch (DeserializationException e) { + LOG.warn("Failed parse of hlog position from the following znode: " + z); + } + LOG.debug("Creating " + hlog + " with data " + position); + String child = ZKUtil.joinZNode(newClusterZnode, hlog); + // Position doesn't actually change, we are just deserializing it for + // logging, so just use the already serialized version + ZKUtil.createAndWatch(this.zookeeper, child, positionBytes); + logQueue.add(hlog); + } + } + } catch (KeeperException e) { + this.abortable.abort("Copy queues from rs", e); + } + return queues; + } + 
+ /** + * @param lockOwner + * @return Serialized protobuf of lockOwner with pb magic prefix prepended suitable + * for use as content of an replication lock during region server fail over. + */ + static byte[] lockToByteArray(final String lockOwner) { + byte[] bytes = + ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray(); + return ProtobufUtil.prependPBMagic(bytes); + } + + /** + * @param position + * @return Serialized protobuf of position with pb magic prefix prepended suitable + * for use as content of an hlog position in a replication queue. + */ + static byte[] toByteArray(final long position) { + byte[] bytes = + ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build() + .toByteArray(); + return ProtobufUtil.prependPBMagic(bytes); + } + + /** + * @param bytes - Content of a HLog position znode. + * @return long - The current HLog position. + * @throws DeserializationException + */ + private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException { + if (ProtobufUtil.isPBMagicPrefix(bytes)) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.ReplicationHLogPosition.Builder builder = + ZooKeeperProtos.ReplicationHLogPosition.newBuilder(); + ZooKeeperProtos.ReplicationHLogPosition position; + try { + position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); + } catch (InvalidProtocolBufferException e) { + throw new DeserializationException(e); + } + return position.getPosition(); + } else { + if (bytes.length > 0) { + return Bytes.toLong(bytes); + } + return 0; + } + } +} \ No newline at end of file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZKStateBase.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZKStateBase.java (revision 0) +++ 
hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZKStateBase.java (revision 0) @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + + +/** + * This is a base class for maintaining replication state in zookeeper. + */ +public abstract class ReplicationZKStateBase { + + /** + * The name of the znode that contains the replication status of a remote slave (i.e. peer) + * cluster. + */ + protected final String peerStateNodeName; + /** The name of the znode that contains the replication status of the local cluster. */ + protected final String stateZNode; + /** The name of the base znode that contains all replication state. */ + protected final String replicationZNode; + /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. 
*/ + protected final String peersZNode; + /** The name of the znode that contains all replication queues */ + protected final String queuesZNode; + /** The cluster key of the local cluster */ + protected final String ourClusterKey; + protected final ZooKeeperWatcher zookeeper; + protected final Configuration conf; + + public ReplicationZKStateBase(ZooKeeperWatcher zookeeper, Configuration conf) { + this.zookeeper = zookeeper; + this.conf = conf; + + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); + String stateZNodeName = conf.get("zookeeper.znode.replication.state", "state"); + this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); + this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); + this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); + this.stateZNode = ZKUtil.joinZNode(replicationZNode, stateZNodeName); + this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); + this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); + } +} \ No newline at end of file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (revision 1458068) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (working copy) @@ -21,6 +21,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.exceptions.DeserializationException; import 
org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -37,34 +38,39 @@ * ReplicationStateImpl is responsible for maintaining the replication state * znode. */ -public class ReplicationStateImpl implements ReplicationStateInterface { +public class ReplicationStateImpl extends ReplicationZKStateBase implements + ReplicationStateInterface { - private ReplicationStateTracker stateTracker; - private final String stateZnode; - private final ZooKeeperWatcher zookeeper; + private final ReplicationStateTracker stateTracker; private final Abortable abortable; private final AtomicBoolean replicating; private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class); - public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode, + public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, final Abortable abortable, final AtomicBoolean replicating) { - this.zookeeper = zk; - this.stateZnode = stateZnode; + super(zk, conf); this.abortable = abortable; this.replicating = replicating; // Set a tracker on replicationStateNode - this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode, - this.abortable); + this.stateTracker = + new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable); stateTracker.start(); readReplicationStateZnode(); } + public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, + final Abortable abortable) { + this(zk, conf, abortable, new AtomicBoolean()); + } + + @Override public boolean getState() throws KeeperException { return getReplication(); } + @Override public void setState(boolean newState) throws KeeperException { setReplicating(newState); } @@ -110,10 +116,10 @@ * @param newState */ private void setReplicating(boolean newState) throws KeeperException { - ZKUtil.createWithParents(this.zookeeper, this.stateZnode); + ZKUtil.createWithParents(this.zookeeper, this.stateZNode); byte[] stateBytes = (newState == true) ? 
ReplicationZookeeper.ENABLED_ZNODE_BYTES : ReplicationZookeeper.DISABLED_ZNODE_BYTES; - ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes); + ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes); } /** @@ -143,7 +149,7 @@ this.replicating.set(getReplication()); LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped")); } catch (KeeperException e) { - this.abortable.abort("Failed getting data on from " + this.stateZnode, e); + this.abortable.abort("Failed getting data on from " + this.stateZNode, e); } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1458068) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -24,7 +24,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -33,7 +32,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -51,7 +49,6 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -88,8 +85,6 @@ public class ReplicationZookeeper implements Closeable { private static final Log LOG = 
LogFactory.getLog(ReplicationZookeeper.class); - // Name of znode we use to lock when failover - private final static String RS_LOCK_ZNODE = "lock"; // Our handle on zookeeper private final ZooKeeperWatcher zookeeper; @@ -114,6 +109,7 @@ // Abortable private Abortable abortable; private final ReplicationStateInterface replicationState; + private final ReplicationQueues replicationQueues; /** * ZNode content if enabled state. @@ -139,8 +135,10 @@ this.conf = conf; this.zookeeper = zk; setZNodes(abortable); - this.replicationState = - new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean()); + this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable); + // TODO This interface is no longer used by anyone using this constructor. When this class goes + // away, we will no longer have this null initialization business + this.replicationQueues = null; } /** @@ -149,7 +147,7 @@ * @param server * @param replicating atomic boolean to start/stop replication * @throws IOException - * @throws KeeperException + * @throws KeeperException */ public ReplicationZookeeper(final Server server, final AtomicBoolean replicating) throws IOException, KeeperException { @@ -158,13 +156,14 @@ this.conf = server.getConfiguration(); setZNodes(server); - this.replicationState = - new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating); + this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating); this.peerClusters = new HashMap(); ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName)); this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString()); ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode); + this.replicationQueues = new ReplicationQueuesImpl(this.zookeeper, this.conf, server); + this.replicationQueues.init(); connectExistingPeers(); } @@ -433,32 +432,6 @@ } /** - * @param 
position - * @return Serialized protobuf of position with pb magic prefix - * prepended suitable for use as content of an hlog position in a - * replication queue. - */ - static byte[] toByteArray( - final long position) { - byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position) - .build().toByteArray(); - return ProtobufUtil.prependPBMagic(bytes); - } - - /** - * @param lockOwner - * @return Serialized protobuf of lockOwner with pb magic prefix - * prepended suitable for use as content of an replication lock during - * region server fail over. - */ - static byte[] lockToByteArray( - final String lockOwner) { - byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build() - .toByteArray(); - return ProtobufUtil.prependPBMagic(bytes); - } - - /** * @param bytes Content of a peer znode. * @return ClusterKey parsed from the passed bytes. * @throws DeserializationException @@ -503,58 +476,6 @@ } } - /** - * @param bytes - Content of a HLog position znode. - * @return long - The current HLog position. - * @throws DeserializationException - */ - static long parseHLogPositionFrom( - final byte[] bytes) throws DeserializationException { - if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.ReplicationHLogPosition.Builder builder = ZooKeeperProtos.ReplicationHLogPosition - .newBuilder(); - ZooKeeperProtos.ReplicationHLogPosition position; - try { - position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); - } catch (InvalidProtocolBufferException e) { - throw new DeserializationException(e); - } - return position.getPosition(); - } else { - if (bytes.length > 0) { - return Bytes.toLong(bytes); - } - return 0; - } - } - - /** - * @param bytes - Content of a lock znode. - * @return String - The owner of the lock. 
- * @throws DeserializationException - */ - static String parseLockOwnerFrom( - final byte[] bytes) throws DeserializationException { - if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock - .newBuilder(); - ZooKeeperProtos.ReplicationLock lock; - try { - lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); - } catch (InvalidProtocolBufferException e) { - throw new DeserializationException(e); - } - return lock.getLockOwner(); - } else { - if (bytes.length > 0) { - return Bytes.toString(bytes); - } - return ""; - } - } - private boolean peerExists(String id) throws KeeperException { return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0; @@ -624,10 +545,6 @@ return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); } - private String getRepStateNode() { - return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName); - } - /** * Get the replication status of this cluster. If the state znode doesn't exist it will also * create it and set it true. 
@@ -652,11 +569,8 @@ * @param filename name of the hlog's znode * @param peerId name of the cluster's znode */ - public void addLogToList(String filename, String peerId) - throws KeeperException { - String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId); - znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.createWithParents(this.zookeeper, znode); + public void addLogToList(String filename, String peerId) throws KeeperException { + this.replicationQueues.addLog(peerId, filename); } /** @@ -665,13 +579,7 @@ * @param clusterId name of the cluster's znode */ public void removeLogFromList(String filename, String clusterId) { - try { - String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId); - znode = ZKUtil.joinZNode(znode, filename); - ZKUtil.deleteNode(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Failed remove from list", e); - } + this.replicationQueues.removeLog(clusterId, filename); } /** @@ -679,18 +587,9 @@ * @param filename filename name of the hlog's znode * @param clusterId clusterId name of the cluster's znode * @param position the position in the file - * @throws IOException */ - public void writeReplicationStatus(String filename, String clusterId, - long position) { - try { - String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId); - znode = ZKUtil.joinZNode(znode, filename); - // Why serialize String of Long and note Long as bytes? 
- ZKUtil.setData(this.zookeeper, znode, toByteArray(position)); - } catch (KeeperException e) { - this.abortable.abort("Writing replication status", e); - } + public void writeReplicationStatus(String filename, String clusterId, long position) { + this.replicationQueues.setLogPosition(clusterId, filename, position); } /** @@ -709,265 +608,34 @@ return result; } - /** - * Get the list of the replicators that have queues, they can be alive, dead - * or simply from a previous run - * @return a list of server names - */ - public List getListOfReplicators() { - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode); - } catch (KeeperException e) { - this.abortable.abort("Get list of replicators", e); - } - return result; - } /** - * Get the list of peer clusters for the specified server names - * @param rs server names of the rs - * @return a list of peer cluster + * Take ownership for the set of queues belonging to a dead region server. + * @param regionserver the id of the dead region server + * @return A SortedMap of the queues that have been claimed, including a SortedSet of HLogs in + * each queue. Returns null if no queues were failed-over. 
*/ - public List getListPeersForRS(String rs) { - String znode = ZKUtil.joinZNode(rsZNode, rs); - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Get list of peers for rs", e); - } - return result; + public SortedMap> claimQueues(String regionserver) { + return this.replicationQueues.claimQueues(regionserver); } /** - * Get the list of hlogs for the specified region server and peer cluster - * @param rs server names of the rs - * @param id peer cluster - * @return a list of hlogs - */ - public List getListHLogsForPeerForRS(String rs, String id) { - String znode = ZKUtil.joinZNode(rsZNode, rs); - znode = ZKUtil.joinZNode(znode, id); - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode); - } catch (KeeperException e) { - this.abortable.abort("Get list of hlogs for peer", e); - } - return result; - } - - /** - * Try to set a lock in another server's znode. - * @param znode the server names of the other server - * @return true if the lock was acquired, false in every other cases - */ - public boolean lockOtherRS(String znode) { - try { - String parent = ZKUtil.joinZNode(this.rsZNode, znode); - if (parent.equals(rsServerNameZnode)) { - LOG.warn("Won't lock because this is us, we're dead!"); - return false; - } - String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); - ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode)); - } catch (KeeperException e) { - // This exception will pop up if the znode under which we're trying to - // create the lock is already deleted by another region server, meaning - // that the transfer already occurred. 
- // NoNode => transfer is done and znodes are already deleted - // NodeExists => lock znode already created by another RS - if (e instanceof KeeperException.NoNodeException || - e instanceof KeeperException.NodeExistsException) { - LOG.info("Won't transfer the queue," + - " another RS took care of it because of: " + e.getMessage()); - } else { - LOG.info("Failed lock other rs", e); - } - return false; - } - return true; - } - - /** - * It "atomically" copies all the hlogs queues from another region server and returns them all - * sorted per peer cluster (appended with the dead server's znode). - * @param znode - * @return HLog queues sorted per peer cluster - */ - public SortedMap> copyQueuesFromRSUsingMulti(String znode) { - SortedMap> queues = new TreeMap>(); - String deadRSZnodePath = ZKUtil.joinZNode(rsZNode, znode);// hbase/replication/rs/deadrs - List peerIdsToProcess = null; - List listOfOps = new ArrayList(); - try { - peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath); - if (peerIdsToProcess == null) return queues; // node already processed - for (String peerId : peerIdsToProcess) { - String newPeerId = peerId + "-" + znode; - String newPeerZnode = ZKUtil.joinZNode(this.rsServerNameZnode, newPeerId); - // check the logs queue for the old peer cluster - String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId); - List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode); - if (hlogs == null || hlogs.size() == 0) continue; // empty log queue. 
- // create the new cluster znode - SortedSet logQueue = new TreeSet(); - queues.put(newPeerId, logQueue); - ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY); - listOfOps.add(op); - // get the offset of the logs and set it to new znodes - for (String hlog : hlogs) { - String oldHlogZnode = ZKUtil.joinZNode(oldClusterZnode, hlog); - byte[] logOffset = ZKUtil.getData(this.zookeeper, oldHlogZnode); - LOG.debug("Creating " + hlog + " with data " + Bytes.toString(logOffset)); - String newLogZnode = ZKUtil.joinZNode(newPeerZnode, hlog); - listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset)); - // add ops for deleting - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldHlogZnode)); - logQueue.add(hlog); - } - // add delete op for peer - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode)); - } - // add delete op for dead rs - listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath)); - LOG.debug(" The multi list size is: " + listOfOps.size()); - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - LOG.info("Atomically moved the dead regionserver logs. "); - } catch (KeeperException e) { - // Multi call failed; it looks like some other regionserver took away the logs. - LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e); - queues.clear(); - } - return queues; - } - - /** - * This methods copies all the hlogs queues from another region server - * and returns them all sorted per peer cluster (appended with the dead - * server's znode) - * @param znode server names to copy - * @return all hlogs for all peers of that cluster, null if an error occurred - */ - public SortedMap> copyQueuesFromRS(String znode) { - // TODO this method isn't atomic enough, we could start copying and then - // TODO fail for some reason and we would end up with znodes we don't want. 
- SortedMap> queues = - new TreeMap>(); - try { - String nodePath = ZKUtil.joinZNode(rsZNode, znode); - List clusters = - ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath); - // We have a lock znode in there, it will count as one. - if (clusters == null || clusters.size() <= 1) { - return queues; - } - // The lock isn't a peer cluster, remove it - clusters.remove(RS_LOCK_ZNODE); - for (String cluster : clusters) { - // We add the name of the recovered RS to the new znode, we can even - // do that for queues that were recovered 10 times giving a znode like - // number-startcode-number-otherstartcode-number-anotherstartcode-etc - String newCluster = cluster+"-"+znode; - String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster); - String clusterPath = ZKUtil.joinZNode(nodePath, cluster); - List hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath); - // That region server didn't have anything to replicate for this cluster - if (hlogs == null || hlogs.size() == 0) { - continue; - } - ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode, - HConstants.EMPTY_BYTE_ARRAY); - SortedSet logQueue = new TreeSet(); - queues.put(newCluster, logQueue); - for (String hlog : hlogs) { - String z = ZKUtil.joinZNode(clusterPath, hlog); - byte[] positionBytes = ZKUtil.getData(this.zookeeper, z); - long position = 0; - try { - position = parseHLogPositionFrom(positionBytes); - } catch (DeserializationException e) { - LOG.warn("Failed parse of hlog position from the following znode: " + z); - } - LOG.debug("Creating " + hlog + " with data " + position); - String child = ZKUtil.joinZNode(newClusterZnode, hlog); - // Position doesn't actually change, we are just deserializing it for - // logging, so just use the already serialized version - ZKUtil.createAndWatch(this.zookeeper, child, positionBytes); - logQueue.add(hlog); - } - } - } catch (KeeperException e) { - this.abortable.abort("Copy queues from rs", e); - } - return queues; - } - - /** * 
Delete a complete queue of hlogs * @param peerZnode znode of the peer cluster queue of hlogs to delete */ public void deleteSource(String peerZnode, boolean closeConnection) { - try { - ZKUtil.deleteNodeRecursively(this.zookeeper, - ZKUtil.joinZNode(rsServerNameZnode, peerZnode)); - if (closeConnection) { - this.peerClusters.get(peerZnode).getZkw().close(); - this.peerClusters.remove(peerZnode); - } - } catch (KeeperException e) { - this.abortable.abort("Failed delete of " + peerZnode, e); + this.replicationQueues.removeQueue(peerZnode); + if (closeConnection) { + this.peerClusters.get(peerZnode).getZkw().close(); + this.peerClusters.remove(peerZnode); } } /** - * Recursive deletion of all znodes in specified rs' znode - * @param znode - */ - public void deleteRsQueues(String znode) { - String fullpath = ZKUtil.joinZNode(rsZNode, znode); - try { - List clusters = - ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath); - for (String cluster : clusters) { - // We'll delete it later - if (cluster.equals(RS_LOCK_ZNODE)) { - continue; - } - String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster); - ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath); - } - // Finish cleaning up - ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath); - } catch (KeeperException e) { - if (e instanceof KeeperException.NoNodeException || - e instanceof KeeperException.NotEmptyException) { - // Testing a special case where another region server was able to - // create a lock just after we deleted it, but then was also able to - // delete the RS znode before us or its lock znode is still there. 
- if (e.getPath().equals(fullpath)) { - return; - } - } - this.abortable.abort("Failed delete of " + znode, e); - } - } - - /** * Delete this cluster's queues */ public void deleteOwnRSZNode() { - try { - ZKUtil.deleteNodeRecursively(this.zookeeper, - this.rsServerNameZnode); - } catch (KeeperException e) { - // if the znode is already expired, don't bother going further - if (e instanceof KeeperException.SessionExpiredException) { - return; - } - this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e); - } + this.replicationQueues.removeAllQueues(); } /** @@ -975,22 +643,10 @@ * @param peerId znode of the peer cluster * @param hlog name of the hlog * @return the position in that hlog - * @throws KeeperException + * @throws KeeperException */ - public long getHLogRepPosition(String peerId, String hlog) - throws KeeperException { - String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId); - String znode = ZKUtil.joinZNode(clusterZnode, hlog); - byte[] bytes = ZKUtil.getData(this.zookeeper, znode); - try { - return parseHLogPositionFrom(bytes); - } catch (DeserializationException de) { - LOG.warn("Failed parse of HLogPosition for peerId=" + peerId + " and hlog=" + hlog - + "znode content, continuing."); - } - // if we can not parse the position, start at the beginning of the hlog file - // again - return 0; + public long getHLogRepPosition(String peerId, String hlog) throws KeeperException { + return this.replicationQueues.getLogPosition(peerId, hlog); } /** @@ -1048,7 +704,7 @@ public Map getPeerClusters() { return this.peerClusters; } - + /** * Determine if a ZK path points to a peer node. * @param path path to be checked