diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 39a3f31d8c..6d3e70e335 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -48,17 +48,14 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class); - private Object zkwLock = new Object(); private ZooKeeperWatcher zkw = null; private List regionServers = new ArrayList(0); private long lastRegionServerUpdate; - protected void disconnect() { - synchronized (zkwLock) { - if (zkw != null) { - zkw.close(); - } + protected synchronized void disconnect() { + if (zkw != null) { + zkw.close(); } } @@ -103,9 +100,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint public synchronized UUID getPeerUUID() { UUID peerUUID = null; try { - synchronized (zkwLock) { - peerUUID = ZKClusterId.getUUIDForCluster(zkw); - } + peerUUID = ZKClusterId.getUUIDForCluster(zkw); } catch (KeeperException ke) { reconnect(ke); } @@ -116,25 +111,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * Get the ZK connection to this peer * @return zk connection */ - protected ZooKeeperWatcher getZkw() { - synchronized (zkwLock) { - return zkw; - } + protected synchronized ZooKeeperWatcher getZkw() { + return zkw; } /** * Closes the current ZKW (if not null) and creates a new one * @throws IOException If anything goes wrong connecting */ - void reloadZkWatcher() throws IOException { - synchronized (zkwLock) { - if (zkw != null) { - zkw.close(); - } - zkw = new ZooKeeperWatcher(ctx.getConfiguration(), + synchronized void reloadZkWatcher() throws IOException { + if (zkw != null) zkw.close(); + zkw = new ZooKeeperWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); - zkw.registerListener(new PeerRegionServerListener(this)); - } + getZkw().registerListener(new PeerRegionServerListener(this)); } @Override @@ -172,15 +161,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * for this peer cluster * @return list of addresses */ - public List getRegionServers() { + // Synchronize peer cluster connection attempts to avoid races and rate + // limit connections when multiple replication sources try to connect to + // the peer cluster. If the peer cluster is down we can get out of control + // over time. + public synchronized List getRegionServers() { try { - // Synchronize peer cluster connection attempts to avoid races and rate - // limit connections when multiple replication sources try to connect to - // the peer cluster. If the peer cluster is down we can get out of control - // over time. - synchronized (zkwLock) { - setRegionServers(fetchSlavesAddresses(zkw)); - } + setRegionServers(fetchSlavesAddresses(this.getZkw())); } catch (KeeperException ke) { if (LOG.isDebugEnabled()) { LOG.debug("Fetch slaves addresses failed", ke);