Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java (revision 1501028) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java (working copy) @@ -69,7 +69,7 @@ Replication.decorateMasterConfiguration(conf); Server server = new DummyServer(); ReplicationZookeeper zkHelper = - new ReplicationZookeeper(server, new AtomicBoolean(true)); + new ReplicationZookeeper(server); Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (revision 1501028) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (working copy) @@ -69,10 +69,6 @@ Configuration testConf = new Configuration(conf); testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null); - ReplicationStateInterface rsi = new ReplicationStateImpl(zkw1, testConf, zkw1); - rsi.init(); - rsi.setState(true); - rsi.close(); String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234"); ZKUtil.createWithParents(zkw1, fakeRs); ZKClusterId.setClusterId(zkw1, new ClusterId()); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java (revision 1501028) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java (working copy) @@ -111,7 +111,6 @@ zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true); admin.addPeer("2", utility2.getClusterKey()); - setIsReplication(true); LOG.info("Setup second Zk"); CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); @@ -134,12 +133,6 @@ htable2 = new HTable(conf2, tableName); } - protected static void setIsReplication(boolean rep) throws Exception { - LOG.info("Set rep " + rep); - admin.setReplicating(rep); - Thread.sleep(SLEEP_TIME); - } - /** * @throws java.lang.Exception */ Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java (revision 1501028) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java (working copy) @@ -256,75 +256,6 @@ } /** - * Test stopping replication, trying to insert, make sure nothing's - * replicated, enable it, try replicating and it should work - * @throws Exception - */ - @Test(timeout=300000) - public void testStartStop() throws Exception { - - // Test stopping replication - setIsReplication(false); - - Put put = new Put(Bytes.toBytes("stop start")); - put.add(famName, row, row); - htable1.put(put); - - Get get = new Get(Bytes.toBytes("stop start")); - for (int i = 0; i < NB_RETRIES; i++) { - if (i==NB_RETRIES-1) { - break; - } - Result res = htable2.get(get); - if(res.size() >= 1) { - fail("Replication wasn't stopped"); - - } else { - LOG.info("Row not replicated, let's wait a bit more..."); - Thread.sleep(SLEEP_TIME); - } - } - - // Test restart replication - setIsReplication(true); - - htable1.put(put); - - for (int i = 0; i < NB_RETRIES; i++) { - if (i==NB_RETRIES-1) { - fail("Waited too much time for put replication"); - } - Result res = htable2.get(get); - if(res.size() == 0) { - LOG.info("Row not available"); - Thread.sleep(SLEEP_TIME); - } else { - assertArrayEquals(res.value(), row); - break; - } - } - - put = new Put(Bytes.toBytes("do not rep")); - put.add(noRepfamName, row, row); - htable1.put(put); - - get = new Get(Bytes.toBytes("do not rep")); - for (int i = 0; i < NB_RETRIES; i++) { - if (i == NB_RETRIES-1) { - break; - } - Result res = htable2.get(get); - if (res.size() >= 1) { - fail("Not supposed to be replicated"); - } else { - LOG.info("Row not replicated, let's wait a bit more..."); - Thread.sleep(SLEEP_TIME); - } - } - - } - - /** * Test disable/enable replication, trying to insert, make sure nothing's * replicated, enable it, the insert should be replicated * Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (revision 1501028) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (working copy) @@ -38,10 +38,8 @@ Path currentPath; @Override - public void init(Configuration conf, FileSystem fs, - ReplicationSourceManager manager, Stoppable stopper, - AtomicBoolean replicating, String peerClusterId) - throws IOException { + public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + Stoppable stopper, String peerClusterId) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1501028) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy) @@ -227,8 +227,7 @@ LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("hostname0.example.org"); - AtomicBoolean replicating = new AtomicBoolean(true); - ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating); + ReplicationZookeeper rz = new ReplicationZookeeper(server); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); @@ -270,8 +269,7 @@ LOG.debug("testNodeFailoverDeadServerParsing"); conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true); final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com"); - AtomicBoolean replicating = new AtomicBoolean(true); - ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating); + ReplicationZookeeper rz = new ReplicationZookeeper(server); // populate some znodes in the peer znode files.add("log1"); files.add("log2"); @@ -285,14 +283,14 @@ Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com"); // simulate three servers fail sequentially - ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true)); + ReplicationZookeeper rz1 = new ReplicationZookeeper(s1); SortedMap> testMap = rz1.claimQueues(server.getServerName().getServerName()); rz1.close(); - ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true)); + ReplicationZookeeper rz2 = new ReplicationZookeeper(s2); testMap = rz2.claimQueues(s1.getServerName().getServerName()); rz2.close(); - ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true)); + ReplicationZookeeper rz3 = new ReplicationZookeeper(s3); testMap = rz3.claimQueues(s2.getServerName().getServerName()); rz3.close(); @@ -319,7 +317,7 @@ public DummyNodeFailoverWorker(String znode, Server s) throws Exception { this.deadRsZnode = znode; this.server = s; - rz = new ReplicationZookeeper(server, new AtomicBoolean(true)); + rz = new ReplicationZookeeper(server); } @Override Index: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java (revision 1501028) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java (working copy) @@ -62,7 +62,7 @@ conf = utility.getConfiguration(); zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); DummyServer server = new DummyServer(); - repZk = new ReplicationZookeeper(server, new AtomicBoolean()); + repZk = new ReplicationZookeeper(server); slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get("hbase.zookeeper.property.clientPort") + ":/1"; String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1501028) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -69,8 +69,6 @@ private final List sources; // List of all the sources we got from died RSs private final List oldsources; - // Indicates if we are currently replicating - private final AtomicBoolean replicating; // Helper for zookeeper private final ReplicationZookeeper zkHelper; private final ReplicationQueues replicationQueues; @@ -103,16 +101,14 @@ * @param conf the configuration to use * @param stopper the stopper object for this region server * @param fs the file system to use - * @param replicating the status of the replication on this cluster * @param logDir the directory that contains all hlog directories of live RSs * @param oldLogDir the directory where old logs are archived */ public ReplicationSourceManager(final ReplicationZookeeper zkHelper, final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper, - final FileSystem fs, final AtomicBoolean replicating, final Path logDir, + final FileSystem fs,final Path logDir, final Path oldLogDir) { this.sources = new ArrayList(); - this.replicating = replicating; this.zkHelper = zkHelper; this.replicationQueues = replicationQueues; this.stopper = stopper; @@ -207,7 +203,7 @@ */ public ReplicationSourceInterface addSource(String id) throws IOException { ReplicationSourceInterface src = - getReplicationSource(this.conf, this.fs, this, stopper, replicating, id); + getReplicationSource(this.conf, this.fs, this, stopper, id); synchronized (this.hlogsById) { this.sources.add(src); this.hlogsById.put(id, new TreeSet()); @@ -260,11 +256,6 @@ } void preLogRoll(Path newLog) throws IOException { - if (!this.replicating.get()) { - LOG.warn("Replication stopped, won't add new log"); - return; - } - synchronized (this.hlogsById) { String name = newLog.getName(); for (ReplicationSourceInterface source : this.sources) { @@ -288,11 +279,6 @@ } void postLogRoll(Path newLog) throws IOException { - if (!this.replicating.get()) { - LOG.warn("Replication stopped, won't add new log"); - return; - } - // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources) { source.enqueueLog(newLog); @@ -313,7 +299,6 @@ * @param fs the file system to use * @param manager the manager to use * @param stopper the stopper object for this region server - * @param replicating the status of the replication on this cluster * @param peerId the id of the peer cluster * @return the created source * @throws IOException @@ -323,7 +308,6 @@ final FileSystem fs, final ReplicationSourceManager manager, final Stoppable stopper, - final AtomicBoolean replicating, final String peerId) throws IOException { ReplicationSourceInterface src; try { @@ -337,7 +321,7 @@ src = new ReplicationSource(); } - src.init(conf, fs, manager, stopper, replicating, peerId); + src.init(conf, fs, manager, stopper, peerId); return src; } @@ -600,7 +584,7 @@ String peerId = entry.getKey(); try { ReplicationSourceInterface src = getReplicationSource(conf, - fs, ReplicationSourceManager.this, stopper, replicating, peerId); + fs, ReplicationSourceManager.this, stopper, peerId); if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) { src.terminate("Recovered queue doesn't belong to any current peer"); break; Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1501028) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy) @@ -66,7 +66,6 @@ LogFactory.getLog(Replication.class); private boolean replication; private ReplicationSourceManager replicationManager; - private final AtomicBoolean replicating = new AtomicBoolean(true); private ReplicationZookeeper zkHelper; private ReplicationQueues replicationQueues; private Configuration conf; @@ -108,17 +107,16 @@ .build()); if (replication) { try { - this.zkHelper = new ReplicationZookeeper(server, this.replicating); + this.zkHelper = new ReplicationZookeeper(server); this.replicationQueues = new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server); this.replicationQueues.init(this.server.getServerName().toString()); } catch (KeeperException ke) { - throw new IOException("Failed replication handler create " + - "(replicating=" + this.replicating, ke); + throw new IOException("Failed replication handler create", ke); } this.replicationManager = new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, - this.replicating, logDir, oldLogDir); + logDir, oldLogDir); this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1501028) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -93,8 +93,6 @@ // ratio of region servers to chose from a slave cluster private float ratio; private Random random; - // should we replicate or not? - private AtomicBoolean replicating; private ReplicationQueueInfo replicationQueueInfo; // id of the peer cluster this source replicates to private String peerId; @@ -149,7 +147,6 @@ * @param fs file system to use * @param manager replication manager to ping to * @param stopper the atomic boolean to use to stop the regionserver - * @param replicating the atomic boolean that starts/stops replication * @param peerClusterZnode the name of our znode * @throws IOException */ @@ -157,7 +154,6 @@ final FileSystem fs, final ReplicationSourceManager manager, final Stoppable stopper, - final AtomicBoolean replicating, final String peerClusterZnode) throws IOException { this.stopper = stopper; @@ -185,7 +181,6 @@ this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f); this.currentPeers = new ArrayList(); this.random = new Random(); - this.replicating = replicating; this.manager = manager; this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); @@ -419,7 +414,7 @@ // containing anything to replicate and if we're currently not set to replicate if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) && - edit.size() != 0 && replicating.get()) { + edit.size() != 0) { // Only set the clusterId if is a local key. // This ensures that the originator sets the cluster id // and all replicas retain the initial cluster id. @@ -714,8 +709,7 @@ * @return true if the peer is enabled, otherwise false */ protected boolean isPeerEnabled() { - return this.replicating.get() && - this.zkHelper.getPeerEnabled(this.peerId); + return this.zkHelper.getPeerEnabled(this.peerId); } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (revision 1501028) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (working copy) @@ -39,7 +39,6 @@ * @param fs the file system to use * @param manager the manager to use * @param stopper the stopper object for this region server - * @param replicating the status of the replication on this cluster * @param peerClusterId the id of the peer cluster * @throws IOException */ @@ -47,7 +46,6 @@ final FileSystem fs, final ReplicationSourceManager manager, final Stoppable stopper, - final AtomicBoolean replicating, final String peerClusterId) throws IOException; /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (revision 1501028) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (working copy) @@ -30,8 +30,6 @@ import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl; -import org.apache.hadoop.hbase.replication.ReplicationStateImpl; -import org.apache.hadoop.hbase.replication.ReplicationStateInterface; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -49,7 +47,6 @@ private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class); private ZooKeeperWatcher zkw; private ReplicationQueuesClient replicationQueues; - private ReplicationStateInterface replicationState; private final Set hlogs = new HashSet(); private boolean stopped = false; private boolean aborted; @@ -57,16 +54,6 @@ @Override public boolean isLogDeletable(FileStatus fStat) { - - try { - if (!replicationState.getState()) { - return false; - } - } catch (KeeperException e) { - abort("Cannot get the state of replication", e); - return false; - } - // all members of this class are null if replication is disabled, and we // return true since false would render the LogsCleaner useless if (this.getConf() == null) { @@ -136,8 +123,6 @@ try { this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null); this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this); - this.replicationState = new ReplicationStateImpl(zkw, conf, this); - this.replicationState.init(); } catch (KeeperException e) { LOG.error("Error while configuring " + this.getClass().getName(), e); } catch (IOException e) { @@ -155,14 +140,6 @@ LOG.info("Stopping " + this.zkw); this.zkw.close(); } - if (this.replicationState != null) { - LOG.info("Stopping " + this.replicationState); - try { - this.replicationState.close(); - } catch (IOException e) { - LOG.error("Error while stopping " + this.replicationState, e); - } - } // Not sure why we're deleting a connection that we never acquired or used HConnectionManager.deleteConnection(this.getConf()); } Index: hbase-server/src/main/ruby/hbase/replication_admin.rb =================================================================== --- hbase-server/src/main/ruby/hbase/replication_admin.rb (revision 1501028) +++ hbase-server/src/main/ruby/hbase/replication_admin.rb (working copy) @@ -65,17 +65,5 @@ def disable_peer(id) @replication_admin.disablePeer(id) end - - #---------------------------------------------------------------------------------------------- - # Restart the replication, in an unknown state - def start_replication - @replication_admin.setReplicating(true) - end - - #---------------------------------------------------------------------------------------------- - # Kill switch for replication, stops all its features - def stop_replication - @replication_admin.setReplicating(false) - end end end Index: hbase-server/src/main/ruby/shell/commands/start_replication.rb =================================================================== --- hbase-server/src/main/ruby/shell/commands/start_replication.rb (revision 1501028) +++ hbase-server/src/main/ruby/shell/commands/start_replication.rb (working copy) @@ -1,42 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -module Shell - module Commands - class StartReplication < Command - def help - return <<-EOF -Restarts all the replication features. The state in which each -stream starts in is undetermined. -WARNING: -start/stop replication is only meant to be used in critical load situations. -Examples: - - hbase> start_replication -EOF - end - - def command - format_simple_command do - replication_admin.start_replication - end - end - end - end -end Index: hbase-server/src/main/ruby/shell/commands/stop_replication.rb =================================================================== --- hbase-server/src/main/ruby/shell/commands/stop_replication.rb (revision 1501028) +++ hbase-server/src/main/ruby/shell/commands/stop_replication.rb (working copy) @@ -1,42 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -module Shell - module Commands - class StopReplication < Command - def help - return <<-EOF -Stops all the replication features. The state in which each -stream stops in is undetermined. -WARNING: -start/stop replication is only meant to be used in critical load situations. -Examples: - - hbase> stop_replication -EOF - end - - def command - format_simple_command do - replication_admin.stop_replication - end - end - end - end -end Index: hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html =================================================================== --- hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html (revision 1501028) +++ hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html (working copy) @@ -132,16 +132,6 @@ In this case it indicates that 1 region server from the slave cluster was chosen for replication.

