diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapLoadGeneratorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapLoadGeneratorService.java index 8cfce1595f..1d339f26ff 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapLoadGeneratorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapLoadGeneratorService.java @@ -17,28 +17,29 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; -import com.google.common.annotations.VisibleForTesting; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.service.AbstractService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Random; -import java.util.concurrent.TimeUnit; - /** * Extra load generator service for LLAP. */ public class LlapLoadGeneratorService extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(LlapLoadGeneratorService.class); + private final ExecutorService executorService = Executors.newCachedThreadPool(); private long interval; private float threshold; private String[] victimsHostName; - @VisibleForTesting - Thread[] threads; public LlapLoadGeneratorService() { super("LlapLoadGeneratorService"); @@ -49,8 +50,8 @@ protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); threshold = HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_UTILIZATION); if (threshold < 0 || threshold > 1.0) { - throw new IllegalArgumentException(HiveConf.ConfVars.HIVE_TEST_LOAD_UTILIZATION.varname + " should " + - "be between 0.0 and 1.0. The configuration specified [" + threshold + "]"); + throw new IllegalArgumentException(HiveConf.ConfVars.HIVE_TEST_LOAD_UTILIZATION.varname + " should " + + "be between 0.0 and 1.0. The configuration specified [" + threshold + "]"); } victimsHostName = HiveConf.getTrimmedStringsVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_HOSTNAMES); interval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_INTERVAL, TimeUnit.MILLISECONDS); @@ -59,22 +60,21 @@ protected void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws UnknownHostException { - String localHostName = InetAddress.getLocalHost().getHostName(); - LOG.debug("Local hostname is: {}", localHostName); + final String localHostName = InetAddress.getLocalHost().getHostName(); + final int numProcs = Runtime.getRuntime().availableProcessors(); + + LOG.debug("Local hostname is: {} [cpus: {}]", localHostName, numProcs); for (String hostName : victimsHostName) { if (hostName.equalsIgnoreCase(localHostName)) { - LOG.debug("Starting load generator process on: {}", localHostName); - threads = new Thread[Runtime.getRuntime().availableProcessors()]; - Random random = new Random(); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(new Runnable() { + for (int i = 0; i < numProcs; i++) { + this.executorService.execute(new Runnable() { @Override public void run() { while (!Thread.interrupted()) { - if (random.nextFloat() <= threshold) { + if (ThreadLocalRandom.current().nextFloat() <= threshold) { // Keep it busy - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < interval) { + final long startTime = System.nanoTime(); + while ((System.nanoTime() - startTime) < TimeUnit.MILLISECONDS.toNanos(interval)) { // active loop, do nothing, just check interrupt status if (Thread.currentThread().isInterrupted()) { break; @@ -83,7 +83,7 @@ public void run() { } else { // Keep it idle try { - Thread.sleep(interval); + TimeUnit.MILLISECONDS.sleep(interval); } catch (InterruptedException e) { // Set the interrupt flag so we will stop the thread Thread.currentThread().interrupt(); @@ -92,16 +92,18 @@ public void run() { } } }); - threads[i].start(); } } } } + public int getActiveThreadCount() { + return ((ThreadPoolExecutor) this.executorService).getActiveCount(); + } + @Override protected void serviceStop() throws Exception { - for (int i = 0; i < threads.length; i++) { - threads[i].interrupt(); - } + this.executorService.shutdownNow(); + this.executorService.awaitTermination(3, TimeUnit.SECONDS); } } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapLoadGeneratorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapLoadGeneratorService.java index e4e76f5e89..7c2f65d966 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapLoadGeneratorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapLoadGeneratorService.java @@ -18,57 +18,81 @@ package org.apache.hadoop.hive.llap.daemon.impl; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.junit.Test; +import java.io.IOException; import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Test; + /** - * Test to make sure that the LLAP nodes are able to start with the load generator. + * Test to make sure that the LLAP nodes are able to start with the load + * generator. */ public class TestLlapLoadGeneratorService { + + /** + * Test that all threads in the generator stop when the generator itself is + * stopped. One batch of threads should only be kicked off for the local host. + * Having multiple hostnames in the list should not affect the final thread + * count. + */ @Test - public void testLoadGeneratorStops() throws InterruptedException, UnknownHostException { + public void testLoadGeneratorStops() throws InterruptedException, IOException { LlapLoadGeneratorService service = new LlapLoadGeneratorService(); + /* + * RFC 952 / RFC 1123 Valid characters for hostnames are ASCII(7) letters + * from a to z, the digits from 0 to 9, and the hyphen (-). A hostname may + * not start with a hyphen. Do not need to worry about it ever matching. + */ + final String hostNames = String.join(",", "-AnotherHost", InetAddress.getLocalHost().getHostName()); + HiveConf conf = new HiveConf(); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_HOSTNAMES, - InetAddress.getLocalHost().getHostName() + ",???"); + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_HOSTNAMES, hostNames); HiveConf.setFloatVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_UTILIZATION, 0.5f); HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_INTERVAL, 5, TimeUnit.MILLISECONDS); service.init(conf); service.start(); - assertEquals("The number of threads is not correct", - Runtime.getRuntime().availableProcessors(), service.threads.length); - for(int i = 0; i < service.threads.length; i++) { - assertTrue("The thread [" + i + "] should be alive", service.threads[i].isAlive()); - } + + assertEquals("The number of threads is not correct", Runtime.getRuntime().availableProcessors(), + service.getActiveThreadCount()); + service.stop(); - Thread.sleep(1000); - for(int i = 0; i < service.threads.length; i++) { - Thread.State state = service.threads[i].getState(); - assertFalse("The thread [" + i + "] should be terminated", service.threads[i].isAlive()); - } + + assertEquals("Threads still running after service stopped", 0, service.getActiveThreadCount()); + + service.close(); } + /** + * Host CPU utilization cannot target more than 100% of the system resources. + * Test the validation is correct. + */ @Test(expected = RuntimeException.class) - public void testLoadGeneratorFails() throws InterruptedException, UnknownHostException { + public void testLoadGeneratorFails() throws InterruptedException, IOException { LlapLoadGeneratorService service = new LlapLoadGeneratorService(); HiveConf conf = new HiveConf(); - HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_HOSTNAMES, - InetAddress.getLocalHost().getHostName() + ",???"); + + /* + * RFC 952 / RFC 1123 Valid characters for hostnames are ASCII(7) letters + * from a to z, the digits from 0 to 9, and the hyphen (-). A hostname may + * not start with a hyphen. Do not need to worry about it ever matching. + */ + final String hostNames = String.join(",", "-AnotherHost", InetAddress.getLocalHost().getHostName()); + + // More than 100 percent HiveConf.setFloatVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_UTILIZATION, 1.2f); + + HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_HOSTNAMES, hostNames); HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TEST_LOAD_INTERVAL, 5, TimeUnit.MILLISECONDS); service.init(conf); service.start(); service.stop(); + service.close(); } }