diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index b69367a..b7a6447 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,7 +88,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re Abortable abortable) { super(zk, conf, abortable); this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); - this.peerClusters = new HashMap(); + this.peerClusters = new ConcurrentHashMap(); } @Override @@ -195,18 +197,20 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public Map> getTableCFs(String id) throws IllegalArgumentException { - if (!this.peerClusters.containsKey(id)) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); } - return this.peerClusters.get(id).getTableCFs(); + return replicationPeer.getTableCFs(); } @Override public boolean getStatusOfConnectedPeer(String id) { - if (!this.peerClusters.containsKey(id)) { + ReplicationPeer replicationPeer = this.peerClusters.get(id); + if (replicationPeer == null) { throw new IllegalArgumentException("Peer with id= " + id + " is not connected"); - } - return this.peerClusters.get(id).getPeerEnabled().get(); + } + return replicationPeer.getPeerEnabled().get(); } @Override @@ -246,7 +250,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re if (peer == null) { return false; } - this.peerClusters.put(peerId, peer); + ((ConcurrentMap) peerClusters).putIfAbsent(peerId, peer); LOG.info("Added new peer cluster " + peer.getClusterKey()); return true; } @@ -256,7 +260,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ReplicationPeer rp = this.peerClusters.get(peerId); if (rp != null) { rp.getZkw().close(); - this.peerClusters.remove(peerId); + ((ConcurrentMap) peerClusters).remove(peerId, rp); } } @@ -375,10 +379,11 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public long getTimestampOfLastChangeToPeer(String peerId) { - if (!peerClusters.containsKey(peerId)) { + ReplicationPeer peer = this.peerClusters.get(peerId); + if (peer == null) { throw new IllegalArgumentException("Unknown peer id: " + peerId); } - return peerClusters.get(peerId).getLastRegionserverUpdate(); + return peer.getLastRegionserverUpdate(); } /**