Index: src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java (revision 1045423) +++ src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java (working copy) @@ -130,6 +130,11 @@ Path filePath = file.getPath(); if (HLog.validateHLogFilename(filePath.getName())) { for (LogCleanerDelegate logCleaner : logCleanersChain) { + if (logCleaner.isStopped()) { + LOG.warn("A log cleaner is stopped, won't delete any log."); + return; + } + if (!logCleaner.isLogDeletable(filePath) ) { // this log is not deletable, continue to process next log file continue FILE; Index: src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (revision 1045423) +++ src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (working copy) @@ -27,12 +27,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; @@ -106,9 +108,9 @@ endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime); } try { - ReplicationZookeeper zk = new ReplicationZookeeper(conf, - HConnectionManager.getConnection(conf). - getZooKeeperWatcher()); + HConnection conn = HConnectionManager.getConnection(conf); + ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, + conn.getZooKeeperWatcher()); ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId")); HTable replicatedTable = new HTable(peer.getConfiguration(), conf.get(NAME+".tableName")); @@ -150,8 +152,9 @@ throw new IOException("Replication needs to be enabled to verify it."); } try { - ReplicationZookeeper zk = new ReplicationZookeeper(conf, - HConnectionManager.getConnection(conf).getZooKeeperWatcher()); + HConnection conn = HConnectionManager.getConnection(conf); + ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf, + conn.getZooKeeperWatcher()); // Just verifying it we can connect ReplicationPeer peer = zk.getPeer(peerId); if (peer == null) { Index: src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (revision 1045423) +++ src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (working copy) @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.master.LogCleanerDelegate; @@ -39,7 +40,7 @@ * Implementation of a log cleaner that checks if a log is still scheduled for * replication before deleting it when its TTL is over. */ -public class ReplicationLogCleaner implements LogCleanerDelegate { +public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable { private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); private Configuration conf; private ReplicationZookeeper zkHelper; @@ -53,6 +54,16 @@ @Override public boolean isLogDeletable(Path filePath) { + + try { + if (!zkHelper.getReplication()) { + return false; + } + } catch (KeeperException e) { + abort("Cannot get the state of replication", e); + return false; + } + // all members of this class are null if replication is disabled, and we // return true since false would render the LogsCleaner useless if (this.conf == null) { @@ -121,7 +132,7 @@ try { ZooKeeperWatcher zkw = new ZooKeeperWatcher(this.conf, "replicationLogCleaner", null); - this.zkHelper = new ReplicationZookeeper(this.conf, zkw); + this.zkHelper = new ReplicationZookeeper(this, this.conf, zkw); } catch (KeeperException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } catch (IOException e) { @@ -150,4 +161,10 @@ public boolean isStopped() { return this.stopped; } + + @Override + public void abort(String why, Throwable e) { + LOG.warn("Aborting ReplicationLogCleaner because " + why, e); + stop(why); + } } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1045423) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -100,6 +100,7 @@ private String ourClusterKey; // Abortable private Abortable abortable; + private ReplicationStatusTracker statusTracker; /** * Constructor used by clients of replication (like master and HBase clients) @@ -107,12 +108,14 @@ * @param zk zk connection to use * @throws IOException */ - public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk) + public ReplicationZookeeper(final Abortable abortable, final Configuration conf, + final ZooKeeperWatcher zk) throws KeeperException { this.conf = conf; this.zookeeper = zk; - setZNodes(); + this.replicating = new AtomicBoolean(); + setZNodes(abortable); } /** @@ -128,23 +131,18 @@ this.abortable = server; this.zookeeper = server.getZooKeeper(); this.conf = server.getConfiguration(); - setZNodes(); + this.replicating = replicating; + setZNodes(server); this.peerClusters = new HashMap(); - this.replicating = replicating; ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName)); - readReplicationStateZnode(); this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName()); ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode); - // Set a tracker on replicationStateNodeNode - ReplicationStatusTracker tracker = - new ReplicationStatusTracker(this.zookeeper, server); - tracker.start(); connectExistingPeers(); } - private void setZNodes() throws KeeperException { + private void setZNodes(Abortable abortable) throws KeeperException { String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = @@ -170,6 +168,11 @@ String idResult = Bytes.toString(data); this.clusterId = idResult == null? Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult; + // Set a tracker on replicationStateNodeNode + this.statusTracker = + new ReplicationStatusTracker(this.zookeeper, abortable); + statusTracker.start(); + readReplicationStateZnode(); } private void connectExistingPeers() throws IOException, KeeperException { @@ -292,16 +295,12 @@ * Set the new replication state for this cluster * @param newState */ - public void setReplicating(boolean newState) throws IOException { - try { - ZKUtil.createWithParents(this.zookeeper, + public void setReplicating(boolean newState) throws KeeperException { + ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName)); - ZKUtil.setData(this.zookeeper, - ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), - Bytes.toBytes(Boolean.toString(newState))); - } catch (KeeperException e) { - throw new IOException("Unable to set the replication state", e); - } + ZKUtil.setData(this.zookeeper, + ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), + Bytes.toBytes(Boolean.toString(newState))); } /** @@ -368,8 +367,18 @@ } } + /** + * Get the replication status of this cluster. If the state znode doesn't + * exist it will also create it and set it true. + * @return returns true when it's enabled, else false + * @throws KeeperException + */ public boolean getReplication() throws KeeperException { - byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode()); + byte [] data = this.statusTracker.getData(); + if (data == null || data.length == 0) { + setReplicating(true); + return true; + } return Boolean.parseBoolean(Bytes.toString(data)); } @@ -681,8 +690,10 @@ @Override public synchronized void nodeDataChanged(String path) { - super.nodeDataChanged(path); - readReplicationStateZnode(); + if (path.equals(node)) { + super.nodeDataChanged(path); + readReplicationStateZnode(); + } } } } \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1045423) +++ src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -24,6 +24,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -65,6 +66,7 @@ public class ReplicationAdmin { private final ReplicationZookeeper replicationZk; + private final HConnection connection; /** * Constructor that creates a connection to the local ZooKeeper ensemble. @@ -77,10 +79,10 @@ throw new RuntimeException("hbase.replication isn't true, please " + "enable it in order to use replication"); } - ZooKeeperWatcher zkw = HConnectionManager.getConnection(conf). - getZooKeeperWatcher(); + this.connection = HConnectionManager.getConnection(conf); + ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher(); try { - this.replicationZk = new ReplicationZookeeper(conf, zkw); + this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw); } catch (KeeperException e) { throw new IOException("Unable setup the ZooKeeper connection", e); } @@ -150,8 +152,13 @@ * @return the previous state */ public boolean setReplicating(boolean newState) throws IOException { - boolean prev = getReplicating(); - this.replicationZk.setReplicating(newState); + boolean prev = true; + try { + prev = getReplicating(); + this.replicationZk.setReplicating(newState); + } catch (KeeperException e) { + throw new IOException("Unable to set the replication state", e); + } return prev; }