- -Should you want to stop the replication while the clusters are running, open -the shell on the master cluster and issue this command: -
-hbase(main):001:0> stop_replication
- -Replication of already queued edits will still happen after you -issued that command but new entries won't be. To start it back, simply replace -"false" with "true" in the command. -

Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java (revision 1501028) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java (working copy) @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -import java.io.Closeable; - -/** - * This provides an interface for getting and setting the replication state of a - * cluster. This state is used to indicate whether replication is enabled or - * disabled on a cluster. - */ -@InterfaceAudience.Private -public interface ReplicationStateInterface extends Closeable { - - /** - * Initialize the replication state interface. - */ - public void init() throws KeeperException; - - /** - * Get the current state of replication (i.e. ENABLED or DISABLED). - * @return true if replication is enabled, false otherwise - * @throws KeeperException - */ - public boolean getState() throws KeeperException; - - /** - * Set the state of replication. - * @param newState - * @throws KeeperException - */ - public void setState(boolean newState) throws KeeperException; -} \ No newline at end of file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (revision 1501028) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (working copy) @@ -1,174 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * ReplicationStateImpl is responsible for maintaining the replication state - * znode. - */ -public class ReplicationStateImpl extends ReplicationStateZKBase implements - ReplicationStateInterface { - - private final ReplicationStateTracker stateTracker; - private final AtomicBoolean replicating; - - private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class); - - public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, - final Abortable abortable, final AtomicBoolean replicating) { - super(zk, conf, abortable); - this.replicating = replicating; - - // Set a tracker on replicationStateNode - this.stateTracker = - new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable); - } - - public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf, - final Abortable abortable) { - this(zk, conf, abortable, new AtomicBoolean()); - } - - @Override - public void init() throws KeeperException { - ZKUtil.createWithParents(this.zookeeper, this.stateZNode); - stateTracker.start(); - readReplicationStateZnode(); - } - - @Override - public boolean getState() throws KeeperException { - return getReplication(); - } - - @Override - public void setState(boolean newState) throws KeeperException { - setReplicating(newState); - } - - @Override - public void close() throws IOException { - if (stateTracker != null) stateTracker.stop(); - } - - /** - * @param bytes - * @return True if the passed in bytes are those of a pb - * serialized ENABLED state. - * @throws DeserializationException - */ - private boolean isStateEnabled(final byte[] bytes) throws DeserializationException { - ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes); - return ZooKeeperProtos.ReplicationState.State.ENABLED == state; - } - - /** - * @param bytes Content of a state znode. - * @return State parsed from the passed bytes. - * @throws DeserializationException - */ - private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes) - throws DeserializationException { - ProtobufUtil.expectPBMagicPrefix(bytes); - int pblen = ProtobufUtil.lengthOfPBMagic(); - ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState - .newBuilder(); - ZooKeeperProtos.ReplicationState state; - try { - state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build(); - return state.getState(); - } catch (InvalidProtocolBufferException e) { - throw new DeserializationException(e); - } - } - - /** - * Set the new replication state for this cluster - * @param newState - */ - private void setReplicating(boolean newState) throws KeeperException { - ZKUtil.createWithParents(this.zookeeper, this.stateZNode); - byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES; - ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes); - } - - /** - * Get the replication status of this cluster. If the state znode doesn't - * exist it will also create it and set it true. - * @return returns true when it's enabled, else false - * @throws KeeperException - */ - private boolean getReplication() throws KeeperException { - byte[] data = this.stateTracker.getData(false); - if (data == null || data.length == 0) { - setReplicating(true); - return true; - } - try { - return isStateEnabled(data); - } catch (DeserializationException e) { - throw ZKUtil.convert(e); - } - } - - /** - * This reads the state znode for replication and sets the atomic boolean - */ - private void readReplicationStateZnode() { - try { - this.replicating.set(getReplication()); - LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped")); - } catch (KeeperException e) { - this.abortable.abort("Failed getting data on from " + this.stateZNode, e); - } - } - - /** - * Tracker for status of the replication - */ - private class ReplicationStateTracker extends ZooKeeperNodeTracker { - public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) { - super(watcher, stateZnode, abortable); - } - - @Override - public synchronized void nodeDataChanged(String path) { - if (path.equals(node)) { - super.nodeDataChanged(path); - readReplicationStateZnode(); - } - } - } -} \ No newline at end of file Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1501028) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -79,7 +79,6 @@ private final Configuration conf; // Abortable private Abortable abortable; - private final ReplicationStateInterface replicationState; private final ReplicationPeers replicationPeers; private final ReplicationQueues replicationQueues; @@ -95,8 +94,6 @@ this.conf = conf; this.zookeeper = zk; setZNodes(abortable); - this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable); - this.replicationState.init(); // TODO This interface is no longer used by anyone using this constructor. When this class goes // away, we will no longer have this null initialization business this.replicationQueues = null; @@ -108,19 +105,15 @@ * Constructor used by region servers, connects to the peer cluster right away. * * @param server - * @param replicating atomic boolean to start/stop replication * @throws IOException * @throws KeeperException */ - public ReplicationZookeeper(final Server server, final AtomicBoolean replicating) - throws IOException, KeeperException { + public ReplicationZookeeper(final Server server) throws IOException, KeeperException { super(server.getZooKeeper(), server.getConfiguration(), server); this.abortable = server; this.zookeeper = server.getZooKeeper(); this.conf = server.getConfiguration(); setZNodes(server); - this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating); - this.replicationState.init(); this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server); this.replicationQueues.init(server.getServerName().toString()); this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server); @@ -228,25 +221,6 @@ } /** - * Get the replication status of this cluster. If the state znode doesn't exist it will also - * create it and set it true. - * @return returns true when it's enabled, else false - * @throws KeeperException - */ - public boolean getReplication() throws KeeperException { - return this.replicationState.getState(); - } - - /** - * Set the new replication state for this cluster - * @param newState - * @throws KeeperException - */ - public void setReplication(boolean newState) throws KeeperException { - this.replicationState.setState(newState); - } - - /** * Add a new log to the list of hlogs in zookeeper * @param filename name of the hlog's znode * @param peerId name of the cluster's znode @@ -391,6 +365,5 @@ @Override public void close() throws IOException { - if (replicationState != null) replicationState.close(); } } Index: hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (revision 1501028) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (working copy) @@ -163,36 +163,6 @@ } /** - * Get the current status of the kill switch, if the cluster is replicating - * or not. - * @return true if the cluster is replicated, otherwise false - */ - public boolean getReplicating() throws IOException { - try { - return this.replicationZk.getReplication(); - } catch (KeeperException e) { - throw new IOException("Couldn't get the replication status"); - } - } - - /** - * Kill switch for all replication-related features - * @param newState true to start replication, false to stop it. - * completely - * @return the previous state - */ - public boolean setReplicating(boolean newState) throws IOException { - boolean prev = true; - try { - prev = getReplicating(); - this.replicationZk.setReplication(newState); - } catch (KeeperException e) { - throw new IOException("Unable to set the replication state", e); - } - return prev; - } - - /** * Get the ZK-support tool created and used by this object for replication. * @return the ZK-support tool */