From e52b47e1ab5bc45226a62fed391c44eefdb77d00 Mon Sep 17 00:00:00 2001 From: Rishit Shroff Date: Wed, 24 Dec 2014 16:27:26 -0800 Subject: [PATCH] Replace HServerAddress with HostAndPort Summary: HServerAddress was used to keep track of the host address and port. However, HServerAddress is no longer present in the trunk and has been replaced with ServerName. ServerName still uses a startcode, which does not matter to the consensus protocol. Hence, this diff uses HostAndPort instead of HServerAddress everywhere. Test Plan: mvn clean package, all tests pass. Reviewers: reddragon, eclark, fantasist, tedyu Blame Rev: --- .../org/apache/hadoop/hbase/HServerAddress.java | 243 --------------------- .../hbase/consensus/client/QuorumClient.java | 15 +- .../consensus/client/QuorumThriftClientAgent.java | 25 ++- .../consensus/client/QuorumThriftClientCLI.java | 12 - .../consensus/quorum/AbstractPeerManager.java | 12 +- .../consensus/quorum/ImmutableRaftContext.java | 4 +- .../quorum/JointConsensusPeerManager.java | 12 +- .../hbase/consensus/quorum/MutableRaftContext.java | 4 +- .../hadoop/hbase/consensus/quorum/QuorumInfo.java | 68 +++--- .../hbase/consensus/quorum/RaftQuorumContext.java | 27 ++- .../consensus/server/LocalConsensusServer.java | 58 +++-- .../hbase/consensus/server/peer/AbstractPeer.java | 16 +- .../consensus/server/peer/PeerConsensusServer.java | 8 +- .../hadoop/hbase/consensus/util/RaftUtil.java | 16 +- .../hadoop/hbase/consensus/LocalTestBed.java | 32 +-- .../hadoop/hbase/consensus/RaftTestUtil.java | 60 ++--- .../consensus/TestBasicQuorumMembershipChange.java | 18 +- 17 files changed, 189 insertions(+), 441 deletions(-) delete mode 100644 hbase-consensus/src/main/java/org/apache/hadoop/hbase/HServerAddress.java diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/HServerAddress.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/HServerAddress.java deleted file mode 100644 index b091ad5..0000000 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/HServerAddress.java +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Copyright 2007 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.WritableComparable; - -/** - * HServerAddress is a "label" for a HBase server made of host and port number.
- */ -public class HServerAddress implements WritableComparable { - - private static final Log LOG = LogFactory.getLog(HServerAddress.class); - - private InetSocketAddress address; - private String stringValue; - private String hostAddress; - - /** - * We don't expect the IP addresses of HBase servers to change, so we cache them - * indefinitely. At this level we only do positive caching. - */ - private static ConcurrentMap addressCache = - new ConcurrentHashMap(); - - public HServerAddress() { - this.address = null; - this.stringValue = null; - this.hostAddress = null; - } - - /** - * Construct an instance from an {@link InetSocketAddress}. - * @param address InetSocketAddress of server - */ - public HServerAddress(InetSocketAddress address) { - this.address = address; - this.stringValue = getHostAddressWithPort(); - checkBindAddressCanBeResolved(); - } - - /** - * @param hostAndPort Hostname and port formatted as <hostname> ':' <port> - */ - public HServerAddress(String hostAndPort) { - int colonIndex = hostAndPort.lastIndexOf(':'); - if (colonIndex < 0) { - throw new IllegalArgumentException("Not a host:port pair: " + hostAndPort); - } - String host = hostAndPort.substring(0, colonIndex); - int port = Integer.parseInt(hostAndPort.substring(colonIndex + 1)); - address = addressCache.get(hostAndPort); - if (address == null) { - this.address = new InetSocketAddress(host, port); - if (getBindAddress() != null) { - // Resolved the hostname successfully, cache it. - InetSocketAddress existingAddress = addressCache.putIfAbsent(hostAndPort, address); - if (existingAddress != null) { - // Another thread cached the address ahead of us, reuse it. - this.address = existingAddress; - } - } - } - this.stringValue = getHostAddressWithPort(); - checkBindAddressCanBeResolved(); - } - - /** - * @param bindAddress Hostname - * @param port Port number - */ - public HServerAddress(String bindAddress, int port) { - this.address = new InetSocketAddress(bindAddress, port); - this.stringValue = getHostAddressWithPort(); - checkBindAddressCanBeResolved(); - } - - /** - * Copy-constructor. 
- * @param other HServerAddress to copy from - */ - public HServerAddress(HServerAddress other) { - String bindAddress = other.getBindAddress(); - int port = other.getPort(); - this.address = new InetSocketAddress(bindAddress, port); - stringValue = other.stringValue; - checkBindAddressCanBeResolved(); - } - - /** - * Get the normalized hostAddress:port as a string format - * @param address - * @return the normalized hostAddress:port as a string format - */ - public String getHostAddressWithPort() { - if (address == null) return null; - return this.getBindAddress() + ":" + address.getPort(); - } - - /** - * Get the normalized hostName:port as a string format - * @param address - * @return the normalized hostName:port as a string format - */ - public String getHostNameWithPort() { - if (address == null) return null; - return address.getHostName() + ":" + - address.getPort(); - } - - /** @return Bind address */ - public String getBindAddress() { - if (this.hostAddress != null) - return hostAddress; - - final InetAddress addr = address.getAddress(); - if (addr != null) { - return addr.getHostAddress(); - } else { - LOG.error("Could not resolve the DNS name of " + stringValue); - return null; - } - } - - private void checkBindAddressCanBeResolved() { - if ((this.hostAddress = getBindAddress()) == null) { - throw new IllegalArgumentException("Could not resolve the" - + " DNS name of " + stringValue); - } - } - - /** @return Port number */ - public int getPort() { - return address.getPort(); - } - - /** @return Hostname */ - public String getHostname() { - return address.getHostName(); - } - - /** @return The InetSocketAddress */ - public InetSocketAddress getInetSocketAddress() { - return address; - } - - /** - * @return String formatted as <bind address> ':' <port> - */ - @Override - public String toString() { - return stringValue == null ? "" : stringValue; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null) { - return false; - } - if (getClass() != o.getClass()) { - return false; - } - return compareTo((HServerAddress) o) == 0; - } - - @Override - public int hashCode() { - return address.hashCode(); - } - - // - // Writable - // - - @Override - public void readFields(DataInput in) throws IOException { - String bindAddress = in.readUTF(); - int port = in.readInt(); - - if (bindAddress == null || bindAddress.length() == 0) { - address = null; - stringValue = null; - } else { - address = new InetSocketAddress(bindAddress, port); - stringValue = getHostAddressWithPort(); - checkBindAddressCanBeResolved(); - } - } - - @Override - public void write(DataOutput out) throws IOException { - if (address == null) { - out.writeUTF(""); - out.writeInt(0); - } else { - out.writeUTF(address.getAddress().getHostAddress()); - out.writeInt(address.getPort()); - } - } - - // - // Comparable - // - - @Override - public int compareTo(HServerAddress o) { - if (address == null) return -1; - // Addresses as Strings may not compare though address is for the one - // server with only difference being that one address has hostname - // resolved whereas other only has IP. 
- if (address.equals(o.address)) return 0; - return toString().compareTo(o.toString()); - } -} diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumClient.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumClient.java index 0fcca92..ae8de2b 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumClient.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumClient.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.consensus.client; */ +import com.google.common.net.HostAndPort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.client.NoLeaderForRegionException; import org.apache.hadoop.hbase.consensus.log.LogFileInfo; import org.apache.hadoop.hbase.consensus.quorum.QuorumInfo; @@ -196,8 +196,7 @@ public class QuorumClient { final Future future = service.take(); String leaderAddrString = future.get(); if (leaderAddrString != null && !leaderAddrString.isEmpty()) { - HServerAddress leaderAddr = new HServerAddress(leaderAddrString); - leader = agentMap.get(leaderAddr.getHostNameWithPort()); + leader = agentMap.get(leaderAddrString); if (leader != null) { LOG.debug(String.format( "The current leader for the region %s is %s.", @@ -235,7 +234,7 @@ public class QuorumClient { } public synchronized List>> getPeerCommittedLogStatus( - HServerAddress localServerAddr, + HostAndPort localServerAddr, String quorumName, long minIndex) throws Exception { List>> result = new ArrayList<>(); @@ -243,7 +242,7 @@ public class QuorumClient { return result; } String localConsensusAddr = - RaftUtil.getLocalConsensusAddress(localServerAddr).getHostNameWithPort(); + RaftUtil.getLocalConsensusAddress(localServerAddr).toString(); for (QuorumThriftClientAgent agent : agents) { if (!agent.getServerAddress().equals(localConsensusAddr)) { List logFiles = agent.getCommittedLogStatus(quorumName, minIndex); @@ -282,11 +281,11 @@ public class QuorumClient { conf.getInt("hbase.regionserver.quorum.client.retry.cnt", 1); this.agents = new ArrayList<>(config.getPeersWithRank().size()); this.agentMap = new HashMap<>(); - for (HServerAddress address : config.getPeersWithRank().keySet()) { + for (HostAndPort address : config.getPeersWithRank().keySet()) { try { QuorumThriftClientAgent agent = new QuorumThriftClientAgent( - RaftUtil.getLocalConsensusAddress(address).getHostAddressWithPort(), - connectionTimeout, readTimeout, writeTimeout, retryCount); + RaftUtil.getLocalConsensusAddress(address).toString(), + connectionTimeout, readTimeout, writeTimeout, retryCount); this.agents.add(agent); agentMap.put(agent.getServerAddress(), agent); } catch (IOException e) { diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumThriftClientAgent.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumThriftClientAgent.java index 075ed64..15e8c4f 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumThriftClientAgent.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumThriftClientAgent.java @@ -29,7 +29,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.Duration; -import org.apache.hadoop.hbase.HServerAddress; +import 
com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.consensus.log.LogFileInfo; import org.apache.hadoop.hbase.consensus.rpc.PeerStatus; import org.apache.hadoop.hbase.consensus.server.ConsensusService; @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.NoRouteToHostException; import java.nio.ByteBuffer; import java.util.List; @@ -55,7 +56,7 @@ import java.util.concurrent.TimeoutException; @NotThreadSafe public class QuorumThriftClientAgent { private final static Logger LOG = LoggerFactory.getLogger(QuorumThriftClientAgent.class); - private final HServerAddress address; + private final HostAndPort address; private ThriftClient thriftClient; private ListenableFuture futureConnection; @@ -66,7 +67,7 @@ public class QuorumThriftClientAgent { public QuorumThriftClientAgent(String address, int connectionTimeout, int readTimeout, int writeTimeout, int connectionRetry) throws IOException { - this.address = new HServerAddress(address); + this.address = HostAndPort.fromString(address); this.thriftClientConf = new ThriftClientConfig() .setConnectTimeout(new Duration(connectionTimeout, TimeUnit.MILLISECONDS)) .setReadTimeout(new Duration(readTimeout, TimeUnit.MILLISECONDS)) @@ -101,7 +102,7 @@ public class QuorumThriftClientAgent { }); } catch (Exception e) { LOG.error(String.format("%s. Cannot send replicate commit to %s", - regionId, address.getHostAddressWithPort())); + regionId, address.toString())); handleFailure(localAgent, e, result); } } @@ -140,7 +141,7 @@ public class QuorumThriftClientAgent { }); } catch (Exception e) { LOG.error(String.format("%s. Cannot send replicate commit to %s", - regionId, address.getHostAddressWithPort())); + regionId, address.toString())); handleFailure(localAgent, e, result); } } @@ -172,7 +173,7 @@ public class QuorumThriftClientAgent { LOG.error(String.format( "%s Cannot send the request to change the " + " quorum to server %s", regionId, - address.getHostAddressWithPort())); + address.toString())); handleFailure(localAgent, e, result); } } @@ -200,7 +201,7 @@ public class QuorumThriftClientAgent { }); } catch (Exception e) { LOG.error(String.format("%s. Cannot send replicate commit to %s", - regionId, address.getHostAddressWithPort())); + regionId, address.toString())); handleFailure(localAgent, e, result); } } @@ -230,7 +231,7 @@ public class QuorumThriftClientAgent { }); } catch (Exception e) { LOG.error(String.format("%s. 
Cannot send replicate commit to %s", - quorumName, address.getHostAddressWithPort())); + quorumName, address.toString())); handleFailure(localAgent, e, result); } } @@ -258,7 +259,7 @@ public class QuorumThriftClientAgent { }); } catch (Exception e) { LOG.error(String.format("Cannot send replicate commit to %s", - address.getHostAddressWithPort())); + address.toString())); handleFailure(localAgent, e, result); } } @@ -268,7 +269,7 @@ public class QuorumThriftClientAgent { } public synchronized String getServerAddress() { - return this.address.getHostNameWithPort(); + return this.address.toString(); } private ConsensusService getConsensusServiceAgent() @@ -313,7 +314,7 @@ public class QuorumThriftClientAgent { RaftUtil.getThriftClientManager(), ConsensusService.class, thriftClientConf, this.toString()); futureConnection = thriftClient.open(new FramedClientConnector( - address.getInetSocketAddress(), + new InetSocketAddress(address.getHostText(), address.getPort()), TDuplexProtocolFactory.fromSingleFactory( new TCompactProtocol.Factory()))); agent = futureConnection.get(); @@ -352,7 +353,7 @@ public class QuorumThriftClientAgent { return; } LOG.error(String.format("Ran into error while talking to %s.", - address.getHostAddressWithPort()), t); + address.toString()), t); synchronized (this) { if (agent == localAgent) { agent = null; diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumThriftClientCLI.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumThriftClientCLI.java index e3fa9f3..39eaea2 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumThriftClientCLI.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/client/QuorumThriftClientCLI.java @@ -26,19 +26,7 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; - -import com.facebook.swift.service.ThriftClient; -import com.facebook.swift.service.ThriftClientConfig; -import com.facebook.swift.service.ThriftClientManager; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.consensus.rpc.PeerStatus; -import org.apache.hadoop.hbase.consensus.server.ConsensusService; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; - -import java.io.IOException; -import java.util.List; public class QuorumThriftClientCLI { diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AbstractPeerManager.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AbstractPeerManager.java index 766dc2a..a7f0d33 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AbstractPeerManager.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/AbstractPeerManager.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.consensus.quorum; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.consensus.protocol.EditId; import org.apache.hadoop.hbase.consensus.rpc.AppendRequest; import org.apache.hadoop.hbase.consensus.rpc.VoteRequest; @@ -91,20 +91,20 @@ public abstract class AbstractPeerManager implements PeerManagerInterface { } // Utility functions - protected Map initializePeers(final Map peers) { + protected Map initializePeers(final Map 
peers) { Map peerServers = new HashMap<>(); // Initialize the RaftQuorum by setting up the PeerServer - for (HServerAddress hostAddress : peers.keySet()) { + for (HostAndPort hostAddress : peers.keySet()) { if (!hostAddress.equals(c.getServerAddress())) { // Generate the PeerServer port: RegionServer Port + fixed port jump - HServerAddress peerAddress = RaftUtil + HostAndPort peerAddress = RaftUtil .getLocalConsensusAddress(hostAddress); int peerRank = peers.get(hostAddress); - peerServers.put(peerAddress.getHostAddressWithPort(), + peerServers.put(peerAddress.toString(), new PeerConsensusServer(peerAddress, peerRank, c, c.getConf())); - lastAckedIndexMap.put(peerAddress.getHostAddressWithPort(), + lastAckedIndexMap.put(peerAddress.toString(), HConstants.UNDEFINED_TERM_INDEX); } } diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ImmutableRaftContext.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ImmutableRaftContext.java index 1eb2083..b2da589 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ImmutableRaftContext.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/ImmutableRaftContext.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.consensus.quorum; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HServerAddress; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.consensus.log.CommitLogManagerInterface; import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics; import org.apache.hadoop.hbase.consensus.protocol.ConsensusHost; @@ -149,7 +149,7 @@ public interface ImmutableRaftContext { ConsensusMetrics getConsensusMetrics(); - Map getNewConfiguration(); + Map getNewConfiguration(); QuorumMembershipChangeRequest getUpdateMembershipRequest(); diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointConsensusPeerManager.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointConsensusPeerManager.java index 8e06c3e..1b5360b 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointConsensusPeerManager.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/JointConsensusPeerManager.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.consensus.quorum; */ -import org.apache.hadoop.hbase.HServerAddress; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.consensus.util.RaftUtil; import org.apache.hadoop.hbase.consensus.fsm.Event; import org.apache.hadoop.hbase.consensus.metrics.ConsensusMetrics; @@ -76,21 +76,21 @@ public class JointConsensusPeerManager extends AbstractPeerManager { // Initialize the new peers if (newPeerServers == null) { newPeerServers = new HashMap<>(); - Map newPeers = new HashMap<>(); + Map newPeers = new HashMap<>(); // There can be an overlap between the new and old configuration. Hence, // we should initialize a peer only once. So, lets remove the peers which // are already initialized. 
newPeers.putAll(newConfig.getPeersWithRank()); - Iterator> newPeerIterator = + Iterator> newPeerIterator = newPeers.entrySet().iterator(); - HServerAddress peerAddress; + HostAndPort peerAddress; while (newPeerIterator.hasNext()) { - Map.Entry e = newPeerIterator.next(); + Map.Entry e = newPeerIterator.next(); peerAddress = e.getKey(); int newPeerRank = e.getValue(); String consensusServerAddress = - RaftUtil.getLocalConsensusAddress(peerAddress).getHostAddressWithPort(); + RaftUtil.getLocalConsensusAddress(peerAddress).toString(); if (oldPeerServers.get(consensusServerAddress) != null) { PeerServer oldPeerServer = oldPeerServers.get(consensusServerAddress); oldPeerServer.setRank(newPeerRank); diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/MutableRaftContext.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/MutableRaftContext.java index 815a040..6ef4efe 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/MutableRaftContext.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/MutableRaftContext.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; -import org.apache.hadoop.hbase.HServerAddress; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.consensus.fsm.Event; import org.apache.hadoop.hbase.consensus.protocol.ConsensusHost; import org.apache.hadoop.hbase.consensus.protocol.EditId; @@ -122,7 +122,7 @@ public interface MutableRaftContext extends ImmutableRaftContext { PeerManagerInterface getPeerManager(); - HServerAddress getServerAddress(); + HostAndPort getServerAddress(); void updateToJointQuorumMembership(final QuorumInfo config) throws IOException; diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumInfo.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumInfo.java index 3c10bba..59bc921 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumInfo.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/QuorumInfo.java @@ -20,13 +20,19 @@ package org.apache.hadoop.hbase.consensus.quorum; */ +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.consensus.util.RaftUtil; import org.apache.hadoop.hbase.util.Bytes; -import java.util.*; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; public class QuorumInfo { public static final int PAYLOAD_HEADER_SIZE = @@ -37,12 +43,12 @@ public class QuorumInfo { // For compatability with non-hydrabase mode public static String LOCAL_DC_KEY = "LOCAL_DC_KEY_FOR_NON_HYDRABASE_MODE"; - private Map> peers = null; - private Map peersWithRank = null; + private Map> peers = null; + private Map peersWithRank = null; private Set peersAsString = null; private final String quorumName; - public QuorumInfo(final Map> peers, + public QuorumInfo(final Map> peers, final String quorumName) { this.peers = peers; this.quorumName = quorumName; @@ -53,9 +59,9 @@ public class QuorumInfo { this.quorumName = info.quorumName; peers = new HashMap<>(); for (String domain : info.getPeers().keySet()) { - final Map peersInDomain = new HashMap<>(); - for (HServerAddress peer 
: info.getPeers().get(domain).keySet()) { - peersInDomain.put(new HServerAddress(peer.getHostname(), peer.getPort()), + final Map peersInDomain = new HashMap<>(); + for (HostAndPort peer : info.getPeers().get(domain).keySet()) { + peersInDomain.put(HostAndPort.fromParts(peer.getHostText(), peer.getPort()), info.getPeersWithRank().get(peer)); } peers.put(domain, peersInDomain); @@ -67,7 +73,7 @@ public class QuorumInfo { return (peers == null ? 0 : peers.values().iterator().next().size()); } - public Map getPeersWithRank() { + public Map getPeersWithRank() { return peersWithRank; } @@ -75,14 +81,14 @@ public class QuorumInfo { return peersAsString; } - public Map getPeersWithCluster() { + public Map getPeersWithCluster() { if (peers != null) { // TODO: Consider cache this map instead of computing it every time - Map peersWithCluster = new TreeMap(); - for (Map.Entry> entry : peers + Map peersWithCluster = new TreeMap(); + for (Map.Entry> entry : peers .entrySet()) { String cluster = entry.getKey(); - for (HServerAddress serverAddress : entry.getValue().keySet()) { + for (HostAndPort serverAddress : entry.getValue().keySet()) { peersWithCluster.put(serverAddress, cluster); } } @@ -91,11 +97,11 @@ public class QuorumInfo { return null; } - public Map> getPeers() { + public Map> getPeers() { return peers; } - public void setPeers(Map> peers) { + public void setPeers(Map> peers) { this.peers = peers; populateInternalMaps(); } @@ -135,14 +141,14 @@ public class QuorumInfo { payload.putInt(dcName.length); payload.put(dcName); - Set numPeers = s.getPeers().get(dc).keySet(); + Set numPeers = s.getPeers().get(dc).keySet(); // Number of peers payload.putInt(numPeers.size()); - for (HServerAddress peer : numPeers) { + for (HostAndPort peer : numPeers) { // Peer Info - currPeerInfo = peer.getHostAddressWithPort().getBytes(); + currPeerInfo = peer.toString().getBytes(); payload.putInt(currPeerInfo.length); payload.put(currPeerInfo); @@ -188,8 +194,8 @@ public class QuorumInfo { int numDCs, numPeers, quorumNameLength, dcNameLength, peerNameLength = 0; String quorumName, dcName, peerName; - Map> dcLevelInfo; - Map perDCPeersMap; + Map> dcLevelInfo; + Map perDCPeersMap; for (int confIndex = 0; confIndex < numConfigs; ++confIndex) { @@ -232,7 +238,7 @@ public class QuorumInfo { currOffset += peerNameLength; // Put the peer name and rank in the peer Map - perDCPeersMap.put(new HServerAddress(peerName), data.getInt(currOffset)); + perDCPeersMap.put(HostAndPort.fromString(peerName), data.getInt(currOffset)); currOffset += Bytes.SIZEOF_INT; } @@ -263,15 +269,15 @@ public class QuorumInfo { size += Bytes.SIZEOF_INT; size += dc.getBytes().length; - Set numPeers = s.getPeers().get(dc).keySet(); + Set numPeers = s.getPeers().get(dc).keySet(); // Number of peers size += Bytes.SIZEOF_INT; - for (HServerAddress peer : numPeers) { + for (HostAndPort peer : numPeers) { // Peer Address in String format size += Bytes.SIZEOF_INT; - size += peer.getHostAddressWithPort().length(); + size += peer.toString().length(); // Peer Rank size += Bytes.SIZEOF_INT; } @@ -310,11 +316,11 @@ public class QuorumInfo { private void populateInternalMaps() { if (peers != null) { peersAsString = new HashSet<>(); - peersWithRank = new TreeMap<>(); - for (Map map : peers.values()) { + peersWithRank = new HashMap<>(); + for (Map map : peers.values()) { peersWithRank.putAll(map); - for (HServerAddress peer : map.keySet()) { - peersAsString.add(RaftUtil.getLocalConsensusAddress(peer).getHostAddressWithPort()); + for (HostAndPort peer : 
map.keySet()) { + peersAsString.add(RaftUtil.getLocalConsensusAddress(peer).toString()); } } } @@ -323,8 +329,8 @@ public class QuorumInfo { public String getDomain(final String serverAddr) { String domain = ""; for (String c : peers.keySet()) { - for (HServerAddress peer : peers.get(c).keySet()) { - if (serverAddr.equals(peer.getHostAddressWithPort())) { + for (HostAndPort peer : peers.get(c).keySet()) { + if (serverAddr.equals(peer.toString())) { domain = c; break; } @@ -333,7 +339,7 @@ public class QuorumInfo { return domain; } - public int getRank(final HServerAddress address) { + public int getRank(final HostAndPort address) { int rank = 0; if (peersWithRank.containsKey(address)) { rank = peersWithRank.get(address); diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/RaftQuorumContext.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/RaftQuorumContext.java index 2d736d7..49ca4f5 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/RaftQuorumContext.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/quorum/RaftQuorumContext.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.consensus.quorum; */ +import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.consensus.exceptions.NewLeaderException; import org.apache.hadoop.hbase.consensus.fsm.ConstitutentFSMService; import org.apache.hadoop.hbase.consensus.fsm.Event; @@ -63,7 +63,6 @@ import org.apache.thrift.protocol.TCompactProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.ListenableFuture; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; @@ -113,10 +112,10 @@ public class RaftQuorumContext implements ImmutableRaftContext, private ConsensusHost lastVotedFor; /** Consensus Server address */ - private final HServerAddress localConsensusServerAddress; + private final HostAndPort localConsensusServerAddress; /** HRegionServer address */ - protected final HServerAddress regionServerAddress; + protected final HostAndPort regionServerAddress; /** Peer Address */ private final String myAddress; @@ -217,7 +216,7 @@ public class RaftQuorumContext implements ImmutableRaftContext, public RaftQuorumContext(final QuorumInfo info, final Configuration config, - final HServerAddress consensusServerAddress, + final HostAndPort consensusServerAddress, final String metricsMBeanNamePrefix, final AggregateTimer aggregateTimer, final SerialExecutorService serialExecutor, @@ -227,15 +226,15 @@ public class RaftQuorumContext implements ImmutableRaftContext, conf = config; this.metrics = new ConsensusMetrics( metricsMBeanNamePrefix + info.getQuorumName(), - consensusServerAddress.getHostNameWithPort().replace(':', '.')); + consensusServerAddress.toString().replace(':', '.')); this.aggregateTimer = aggregateTimer; this.serialExecutor = serialExecutor; random = new Random(System.currentTimeMillis()); localConsensusServerAddress = consensusServerAddress; this.updateMembershipRequest = null; regionServerAddress = RaftUtil.getHRegionServerAddress( - new HServerAddress(localConsensusServerAddress)); - myAddress = localConsensusServerAddress.getHostAddressWithPort(); + localConsensusServerAddress); + myAddress = 
localConsensusServerAddress.toString(); majorityCnt = info.getPeersWithRank().size() / 2 + 1 ; initializeRank(); @@ -371,7 +370,7 @@ public class RaftQuorumContext implements ImmutableRaftContext, } @Override - public HServerAddress getServerAddress() { + public HostAndPort getServerAddress() { return regionServerAddress; } @@ -420,13 +419,13 @@ public class RaftQuorumContext implements ImmutableRaftContext, oldManager = (JointConsensusPeerManager)peerManager; // Verify that all the peers in the new config are in the new peer servers list - for (HServerAddress peer : newConfig.getPeersWithRank().keySet()) { - if (peer.getHostAddressWithPort().equals( - this.regionServerAddress.getHostAddressWithPort())) { + for (HostAndPort peer : newConfig.getPeersWithRank().keySet()) { + if (peer.toString().equals( + this.regionServerAddress.toString())) { continue; } if (oldManager.getNewPeerServers().get( - RaftUtil.getLocalConsensusAddress(peer).getHostAddressWithPort()) == null) { + RaftUtil.getLocalConsensusAddress(peer).toString()) == null) { throw new IOException("Invalid list of new peers. Cannot update the" + " quorum to new config. Reason: Peer " + peer + " not present."); } @@ -621,7 +620,7 @@ public class RaftQuorumContext implements ImmutableRaftContext, } @Override - public Map getNewConfiguration() { + public Map getNewConfiguration() { if (updateMembershipRequest != null) { return updateMembershipRequest.getConfig().getPeersWithRank(); } diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/LocalConsensusServer.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/LocalConsensusServer.java index 8094b4c..4eea0d7 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/LocalConsensusServer.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/LocalConsensusServer.java @@ -20,18 +20,19 @@ package org.apache.hadoop.hbase.consensus.server; */ +import com.facebook.nifty.codec.DefaultThriftFrameCodecFactory; +import com.facebook.nifty.codec.ThriftFrameCodecFactory; +import com.facebook.nifty.core.NiftyTimer; +import com.facebook.nifty.duplex.TDuplexProtocolFactory; +import com.facebook.swift.codec.ThriftCodecManager; +import com.facebook.swift.service.ThriftEventHandler; +import com.facebook.swift.service.ThriftServer; +import com.facebook.swift.service.ThriftServerConfig; +import com.facebook.swift.service.ThriftServiceProcessor; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import com.google.inject.Inject; import io.airlift.units.Duration; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -39,7 +40,6 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.consensus.client.QuorumClient; import org.apache.hadoop.hbase.consensus.quorum.AggregateTimer; import org.apache.hadoop.hbase.consensus.quorum.QuorumInfo; @@ -58,17 +58,15 @@ import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.weakref.jmx.MBeanExporter; -import com.facebook.nifty.codec.DefaultThriftFrameCodecFactory; -import com.facebook.nifty.codec.ThriftFrameCodecFactory; -import com.facebook.nifty.core.NiftyTimer; -import com.facebook.nifty.duplex.TDuplexProtocolFactory; -import com.facebook.swift.codec.ThriftCodecManager; -import com.facebook.swift.service.ThriftEventHandler; -import com.facebook.swift.service.ThriftServer; -import com.facebook.swift.service.ThriftServerConfig; -import com.facebook.swift.service.ThriftServiceProcessor; -import com.google.common.collect.ImmutableMap; -import com.google.inject.Inject; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class LocalConsensusServer { public static final ImmutableMap @@ -184,9 +182,9 @@ public class LocalConsensusServer { return server; } - public void startInfoServer(HServerAddress local, Configuration conf) throws IOException { + public void startInfoServer(HostAndPort local, Configuration conf) throws IOException { this.infoServer = new InfoServer(HConstants.QUORUM_MONITORING_PAGE_KEY, - local.getBindAddress(), server.getPort() + 1, false, + local.getHostText(), server.getPort() + 1, false, conf); this.infoServer.setAttribute(HConstants.QUORUM_MONITORING_PAGE_KEY, this); this.infoServer.start(); @@ -220,9 +218,9 @@ public class LocalConsensusServer { String regionId = null; String serverList; String servers[] = null; - HServerAddress localHost = null; + HostAndPort localHost = null; int localIndex = -1; - HashMap peers = new HashMap<>(); + HashMap peers = new HashMap<>(); int rank = 5; boolean enableJetty = false; @@ -273,7 +271,7 @@ public class LocalConsensusServer { for (int i = 0; i < servers.length; i++) { String localConsensusServerAddress = servers[i]; - HServerAddress cur = new HServerAddress(localConsensusServerAddress); + HostAndPort cur = HostAndPort.fromString(localConsensusServerAddress); peers.put(RaftUtil.getHRegionServerAddress(cur), rank--); System.out.println("Quorum server " + localConsensusServerAddress); @@ -300,7 +298,7 @@ public class LocalConsensusServer { if (localIndex != -1) { // overwrite the log directory for the local mode - configuration.set("hbase.consensus.log.path", "/tmp/wal/" + localHost.getHostAddressWithPort()); + configuration.set("hbase.consensus.log.path", "/tmp/wal/" + localHost.toString()); } // Start the local consensus server @@ -333,7 +331,7 @@ public class LocalConsensusServer { try { consensusServer.startInfoServer(localHost, configuration); System.out.println("Start the jetty server at " + - localHost.getBindAddress() + ":" + localHost.getPort() + 1); + localHost.getHostText() + ":" + localHost.getPort() + 1); } catch (Exception e) { LOG.error("Unable to start the jetty server ", e); } diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/peer/AbstractPeer.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/peer/AbstractPeer.java index 442f4d4..df6aa55 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/peer/AbstractPeer.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/peer/AbstractPeer.java @@ -25,7 +25,7 @@ import com.google.common.util.concurrent.Futures; import 
com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.consensus.fsm.Event; import org.apache.hadoop.hbase.consensus.fsm.FiniteStateMachine; import org.apache.hadoop.hbase.consensus.fsm.FiniteStateMachineService; @@ -69,7 +69,7 @@ public abstract class AbstractPeer implements PeerServer { protected int connectionRetryInterval; private static Logger LOG = LoggerFactory.getLogger(AbstractPeer.class); - private final HServerAddress address; + private final HostAndPort address; private String sourceString = null; /** Last successfully acked Edit Id by the peer */ private EditId lastEditID = TransactionLogManager.UNDEFINED_EDIT_ID; @@ -83,7 +83,7 @@ public abstract class AbstractPeer implements PeerServer { private final PeerMetrics metrics; - public AbstractPeer(final HServerAddress address, final int rank, + public AbstractPeer(final HostAndPort address, final int rank, final MutableRaftContext replica, final Configuration conf) { this.rank = rank; @@ -96,7 +96,7 @@ public abstract class AbstractPeer implements PeerServer { // MBean name properties can not contain colons, so replace with a period. // TODO (arjen): Make sure this works for IPv6. - String peerId = address.getHostNameWithPort().replace(':', '.'); + String peerId = address.toString().replace(':', '.'); metrics = replica.getConsensusMetrics().createPeerMetrics(peerId); useAggregateTimer = conf.getBoolean( HConstants.QUORUM_USE_AGGREGATE_TIMER_KEY, @@ -105,7 +105,7 @@ public abstract class AbstractPeer implements PeerServer { HConstants.USE_FSMMUX_FOR_PSM_DEFAULT); } - public HServerAddress getAddress() { + public HostAndPort getAddress() { return address; } @@ -147,7 +147,7 @@ public abstract class AbstractPeer implements PeerServer { public void updatePeerAvailabilityStatus(boolean isAvailable) { if (this.dataStoreEventListener != null) { this.dataStoreEventListener.updatePeerAvailabilityStatus( - this.address.getHostAddressWithPort(), isAvailable); + this.address.toString(), isAvailable); } else { LOG.warn("dataStoreEventListener has not been registered for this peer: " + toString()); } @@ -171,7 +171,7 @@ public abstract class AbstractPeer implements PeerServer { FiniteStateMachine fsm = new PeerStateMachine( "PeerFSM-" + raftContext.getQuorumName() + ":" + raftContext.getRanking() + "-->" + - this.address.getHostAddressWithPort() + ":" + this.rank, + this.address.toString() + ":" + this.rank, this); FiniteStateMachineService fsmService; @@ -219,7 +219,7 @@ public abstract class AbstractPeer implements PeerServer { @Override public String getPeerServerName() { - return address.getHostAddressWithPort(); + return address.toString(); } @Override diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/peer/PeerConsensusServer.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/peer/PeerConsensusServer.java index f1b7387..8ed25e3 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/peer/PeerConsensusServer.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/server/peer/PeerConsensusServer.java @@ -28,7 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture; import io.airlift.units.Duration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import 
org.apache.hadoop.hbase.HServerAddress; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.consensus.quorum.MutableRaftContext; import org.apache.hadoop.hbase.consensus.server.ConsensusService; import org.apache.hadoop.hbase.consensus.util.RaftUtil; @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.concurrent.NotThreadSafe; +import java.net.InetSocketAddress; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,7 +51,7 @@ public class PeerConsensusServer extends AbstractPeer { private ListenableFuture futureConnection; private ConsensusService agent; - public PeerConsensusServer(final HServerAddress address, + public PeerConsensusServer(final HostAndPort address, final int rank, final MutableRaftContext replica, final Configuration conf) { @@ -94,7 +95,8 @@ public class PeerConsensusServer extends AbstractPeer { new ThriftClient<>(RaftUtil.getThriftClientManager(), ConsensusService.class, thriftClientConf, this.toString()); ListenableFuture futureConnection = thriftClient.open( - new FramedClientConnector(getAddress().getInetSocketAddress(), + new FramedClientConnector(new InetSocketAddress(getAddress().getHostText(), + getAddress().getPort()), TDuplexProtocolFactory.fromSingleFactory( new TCompactProtocol.Factory()))); setAgent(futureConnection, futureConnection.get( diff --git a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/util/RaftUtil.java b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/util/RaftUtil.java index 776d159..0829ffb 100644 --- a/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/util/RaftUtil.java +++ b/hbase-consensus/src/main/java/org/apache/hadoop/hbase/consensus/util/RaftUtil.java @@ -28,15 +28,13 @@ import com.facebook.swift.service.ThriftClientEventHandler; import com.facebook.swift.service.ThriftClientManager; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.consensus.quorum.AggregateTimer; import org.apache.hadoop.hbase.consensus.quorum.QuorumInfo; import org.apache.hadoop.hbase.consensus.quorum.RepeatingTimer; import org.apache.hadoop.hbase.consensus.quorum.TimeoutEventHandler; import org.apache.hadoop.hbase.consensus.quorum.Timer; -import org.apache.hadoop.hbase.util.Bytes; import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,9 +73,9 @@ public class RaftUtil { return createDummyQuorumInfo(region, null); } - public static QuorumInfo createDummyQuorumInfo(String region, Map peers) { - Map> peerMap = new HashMap<>(); + Map> peerMap = new HashMap<>(); peerMap.put(QuorumInfo.LOCAL_DC_KEY, peers); return new QuorumInfo(peerMap, region); } @@ -89,15 +87,15 @@ public class RaftUtil { return Joiner.on(", ").useForNull("null").join(list); } - public static HServerAddress getHRegionServerAddress(HServerAddress + public static HostAndPort getHRegionServerAddress(HostAndPort localConsensusServerAddress) { - return new HServerAddress(localConsensusServerAddress.getBindAddress(), + return HostAndPort.fromParts(localConsensusServerAddress.getHostText(), localConsensusServerAddress.getPort() - HConstants.CONSENSUS_SERVER_PORT_JUMP); } - public static 
HServerAddress getLocalConsensusAddress(HServerAddress + public static HostAndPort getLocalConsensusAddress(HostAndPort regionServerAddress) { - return new HServerAddress(regionServerAddress.getBindAddress(), + return HostAndPort.fromParts(regionServerAddress.getHostText(), regionServerAddress.getPort() + HConstants.CONSENSUS_SERVER_PORT_JUMP); } diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/LocalTestBed.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/LocalTestBed.java index b5513d2..89e6b7f 100644 --- a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/LocalTestBed.java +++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/LocalTestBed.java @@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.consensus; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.consensus.exceptions.LeaderNotReadyException; import org.apache.hadoop.hbase.consensus.exceptions.NewLeaderException; @@ -117,8 +117,8 @@ public class LocalTestBed { public static class LinkState { - private HServerAddress src; - private HServerAddress dst; + private HostAndPort src; + private HostAndPort dst; private State state; private long delay = 0L; private long hiccupStopTime = 0L; @@ -128,17 +128,17 @@ public class LocalTestBed { UP, DOWN } - public LinkState(HServerAddress src, HServerAddress dst, State state) { + public LinkState(HostAndPort src, HostAndPort dst, State state) { this.src = src; this.dst = dst; this.state = state; } - public HServerAddress getSrc() { + public HostAndPort getSrc() { return src; } - public HServerAddress getDst() { + public HostAndPort getDst() { return dst; } @@ -192,8 +192,8 @@ public class LocalTestBed { private Random prng; // the map is maintained per pair of nodes - private ConcurrentHashMap> links - = new ConcurrentHashMap>(); + private ConcurrentHashMap> links + = new ConcurrentHashMap<>(); private ConcurrentHashMap nodes = new ConcurrentHashMap(); @@ -207,17 +207,17 @@ public class LocalTestBed { prng = new Random(config.getLong("seed", System.currentTimeMillis())); } - public LinkState getLinkState(HServerAddress src, HServerAddress dst) { + public LinkState getLinkState(HostAndPort src, HostAndPort dst) { if (src == null || dst == null || src.equals(dst)) { return null; } if (src.toString().compareTo(dst.toString()) > 0) { - HServerAddress tmp = src; + HostAndPort tmp = src; src = dst; dst = tmp; } if (links.get(src) == null) { - links.put(src, new ConcurrentHashMap()); + links.put(src, new ConcurrentHashMap()); } if (links.get(src).get(dst) == null) { links.get(src).put(dst, new LinkState(src, dst, LinkState.State.UP)); @@ -239,9 +239,9 @@ public class LocalTestBed { Set currentHiccups = new HashSet(); Set allHiccups = new HashSet(); // include future hiccups Set nohiccups = new HashSet(); - for (HServerAddress dst : quorumInfo.getPeersWithRank().keySet()) { + for (HostAndPort dst : quorumInfo.getPeersWithRank().keySet()) { dst = RaftUtil.getLocalConsensusAddress(dst); - for (HServerAddress src : quorumInfo.getPeersWithRank().keySet()) { + for (HostAndPort src : quorumInfo.getPeersWithRank().keySet()) { src = RaftUtil.getLocalConsensusAddress(src); if (src.equals(dst)) { continue; @@ -293,11 +293,11 @@ public class LocalTestBed { } } - for (HServerAddress dst : quorumInfo.getPeersWithRank().keySet()) { + for 
(HostAndPort dst : quorumInfo.getPeersWithRank().keySet()) { dst = RaftUtil.getLocalConsensusAddress(dst); InstrumentedConsensusServiceImpl service = (InstrumentedConsensusServiceImpl) - (servers.get(dst.getHostAddressWithPort()).getHandler()); - for (HServerAddress src : quorumInfo.getPeersWithRank().keySet()) { + (servers.get(dst.toString()).getHandler()); + for (HostAndPort src : quorumInfo.getPeersWithRank().keySet()) { src = RaftUtil.getLocalConsensusAddress(src); if (!src.equals(dst) && !clear) { long delay = (long)(prng.nextDouble()*(maxRandomDelay-minRandomDelay) + minRandomDelay); diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/RaftTestUtil.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/RaftTestUtil.java index fdd3009..60c9113 100644 --- a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/RaftTestUtil.java +++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/RaftTestUtil.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HServerAddress; +import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.consensus.client.QuorumClient; @@ -147,8 +147,8 @@ public class RaftTestUtil { LocalConsensusServer server = new LocalConsensusServer(service, handlers, conf); server.initialize(config); - HServerAddress addr = new HServerAddress(LOCAL_HOST + ":" + port); - getServers().put(addr.getHostAddressWithPort(), server); + HostAndPort addr = HostAndPort.fromString(LOCAL_HOST + ":" + port); + getServers().put(addr.toString(), server); service.setIdentity(LOCAL_HOST + ":" + port); server.startService(); return server; @@ -183,11 +183,11 @@ public class RaftTestUtil { * Allows override. 
*/ public RaftQuorumContext createRaftQuorumContext(QuorumInfo info, Configuration conf, - HServerAddress address, LocalConsensusServer server) { + HostAndPort address, LocalConsensusServer server) { String mBeansPrefix = "Test."; try { return raftQuorumContextClass.getConstructor(QuorumInfo.class, Configuration.class, - HServerAddress.class, String.class, AggregateTimer.class, SerialExecutorService.class, + HostAndPort.class, String.class, AggregateTimer.class, SerialExecutorService.class, ExecutorService.class).newInstance(info, conf, address, mBeansPrefix, server.aggregateTimer, server.serialExecutorService, server.getExecServiceForThriftClients()); @@ -219,16 +219,16 @@ public class RaftTestUtil { public void addQuorumForServer(final LocalConsensusServer server, final QuorumInfo info, int[] mockLog) throws IOException { - HServerAddress consensusServerAddress = new HServerAddress(LOCAL_HOST, + HostAndPort consensusServerAddress = HostAndPort.fromParts(LOCAL_HOST, server.getThriftServer().getPort()); Configuration conf = HBaseConfiguration.create(getConf()); conf.set(HConstants.RAFT_TRANSACTION_LOG_DIRECTORY_KEY, raftDirectory + "/" + - consensusServerAddress.getHostAddressWithPort() + "/wal"); + consensusServerAddress.toString() + "/wal"); conf.set(HConstants.RAFT_METADATA_DIRECTORY_KEY, raftDirectory + "/" + - consensusServerAddress.getHostAddressWithPort() + "/metadata"); + consensusServerAddress.toString() + "/metadata"); conf.setInt( HConstants.RAFT_LOG_DELETION_INTERVAL_KEY, @@ -279,7 +279,7 @@ public class RaftTestUtil { } public boolean checkHealth(QuorumInfo quorumInfo, LocalConsensusServer server) throws Exception { - HServerAddress consensusServerAddress = new HServerAddress(LOCAL_HOST, + HostAndPort consensusServerAddress = HostAndPort.fromParts(LOCAL_HOST, server.getThriftServer().getPort()); int timeout = 5000; LOG.info("Getting QuorumThriftClientAgent for " + consensusServerAddress); @@ -300,14 +300,14 @@ public class RaftTestUtil { getServers().put(contextAddress, server); - HServerAddress consensusServerAddress = new HServerAddress(LOCAL_HOST, + HostAndPort consensusServerAddress = HostAndPort.fromParts(LOCAL_HOST, server.getThriftServer().getPort()); Configuration conf = HBaseConfiguration.create(); conf.set(HConstants.RAFT_TRANSACTION_LOG_DIRECTORY_KEY, raftDirectory + - "/"+ consensusServerAddress.getHostAddressWithPort() + "/wal"); + "/"+ consensusServerAddress.toString() + "/wal"); conf.set(HConstants.RAFT_METADATA_DIRECTORY_KEY, raftDirectory + "/" + - consensusServerAddress.getHostAddressWithPort() + "/metadata"); + consensusServerAddress.toString() + "/metadata"); conf.setInt( HConstants.RAFT_LOG_DELETION_INTERVAL_KEY, 100); @@ -509,7 +509,7 @@ public class RaftTestUtil { if (context == null || info.getPeersWithRank().get(RaftUtil.getHRegionServerAddress( - new HServerAddress(RaftTestUtil.LOCAL_HOST, + HostAndPort.fromParts(RaftTestUtil.LOCAL_HOST, server.getThriftServer().getPort()) )) == null) { if (verbose) { @@ -551,12 +551,12 @@ public class RaftTestUtil { } public QuorumInfo initializePeers() { - Map peers = new HashMap<>(); + Map peers = new HashMap<>(); int rank = servers.size(); for (LocalConsensusServer server : servers.values()) { int regionServerPort = server.getThriftServer().getPort() - HConstants.CONSENSUS_SERVER_PORT_JUMP; - peers.put(new HServerAddress(RaftTestUtil.LOCAL_HOST, regionServerPort), + peers.put(HostAndPort.fromParts(RaftTestUtil.LOCAL_HOST, regionServerPort), Math.max(rank--, 0)); } @@ -565,7 +565,7 @@ public class RaftTestUtil { 
HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE); table.addFamily(hcd); - Map> peerMap = new HashMap<>(); + Map> peerMap = new HashMap<>(); peerMap.put(QuorumInfo.LOCAL_DC_KEY, peers); String quorumName = "dummyTable,123,deadbeef."; @@ -643,9 +643,9 @@ public class RaftTestUtil { public boolean simulatePacketDropForServer(final QuorumInfo quorumInfo, int rank, final InstrumentedConsensusServiceImpl.PacketDropStyle style) { - HServerAddress server = null; + HostAndPort server = null; - for (HServerAddress s : quorumInfo.getPeersWithRank().keySet()) { + for (HostAndPort s : quorumInfo.getPeersWithRank().keySet()) { if (quorumInfo.getPeersWithRank().get(s) == rank) { server = s; break; @@ -657,7 +657,7 @@ public class RaftTestUtil { } InstrumentedConsensusServiceImpl service = (InstrumentedConsensusServiceImpl) - (servers.get(RaftUtil.getLocalConsensusAddress(server).getHostAddressWithPort()). + (servers.get(RaftUtil.getLocalConsensusAddress(server).toString()). getHandler()); service.setPacketDropStyle(style); return true; @@ -665,9 +665,9 @@ public class RaftTestUtil { public long getHiccupPacketDropCount(final QuorumInfo quorumInfo) { long count = 0; - for (HServerAddress server : quorumInfo.getPeersWithRank().keySet()) { + for (HostAndPort server : quorumInfo.getPeersWithRank().keySet()) { InstrumentedConsensusServiceImpl service = (InstrumentedConsensusServiceImpl) - (servers.get(RaftUtil.getLocalConsensusAddress(server).getHostAddressWithPort()).getHandler()); + (servers.get(RaftUtil.getLocalConsensusAddress(server).toString()).getHandler()); count += service.getHiccupPacketDropCount(); } return count; @@ -675,9 +675,9 @@ public class RaftTestUtil { public long getPacketDropCount(final QuorumInfo quorumInfo) { long count = 0; - for (HServerAddress server : quorumInfo.getPeersWithRank().keySet()) { + for (HostAndPort server : quorumInfo.getPeersWithRank().keySet()) { InstrumentedConsensusServiceImpl service = (InstrumentedConsensusServiceImpl) - (servers.get(RaftUtil.getLocalConsensusAddress(server).getHostAddressWithPort()).getHandler()); + (servers.get(RaftUtil.getLocalConsensusAddress(server).toString()).getHandler()); count += service.getPacketDropCount(); } return count; @@ -695,9 +695,9 @@ public class RaftTestUtil { public RaftQuorumContext getRaftQuorumContextByRank(QuorumInfo quorumInfo, int rank) { String peerAddress = null; - for (HServerAddress addr : quorumInfo.getPeersWithRank().keySet()) { + for (HostAndPort addr : quorumInfo.getPeersWithRank().keySet()) { if (quorumInfo.getPeersWithRank().get(addr) == rank) { - peerAddress = RaftUtil.getLocalConsensusAddress(addr).getHostAddressWithPort(); + peerAddress = RaftUtil.getLocalConsensusAddress(addr).toString(); } } @@ -707,9 +707,9 @@ public class RaftTestUtil { public LocalConsensusServer stopLocalConsensusServer(QuorumInfo quorumInfo, int rank) { String peerAddress = null; - for (HServerAddress addr : quorumInfo.getPeersWithRank().keySet()) { + for (HostAndPort addr : quorumInfo.getPeersWithRank().keySet()) { if (quorumInfo.getPeersWithRank().get(addr) == rank) { - peerAddress = RaftUtil.getLocalConsensusAddress(addr).getHostAddressWithPort(); + peerAddress = RaftUtil.getLocalConsensusAddress(addr).toString(); } } @@ -735,13 +735,13 @@ public class RaftTestUtil { public List getQuorumContexts( final QuorumInfo quorumInfo) { - Set replias = quorumInfo.getPeersWithRank().keySet(); + Set replias = quorumInfo.getPeersWithRank().keySet(); List contexts = new ArrayList<>(replias.size()); - 
for (HServerAddress address : replias) { + for (HostAndPort address : replias) { String consensusServerAddress = RaftUtil.getLocalConsensusAddress(address). - getHostAddressWithPort(); + toString(); if (getServers().containsKey(consensusServerAddress)) { contexts.add(getRaftQuorumContextByAddress(quorumInfo, consensusServerAddress)); diff --git a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestBasicQuorumMembershipChange.java b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestBasicQuorumMembershipChange.java index be65d34..d6464f0 100644 --- a/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestBasicQuorumMembershipChange.java +++ b/hbase-consensus/src/test/java/org/apache/hadoop/hbase/consensus/TestBasicQuorumMembershipChange.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.consensus; import junit.framework.Assert; - import org.apache.hadoop.hbase.HServerAddress; + import com.google.common.net.HostAndPort; import org.apache.hadoop.hbase.client.NoLeaderForRegionException; import org.apache.hadoop.hbase.consensus.quorum.QuorumInfo; import org.apache.hadoop.hbase.consensus.server.LocalConsensusServer; @@ -188,23 +188,23 @@ import java.util.concurrent.ThreadPoolExecutor; QuorumInfo info = new QuorumInfo(quorumInfo); boolean leaderReplaced = false; - List peerToReplaceAddr; + List peerToReplaceAddr; peerToReplaceAddr = new ArrayList<>(); - List> newServers = new ArrayList<>(); + List> newServers = new ArrayList<>(); - Map currentPeers = + Map currentPeers = info.getPeers().get(QuorumInfo.LOCAL_DC_KEY); - HServerAddress oldPeer, newPeer; + HostAndPort oldPeer, newPeer; for (int newServerPort : ports) { - newPeer = new HServerAddress(RAFT_TEST_UTIL.LOCAL_HOST, + newPeer = HostAndPort.fromParts(RAFT_TEST_UTIL.LOCAL_HOST, newServerPort - HConstants.CONSENSUS_SERVER_PORT_JUMP); LOG.debug("Adding new server with address " + newPeer); if (replaceLeader && !leaderReplaced) { oldPeer = RaftUtil.getHRegionServerAddress( - new HServerAddress(client.getLeader().getServerAddress())); + HostAndPort.fromString(client.getLeader().getServerAddress())); leaderReplaced = true; System.out.println( "Replacing leader " + oldPeer + " with port " + newPeer); @@ -217,14 +217,14 @@ import java.util.concurrent.ThreadPoolExecutor; peerToReplaceAddr.add(oldPeer); int rank = currentPeers.remove(oldPeer); - newServers.add(new Pair(newPeer, rank)); + newServers.add(new Pair<>(newPeer, rank)); } // Make sure we actually removed the required number of peers Assert.assertTrue(info.getPeers().get(QuorumInfo.LOCAL_DC_KEY).size() == QUORUM_SIZE - ports.length); - for (Pair server : newServers) { + for (Pair server : newServers) { // Update the config info.getPeers().get(QuorumInfo.LOCAL_DC_KEY).put(server.getFirst(), server.getSecond()); -- 2.1.0
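The substitution in this patch is largely mechanical: wherever the code built an HServerAddress from a "host:port" string or from a host/port pair, it now builds a Guava com.google.common.net.HostAndPort. Below is a minimal sketch of the equivalent calls, assuming the Guava version on this branch (which exposes getHostText(); later Guava versions rename it to getHost()) and a made-up host name.

import com.google.common.net.HostAndPort;

public class HostAndPortSketch {
  public static void main(String[] args) {
    // Old: new HServerAddress("rs1.example.com:60020")
    HostAndPort parsed = HostAndPort.fromString("rs1.example.com:60020");

    // Old: new HServerAddress("rs1.example.com", 60020)
    HostAndPort built = HostAndPort.fromParts("rs1.example.com", 60020);

    // Old: getHostname(), getPort(), getHostAddressWithPort()/getHostNameWithPort()
    String host = parsed.getHostText();      // "rs1.example.com"
    int port = parsed.getPort();             // 60020
    String hostWithPort = parsed.toString(); // "rs1.example.com:60020"

    System.out.println(host + " / " + port + " / " + hostWithPort + " / " + built);
  }
}

Two behavioural differences are worth keeping in mind. HostAndPort performs no DNS resolution, so equality and map lookups are purely textual, which is why QuorumClient now keys agentMap directly on the leader's address string instead of normalizing it through HServerAddress first. HostAndPort also does not implement Comparable, which is why QuorumInfo.populateInternalMaps() switches peersWithRank from a TreeMap to a HashMap.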
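RaftUtil still derives the consensus server's address from the region server's address (and back) by a fixed port offset; only the value type changed. The sketch below mirrors those two helpers with an illustrative offset of 100, since the actual value of HConstants.CONSENSUS_SERVER_PORT_JUMP is not shown in this patch.

import com.google.common.net.HostAndPort;

public final class PortJumpSketch {
  // Illustrative stand-in; the real code reads HConstants.CONSENSUS_SERVER_PORT_JUMP.
  private static final int CONSENSUS_SERVER_PORT_JUMP = 100;

  // Mirrors RaftUtil.getLocalConsensusAddress(HostAndPort)
  static HostAndPort toConsensusAddress(HostAndPort regionServer) {
    return HostAndPort.fromParts(regionServer.getHostText(),
        regionServer.getPort() + CONSENSUS_SERVER_PORT_JUMP);
  }

  // Mirrors RaftUtil.getHRegionServerAddress(HostAndPort)
  static HostAndPort toRegionServerAddress(HostAndPort consensusServer) {
    return HostAndPort.fromParts(consensusServer.getHostText(),
        consensusServer.getPort() - CONSENSUS_SERVER_PORT_JUMP);
  }

  public static void main(String[] args) {
    HostAndPort regionServer = HostAndPort.fromParts("localhost", 60020);
    HostAndPort consensus = toConsensusAddress(regionServer);
    System.out.println(consensus);                         // localhost:60120
    System.out.println(toRegionServerAddress(consensus));  // localhost:60020
  }
}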
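One spot where the replacement is not one-to-one: HServerAddress carried a resolved (and cached) InetSocketAddress and failed fast in its constructor if the name could not be resolved, while HostAndPort is just a host string plus a port. Call sites that open Thrift connections, such as QuorumThriftClientAgent and PeerConsensusServer, therefore build the socket address themselves, so resolution now happens when the connection is opened. A minimal sketch of that conversion:

import com.google.common.net.HostAndPort;
import java.net.InetSocketAddress;

public class SocketAddressSketch {
  public static void main(String[] args) {
    HostAndPort address = HostAndPort.fromString("localhost:60120");

    // Old: address.getInetSocketAddress() on HServerAddress.
    // New: resolve at the call site, e.g. before handing it to FramedClientConnector.
    InetSocketAddress socketAddress =
        new InetSocketAddress(address.getHostText(), address.getPort());

    System.out.println(socketAddress.isUnresolved()
        ? "could not resolve " + address
        : socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort());
  }
}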
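QuorumInfo serializes each peer as a length-prefixed address string followed by its rank as an int; the change here is that the string now comes from HostAndPort.toString() rather than HServerAddress.getHostAddressWithPort(), so the payload carries the host name as written instead of a resolved IP. A rough sketch of encoding a single peer entry under that layout, ignoring the surrounding quorum and data-center headers and assuming the rank immediately follows the address bytes, as the deserializer implies:

import com.google.common.net.HostAndPort;
import java.nio.ByteBuffer;

public class PeerEncodingSketch {
  public static void main(String[] args) {
    HostAndPort peer = HostAndPort.fromParts("localhost", 60020);
    int rank = 3;

    byte[] peerBytes = peer.toString().getBytes();           // "localhost:60020"
    ByteBuffer payload = ByteBuffer.allocate(4 + peerBytes.length + 4);
    payload.putInt(peerBytes.length);                        // address length prefix
    payload.put(peerBytes);                                  // address bytes
    payload.putInt(rank);                                    // peer rank
    payload.flip();

    System.out.println(payload.remaining() + " bytes for one peer entry");
  }
}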