commit 9f0f91c3518cd83ce462bd36065b20f815adad43 Author: Enis Soztutar Date: Fri Jul 10 16:06:29 2015 -0700 HBASE-13997 ScannerCallableWithReplicas cause Infinitely blocking (Zephyr Guo and Enis) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java index 8bdb5d6..10c44fb 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java @@ -64,6 +64,7 @@ public class HBaseClusterManager extends Configured implements ClusterManager { private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval"; private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000; + private static final long INVALID_PID = -1L; protected RetryCounterFactory retryCounterFactory; @@ -148,17 +149,56 @@ public class HBaseClusterManager extends Configured implements ClusterManager { public abstract String getCommand(ServiceType service, Operation op); - public String isRunningCommand(ServiceType service) { - return findPidCommand(service); + /** + * Compose a command to check whether the given service is running with the given pid, or, if no + * pid is given, check whether there is any process running with the service name + * @param service type of the service + * @param pid expected pid of the service + * @return the command to check running + */ + public String isRunningCommand(ServiceType service, long pid) { + // use "tr" and "cut" so that the shell execution does not return a non-zero value + if (isValidPid(pid)) { + return String.format( + "ps aux | grep proc_%s | grep %d | grep -v grep | tr -s ' ' | cut -d ' ' -f2", service, + pid); + } else { + return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", + service); + } + } + + private boolean isValidPid(long pid) { + return pid > 0; } - protected String findPidCommand(ServiceType 
service) { - return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", + /** + * Compose a command to get the pid of the service thread. + *

+ * If port is given, get pid through the port, or else from the service name. + *

+ * Note that we might get an incorrect pid without a port, since there might be more than one + * process running with the same service name + * @param service type of the service to get pid of + * @param port the port on which the service should be listening + * @return the command to get pid + */ + protected String findPidCommand(ServiceType service, int port) { + if (port > 0) { + return String.format( + "netstat -nltp | grep %d | tr -s ' ' | cut -d ' ' -f7 | cut -d '/' -f1", port); + } else { + return String.format("ps aux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2", service); + } } - public String signalCommand(ServiceType service, String signal) { - return String.format("%s | xargs kill -s %s", findPidCommand(service), signal); + public String signalCommand(ServiceType service, String signal, int port, long pid) { + if (pid > 0) { + return String.format("kill -s %s %d", signal, pid); + } else { + return String.format("%s | xargs kill -s %s", findPidCommand(service, port), signal); + } } } @@ -270,29 +310,55 @@ public class HBaseClusterManager extends Configured implements ClusterManager { exec(hostname, service, Operation.RESTART); } - public void signal(ServiceType service, String signal, String hostname) throws IOException { - execWithRetries(hostname, getCommandProvider(service).signalCommand(service, signal)); + public void signal(ServiceType service, String signal, String hostname, int port, long pid) + throws IOException { + String command = getCommandProvider(service).signalCommand(service, signal, port, pid); + execWithRetries(hostname, command); } @Override public boolean isRunning(ServiceType service, String hostname, int port) throws IOException { - String ret = execWithRetries(hostname, getCommandProvider(service).isRunningCommand(service)) - .getSecond(); + long pid = getServicePid(service, hostname, port); + String command = getCommandProvider(service).isRunningCommand(service, pid); + String ret = execWithRetries(hostname, 
command).getSecond(); return ret.length() > 0; } + /** + * Get pid of the service process running on the target hostname + *

+ * Note that we might get an incorrect pid without a port, since there might be more than one + * process running with the same service name + * @param service type of the service + * @param hostname host to check against + * @param port the port on which the service should be listening + * @return pid of the service process running on the target host + * @throws IOException if shell execution failed too many times + */ + private long getServicePid(ServiceType service, String hostname, int port) throws IOException { + String command = getCommandProvider(service).findPidCommand(service, port); + String pidStr = execWithRetries(hostname, command).getSecond(); + if (pidStr == null || pidStr.isEmpty()) { + return INVALID_PID; + } + return Integer.parseInt(pidStr.trim()); + } + @Override public void kill(ServiceType service, String hostname, int port) throws IOException { - signal(service, SIGKILL, hostname); + long pid = getServicePid(service, hostname, port); + signal(service, SIGKILL, hostname, port, pid); } @Override public void suspend(ServiceType service, String hostname, int port) throws IOException { - signal(service, SIGSTOP, hostname); + long pid = getServicePid(service, hostname, port); + signal(service, SIGSTOP, hostname, port, pid); } @Override public void resume(ServiceType service, String hostname, int port) throws IOException { - signal(service, SIGCONT, hostname); + long pid = getServicePid(service, hostname, port); + signal(service, SIGCONT, hostname, port, pid); } }