From 53b0b1759c0577cec185fd106891a55327df3eca Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Wed, 5 Nov 2014 15:46:07 -0800 Subject: [PATCH] HBASE-12429 Add port to ClusterManager's actions. --- .../org/apache/hadoop/hbase/ClusterManager.java | 16 +-- .../hadoop/hbase/DistributedHBaseCluster.java | 153 +++++++++++++-------- .../apache/hadoop/hbase/HBaseClusterManager.java | 14 +- .../apache/hadoop/hbase/chaos/actions/Action.java | 6 +- .../hbase/chaos/actions/BatchRestartRsAction.java | 4 +- .../java/org/apache/hadoop/hbase/HBaseCluster.java | 8 +- .../org/apache/hadoop/hbase/MiniHBaseCluster.java | 4 +- .../TestMasterOperationsForRegionReplicas.java | 2 +- 8 files changed, 123 insertions(+), 84 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ClusterManager.java index dd96e43..2d46279 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ClusterManager.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ClusterManager.java @@ -61,38 +61,38 @@ interface ClusterManager extends Configurable { /** * Start the service on the given host */ - void start(ServiceType service, String hostname) throws IOException; + void start(ServiceType service, String hostname, int port) throws IOException; /** * Stop the service on the given host */ - void stop(ServiceType service, String hostname) throws IOException; + void stop(ServiceType service, String hostname, int port) throws IOException; /** - * Restarts the service on the given host + * Restart the service on the given host */ - void restart(ServiceType service, String hostname) throws IOException; + void restart(ServiceType service, String hostname, int port) throws IOException; /** * Kills the service running on the given host */ - void kill(ServiceType service, String hostname) throws IOException; + void kill(ServiceType service, String hostname, int port) throws IOException; /** * Suspends the service running on the 
given host */ - void suspend(ServiceType service, String hostname) throws IOException; + void suspend(ServiceType service, String hostname, int port) throws IOException; /** * Resumes the services running on the given host */ - void resume(ServiceType service, String hostname) throws IOException; + void resume(ServiceType service, String hostname, int port) throws IOException; /** * Returns whether the service is running on the remote host. This only checks whether the * service still has a pid. */ - boolean isRunning(ServiceType service, String hostname) throws IOException; + boolean isRunning(ServiceType service, String hostname, int port) throws IOException; /* TODO: further API ideas: * diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java index 6bc4143..4a3a64a 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java @@ -19,8 +19,12 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.TreeSet; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -105,21 +109,25 @@ public class DistributedHBaseCluster extends HBaseCluster { } @Override - public void startRegionServer(String hostname) throws IOException { + public void startRegionServer(String hostname, int port) throws IOException { LOG.info("Starting RS on: " + hostname); - clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname); + clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port); } @Override public void killRegionServer(ServerName serverName) throws IOException { LOG.info("Aborting RS: " + serverName.getServerName()); 
- clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname()); + clusterManager.kill(ServiceType.HBASE_REGIONSERVER, + serverName.getHostname(), + serverName.getPort()); } @Override public void stopRegionServer(ServerName serverName) throws IOException { LOG.info("Stopping RS: " + serverName.getServerName()); - clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname()); + clusterManager.stop(ServiceType.HBASE_REGIONSERVER, + serverName.getHostname(), + serverName.getPort()); } @Override @@ -133,7 +141,7 @@ public class DistributedHBaseCluster extends HBaseCluster { long start = System.currentTimeMillis(); while ((System.currentTimeMillis() - start) < timeout) { - if (!clusterManager.isRunning(service, serverName.getHostname())) { + if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) { return; } Threads.sleep(1000); @@ -148,21 +156,21 @@ public class DistributedHBaseCluster extends HBaseCluster { } @Override - public void startMaster(String hostname) throws IOException { - LOG.info("Starting Master on: " + hostname); - clusterManager.start(ServiceType.HBASE_MASTER, hostname); + public void startMaster(String hostname, int port) throws IOException { + LOG.info("Starting Master on: " + hostname + ":" + port); + clusterManager.start(ServiceType.HBASE_MASTER, hostname, port); } @Override public void killMaster(ServerName serverName) throws IOException { LOG.info("Aborting Master: " + serverName.getServerName()); - clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname()); + clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort()); } @Override public void stopMaster(ServerName serverName) throws IOException { LOG.info("Stopping Master: " + serverName.getServerName()); - clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname()); + clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort()); } @Override 
@@ -207,13 +215,13 @@ public class DistributedHBaseCluster extends HBaseCluster { @Override public void waitUntilShutDown() { - //Simply wait for a few seconds for now (after issuing serverManager.kill + // Simply wait for a few seconds for now (after issuing serverManager.kill throw new RuntimeException("Not implemented yet"); } @Override public void shutdown() throws IOException { - //not sure we want this + // not sure we want this throw new RuntimeException("Not implemented yet"); } @@ -241,30 +249,35 @@ public class DistributedHBaseCluster extends HBaseCluster { protected boolean restoreMasters(ClusterStatus initial, ClusterStatus current) { List deferred = new ArrayList(); //check whether current master has changed - if (!ServerName.isSameHostnameAndPort(initial.getMaster(), current.getMaster())) { - LOG.info("Restoring cluster - Initial active master : " + initial.getMaster().getHostname() - + " has changed to : " + current.getMaster().getHostname()); + final ServerName initMaster = initial.getMaster(); + if (!ServerName.isSameHostnameAndPort(initMaster, current.getMaster())) { + LOG.info("Restoring cluster - Initial active master : " + + initMaster.getHostAndPort() + + " has changed to : " + + current.getMaster().getHostAndPort()); // If initial master is stopped, start it, before restoring the state. // It will come up as a backup master, if there is already an active master. try { - if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, initial.getMaster().getHostname())) { - LOG.info("Restoring cluster - starting initial active master at:" + initial.getMaster().getHostname()); - startMaster(initial.getMaster().getHostname()); + if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, + initMaster.getHostname(), initMaster.getPort())) { + LOG.info("Restoring cluster - starting initial active master at:" + + initMaster.getHostAndPort()); + startMaster(initMaster.getHostname(), initMaster.getPort()); } - //master has changed, we would like to undo this. 
- //1. Kill the current backups - //2. Stop current master - //3. Start backup masters + // master has changed, we would like to undo this. + // 1. Kill the current backups + // 2. Stop current master + // 3. Start backup masters for (ServerName currentBackup : current.getBackupMasters()) { - if (!ServerName.isSameHostnameAndPort(currentBackup, initial.getMaster())) { + if (!ServerName.isSameHostnameAndPort(currentBackup, initMaster)) { LOG.info("Restoring cluster - stopping backup master: " + currentBackup); stopMaster(currentBackup); } } LOG.info("Restoring cluster - stopping active master: " + current.getMaster()); stopMaster(current.getMaster()); - waitForActiveAndReadyMaster(); //wait so that active master takes over + waitForActiveAndReadyMaster(); // wait so that active master takes over } catch (IOException ex) { // if we fail to start the initial active master, we do not want to continue stopping // backup masters. Just keep what we have now @@ -275,9 +288,12 @@ public class DistributedHBaseCluster extends HBaseCluster { for (ServerName backup : initial.getBackupMasters()) { try { //these are not started in backup mode, but we should already have an active master - if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname())) { - LOG.info("Restoring cluster - starting initial backup master: " + backup.getHostname()); - startMaster(backup.getHostname()); + if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, + backup.getHostname(), + backup.getPort())) { + LOG.info("Restoring cluster - starting initial backup master: " + + backup.getHostAndPort()); + startMaster(backup.getHostname(), backup.getPort()); } } catch (IOException ex) { deferred.add(ex); @@ -285,32 +301,34 @@ public class DistributedHBaseCluster extends HBaseCluster { } } else { //current master has not changed, match up backup masters - HashMap initialBackups = new HashMap(); - HashMap currentBackups = new HashMap(); + Set toStart = new TreeSet(new 
ServerNameIgnoreStartCodeComparator()); + Set toKill = new TreeSet(new ServerNameIgnoreStartCodeComparator()); + toStart.addAll(initial.getBackupMasters()); + toKill.addAll(current.getBackupMasters()); - for (ServerName server : initial.getBackupMasters()) { - initialBackups.put(server.getHostname(), server); - } for (ServerName server : current.getBackupMasters()) { - currentBackups.put(server.getHostname(), server); + toStart.remove(server); + } + for (ServerName server: initial.getBackupMasters()) { + toKill.remove(server); } - for (String hostname : Sets.difference(initialBackups.keySet(), currentBackups.keySet())) { + for (ServerName sn:toStart) { try { - if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, hostname)) { - LOG.info("Restoring cluster - starting initial backup master: " + hostname); - startMaster(hostname); + if(!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) { + LOG.info("Restoring cluster - starting initial backup master: " + sn.getHostAndPort()); + startMaster(sn.getHostname(), sn.getPort()); } } catch (IOException ex) { deferred.add(ex); } } - for (String hostname : Sets.difference(currentBackups.keySet(), initialBackups.keySet())) { + for (ServerName sn:toKill) { try { - if(clusterManager.isRunning(ServiceType.HBASE_MASTER, hostname)) { - LOG.info("Restoring cluster - stopping backup master: " + hostname); - stopMaster(currentBackups.get(hostname)); + if(clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) { + LOG.info("Restoring cluster - stopping backup master: " + sn.getHostAndPort()); + stopMaster(sn); } } catch (IOException ex) { deferred.add(ex); @@ -318,7 +336,8 @@ public class DistributedHBaseCluster extends HBaseCluster { } } if (!deferred.isEmpty()) { - LOG.warn("Restoring cluster - restoring region servers reported " + deferred.size() + " errors:"); + LOG.warn("Restoring cluster - restoring region servers reported " + + deferred.size() + " errors:"); for (int 
i=0; i { + @Override + public int compare(ServerName o1, ServerName o2) { + int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname()); + if (compare != 0) return compare; + compare = o1.getPort() - o2.getPort(); + if (compare != 0) return compare; + return 0; + } + } + protected boolean restoreRegionServers(ClusterStatus initial, ClusterStatus current) { - HashMap initialServers = new HashMap(); - HashMap currentServers = new HashMap(); + Set toStart = new TreeSet(new ServerNameIgnoreStartCodeComparator()); + Set toKill = new TreeSet(new ServerNameIgnoreStartCodeComparator()); + toStart.addAll(initial.getServers()); + toKill.addAll(current.getServers()); - for (ServerName server : initial.getServers()) { - initialServers.put(server.getHostname(), server); - } for (ServerName server : current.getServers()) { - currentServers.put(server.getHostname(), server); + toStart.remove(server); + } + for (ServerName server: initial.getServers()) { + toKill.remove(server); } List deferred = new ArrayList(); - for (String hostname : Sets.difference(initialServers.keySet(), currentServers.keySet())) { + + for(ServerName sn:toStart) { try { - if(!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, hostname)) { - LOG.info("Restoring cluster - starting initial region server: " + hostname); - startRegionServer(hostname); + if (!clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, + sn.getHostname(), + sn.getPort())) { + LOG.info("Restoring cluster - starting initial region server: " + sn.getHostAndPort()); + startRegionServer(sn.getHostname(), sn.getPort()); } } catch (IOException ex) { deferred.add(ex); } } - for (String hostname : Sets.difference(currentServers.keySet(), initialServers.keySet())) { + for(ServerName sn:toKill) { try { - if(clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, hostname)) { - LOG.info("Restoring cluster - stopping initial region server: " + hostname); - stopRegionServer(currentServers.get(hostname)); + if 
(clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, + sn.getHostname(), + sn.getPort())) { + LOG.info("Restoring cluster - stopping initial region server: " + sn.getHostAndPort()); + stopRegionServer(sn); } } catch (IOException ex) { deferred.add(ex); } } if (!deferred.isEmpty()) { - LOG.warn("Restoring cluster - restoring region servers reported " + deferred.size() + " errors:"); + LOG.warn("Restoring cluster - restoring region servers reported " + + deferred.size() + " errors:"); for (int i=0; i 0; } @Override - public void kill(ServiceType service, String hostname) throws IOException { + public void kill(ServiceType service, String hostname, int port) throws IOException { signal(service, SIGKILL, hostname); } @Override - public void suspend(ServiceType service, String hostname) throws IOException { + public void suspend(ServiceType service, String hostname, int port) throws IOException { signal(service, SIGSTOP, hostname); } @Override - public void resume(ServiceType service, String hostname) throws IOException { + public void resume(ServiceType service, String hostname, int port) throws IOException { signal(service, SIGCONT, hostname); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/Action.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/Action.java index f54f7dc..ebc83ff 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/Action.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/Action.java @@ -114,7 +114,7 @@ public class Action { protected void startMaster(ServerName server) throws IOException { LOG.info("Starting master:" + server.getHostname()); - cluster.startMaster(server.getHostname()); + cluster.startMaster(server.getHostname(), server.getPort()); cluster.waitForActiveAndReadyMaster(startMasterTimeout); LOG.info("Started master: " + server); } @@ -129,8 +129,8 @@ public class Action { protected void startRs(ServerName server) throws IOException { 
LOG.info("Starting region server:" + server.getHostname()); - cluster.startRegionServer(server.getHostname()); - cluster.waitForRegionServerToStart(server.getHostname(), startRsTimeout); + cluster.startRegionServer(server.getHostname(), server.getPort()); + cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), startRsTimeout); LOG.info("Started region server:" + server + ". Reported num of rs:" + cluster.getClusterStatus().getServersSize()); } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/BatchRestartRsAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/BatchRestartRsAction.java index edfd9c4..b6a5b50 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/BatchRestartRsAction.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/BatchRestartRsAction.java @@ -57,11 +57,11 @@ public class BatchRestartRsAction extends RestartActionBaseAction { for (ServerName server : selectedServers) { LOG.info("Starting region server:" + server.getHostname()); - cluster.startRegionServer(server.getHostname()); + cluster.startRegionServer(server.getHostname(), server.getPort()); } for (ServerName server : selectedServers) { - cluster.waitForRegionServerToStart(server.getHostname(), PolicyBasedChaosMonkey.TIMEOUT); + cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), PolicyBasedChaosMonkey.TIMEOUT); } LOG.info("Started " + selectedServers.size() +" region servers. 
Reported num of rs:" + cluster.getClusterStatus().getServersSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java index 9e7a0c4..adbdd69 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseCluster.java @@ -118,7 +118,7 @@ public abstract class HBaseCluster implements Closeable, Configurable { * @param hostname the hostname to start the regionserver on * @throws IOException if something goes wrong */ - public abstract void startRegionServer(String hostname) throws IOException; + public abstract void startRegionServer(String hostname, int port) throws IOException; /** * Kills the region server process if this is a distributed cluster, otherwise @@ -139,12 +139,12 @@ public abstract class HBaseCluster implements Closeable, Configurable { * @return whether the operation finished with success * @throws IOException if something goes wrong or timeout occurs */ - public void waitForRegionServerToStart(String hostname, long timeout) + public void waitForRegionServerToStart(String hostname, int port, long timeout) throws IOException { long start = System.currentTimeMillis(); while ((System.currentTimeMillis() - start) < timeout) { for (ServerName server : getClusterStatus().getServers()) { - if (server.getHostname().equals(hostname)) { + if (server.getHostname().equals(hostname) && server.getPort() == port) { return; } } @@ -169,7 +169,7 @@ public abstract class HBaseCluster implements Closeable, Configurable { * @return whether the operation finished with success * @throws IOException if something goes wrong */ - public abstract void startMaster(String hostname) throws IOException; + public abstract void startMaster(String hostname, int port) throws IOException; /** * Kills the master process if this is a distributed cluster, otherwise, diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 7672ac1..24b6e71 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -233,7 +233,7 @@ public class MiniHBaseCluster extends HBaseCluster { } @Override - public void startRegionServer(String hostname) throws IOException { + public void startRegionServer(String hostname, int port) throws IOException { this.startRegionServer(); } @@ -260,7 +260,7 @@ public class MiniHBaseCluster extends HBaseCluster { } @Override - public void startMaster(String hostname) throws IOException { + public void startMaster(String hostname, int port) throws IOException { this.startMaster(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java index 289741e..846f8e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterOperationsForRegionReplicas.java @@ -157,7 +157,7 @@ public class TestMasterOperationsForRegionReplicas { ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster(); TEST_UTIL.getHBaseClusterInterface().stopMaster(master); TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000); - TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname()); + TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname(), master.getPort()); TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster(); for (int i = 0; i < numRegions; i++) { for (int j = 0; j < numReplica; j++) { -- 2.2.1