diff --git itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
index f81a0cae6b..06db163fb7 100644
--- itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
+++ itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
@@ -33,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 public class ForkingDruidNode extends DruidNode {
@@ -56,7 +57,7 @@ public class ForkingDruidNode extends DruidNode {
   private Process druidProcess = null;
 
-  private Boolean started = false;
+  private final AtomicBoolean started = new AtomicBoolean(false);
 
   private final List<String> allowedPrefixes = Lists.newArrayList(
       "com.metamx",
@@ -122,9 +123,9 @@ public ForkingDruidNode(String nodeType,
   @Override
   public void start() throws IOException {
     synchronized (started) {
-      if (started == false) {
+      if (started.get() == false) {
         druidProcess = processBuilder.start();
-        started = true;
+        started.compareAndSet(false, true);
       }
       log.info("Started " + getNodeType());
     }
@@ -133,7 +134,7 @@ public void start() throws IOException {
   @Override
   public boolean isAlive() {
     synchronized (started) {
-      return started && druidProcess != null && druidProcess.isAlive();
+      return started.get() && druidProcess != null && druidProcess.isAlive();
     }
   }
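Note on the `started` flag: once it is an `AtomicBoolean`, the check-then-act in `start()` can be collapsed into a single `compareAndSet`, which would also make the surrounding `synchronized` block unnecessary. A minimal sketch of that lock-free idiom, with illustrative names that are not the patch's code:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: a start-once guard built on compareAndSet.
// The ProcessBuilder stands in for the node's real launch command.
class StartOnceNode {
  private final AtomicBoolean started = new AtomicBoolean(false);
  private volatile Process process;
  private final ProcessBuilder processBuilder = new ProcessBuilder("true");

  void start() throws IOException {
    // Exactly one caller wins the compareAndSet, so the process is
    // launched at most once without holding an explicit lock.
    if (started.compareAndSet(false, true)) {
      process = processBuilder.start();
    }
  }

  boolean isAlive() {
    Process p = process;
    return started.get() && p != null && p.isAlive();
  }
}
```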
diff --git itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
index a9d381f0f7..046e6661e4 100644
--- itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
+++ itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
@@ -32,6 +32,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * This class has the hooks to start and stop the external Druid Nodes
@@ -39,39 +40,49 @@ public class MiniDruidCluster extends AbstractService {
   private static final Logger log = LoggerFactory.getLogger(MiniDruidCluster.class);
 
-  private static final String COMMON_DRUID_JVM_PROPPERTIES = "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info";
-
-  private static final List<String> HISTORICAL_JVM_CONF = Arrays
-      .asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xmx512m",
-          COMMON_DRUID_JVM_PROPPERTIES
-      );
-
-  private static final List<String> COORDINATOR_JVM_CONF = Arrays
-      .asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m",
-          COMMON_DRUID_JVM_PROPPERTIES
-      );
-
-  private static final Map<String, String> COMMON_DRUID_CONF = ImmutableMap.of(
-      "druid.metadata.storage.type", "derby",
-      "druid.storage.type", "hdfs",
-      "druid.processing.buffer.sizeBytes", "213870912",
-      "druid.processing.numThreads", "2",
-      "druid.worker.capacity", "4"
-  );
-
-  private static final Map<String, String> COMMON_DRUID_HISTORICAL = ImmutableMap.of(
-      "druid.server.maxSize", "130000000000"
-  );
-
-  private static final Map<String, String> COMMON_COORDINATOR_INDEXER = ImmutableMap
-      .of(
-          "druid.indexer.logs.type", "file",
-          "druid.coordinator.asOverlord.enabled", "true",
-          "druid.coordinator.asOverlord.overlordService", "druid/overlord",
-          "druid.coordinator.period", "PT2S",
-          "druid.manager.segments.pollDuration", "PT2S"
-      );
-  private static final int MIN_PORT_NUMBER = 60000;
+  private static final String COMMON_DRUID_JVM_PROPPERTIES =
+      "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager "
+          + "-Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info";
+
+  private static final List<String> HISTORICAL_JVM_CONF =
+      Arrays.asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xms512m", COMMON_DRUID_JVM_PROPPERTIES);
+
+  private static final List<String> COORDINATOR_JVM_CONF =
+      Arrays.asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m", COMMON_DRUID_JVM_PROPPERTIES);
+
+  private static final Map<String, String> COMMON_DRUID_CONF =
+      ImmutableMap.of("druid.metadata.storage.type", "derby",
+          "druid.storage.type", "hdfs",
+          "druid.processing.buffer.sizeBytes", "213870912",
+          "druid.processing.numThreads", "2",
+          "druid.worker.capacity", "4");
+
+  private static final Map<String, String> COMMON_DRUID_HISTORICAL =
+      ImmutableMap.of("druid.server.maxSize", "130000000000");
+
+  private static final Map<String, String> COMMON_COORDINATOR_INDEXER =
+      ImmutableMap.of("druid.indexer.logs.type", "file",
+          "druid.coordinator.asOverlord.enabled", "true",
+          "druid.coordinator.asOverlord.overlordService", "druid/overlord",
+          "druid.coordinator.period", "PT2S",
+          "druid.manager.segments.pollDuration", "PT2S");
+
+  private static final int MIN_PORT_NUMBER = 1999;
   private static final int MAX_PORT_NUMBER = 65535;
 
   private final DruidNode historical;
@@ -87,36 +98,42 @@ public class MiniDruidCluster extends AbstractService {
   private final File logDirectory;
 
   private final String derbyURI;
-
-  public MiniDruidCluster(String name) {
-    this(name, "/tmp/miniDruid/log", "/tmp/miniDruid/data", 2181, null);
-  }
+  private final int brokerPort;
+  private final int coordinatorPort;
+  private final int historicalPort;
+  private final int derbyPort;
 
   public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zookeeperPort, String classpath) {
     super(name);
-    this.dataDirectory = new File(tmpDir, "druid-data");
-    this.logDirectory = new File(logDir);
-    int derbyPort = findPort(MIN_PORT_NUMBER, MAX_PORT_NUMBER);
-
+    this.dataDirectory = new File(tmpDir, name + "-data");
+    this.logDirectory = new File(logDir, name + "-log");
+    int start = ThreadLocalRandom.current().nextInt(MIN_PORT_NUMBER + 1, MAX_PORT_NUMBER - 4);
+    coordinatorPort = findPort(start, MAX_PORT_NUMBER);
+    brokerPort = findPort(coordinatorPort + 1, MAX_PORT_NUMBER);
+    historicalPort = findPort(brokerPort + 1, MAX_PORT_NUMBER);
+    derbyPort = findPort(historicalPort + 1, MAX_PORT_NUMBER);
+    log.info("Druid ports are set to {} {} {} {}", coordinatorPort, brokerPort, historicalPort, derbyPort);
     ensureCleanDirectory(dataDirectory);
-    derbyURI = String
-        .format("jdbc:derby://localhost:%s/%s/druid_derby/metadata.db;create=true",
-            derbyPort,
-            dataDirectory.getAbsolutePath()
-        );
-    String segmentsCache = String
-        .format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]",
-            dataDirectory.getAbsolutePath()
-        );
+    derbyURI =
+        String.format("jdbc:derby://localhost:%s/%s/druid_derby/metadata.db;create=true",
+            derbyPort, dataDirectory.getAbsolutePath());
+    String segmentsCache =
+        String.format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]",
+            dataDirectory.getAbsolutePath());
     String indexingLogDir = new File(logDirectory, "indexer-log").getAbsolutePath();
 
     ImmutableMap.Builder<String, String> coordinatorMapBuilder = new ImmutableMap.Builder();
     ImmutableMap.Builder<String, String> historicalMapBuilder = new ImmutableMap.Builder();
+    ImmutableMap.Builder<String, String> brokerMapBuilder = new ImmutableMap.Builder();
 
-    Map<String, String> coordinatorProperties = coordinatorMapBuilder.putAll(COMMON_DRUID_CONF)
+    Map<String, String> coordinatorProperties =
+        coordinatorMapBuilder.putAll(COMMON_DRUID_CONF)
         .putAll(COMMON_COORDINATOR_INDEXER)
         .put("druid.metadata.storage.connector.connectURI", derbyURI)
         .put("druid.metadata.storage.connector.port", String.valueOf(derbyPort))
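The constructor above picks a random starting offset per cluster and then claims four free ports by walking upward. A self-contained sketch of the same probe-and-advance approach; the availability test here uses a plain `ServerSocket` bind, which is simpler than, and not necessarily identical to, the cluster's own `available(...)` helper:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch: pick a random starting point, then walk upward
// until a bindable port is found, as MiniDruidCluster does for the
// coordinator, broker, historical and Derby ports.
final class PortProbe {
  private static final int MIN_PORT = 1999;
  private static final int MAX_PORT = 65535;

  static int findPort(int start, int end) {
    for (int port = start; port < end; port++) {
      try (ServerSocket socket = new ServerSocket(port)) {
        return port; // bind succeeded, so the port is free right now
      } catch (IOException ignored) {
        // port is taken, try the next one
      }
    }
    throw new RuntimeException("cannot find free port for range " + start + ":" + end);
  }

  public static void main(String[] args) {
    int start = ThreadLocalRandom.current().nextInt(MIN_PORT + 1, MAX_PORT - 4);
    int coordinator = findPort(start, MAX_PORT);
    int broker = findPort(coordinator + 1, MAX_PORT);
    System.out.println(coordinator + " " + broker);
  }
}
```

This probe is still racy between the check and the eventual bind by the forked node, which is why randomizing the starting offset per uniquely named cluster matters: concurrent test drivers are unlikely to scan the same range.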
@@ -125,25 +142,32 @@ public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zooke
         .put("druid.coordinator.startDelay", "PT1S")
         .put("druid.indexer.runner", "local")
         .put("druid.storage.storageDirectory", getDeepStorageDir())
+        .put("druid.plaintextPort", String.valueOf(coordinatorPort))
+        .put("druid.zk.paths.base", "/" + name)
         .build();
 
-    Map<String, String> historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF)
+    Map<String, String> historicalProperties =
+        historicalMapBuilder.putAll(COMMON_DRUID_CONF)
         .putAll(COMMON_DRUID_HISTORICAL)
         .put("druid.zk.service.host", "localhost:" + zookeeperPort)
         .put("druid.segmentCache.locations", segmentsCache)
         .put("druid.storage.storageDirectory", getDeepStorageDir())
+        .put("druid.plaintextPort", String.valueOf(historicalPort))
+        .put("druid.zk.paths.base", "/" + name)
         .build();
 
-    coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties,
-        COORDINATOR_JVM_CONF,
-        logDirectory, null
-    );
-    historical = new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF,
-        logDirectory, null
-    );
-    broker = new ForkingDruidNode("broker", classpath, historicalProperties, HISTORICAL_JVM_CONF,
-        logDirectory, null
-    );
+    Map<String, String> brokerProperties = brokerMapBuilder.putAll(COMMON_DRUID_CONF)
+        .put("druid.zk.service.host", "localhost:" + zookeeperPort)
+        .put("druid.segmentCache.locations", segmentsCache)
+        .put("druid.storage.storageDirectory", getDeepStorageDir())
+        .put("druid.plaintextPort", String.valueOf(brokerPort))
+        .put("druid.zk.paths.base", "/" + name)
+        .build();
+    coordinator =
+        new ForkingDruidNode("coordinator", classpath, coordinatorProperties, COORDINATOR_JVM_CONF, logDirectory, null);
+    historical =
+        new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF, logDirectory, null);
+    broker = new ForkingDruidNode("broker", classpath, brokerProperties, HISTORICAL_JVM_CONF, logDirectory, null);
     druidNodes = Arrays.asList(coordinator, historical, broker);
-
   }
 
@@ -151,7 +175,7 @@ private int findPort(int start, int end) {
     while (!available(port)) {
       port++;
       if (port == end) {
-        throw new RuntimeException("can not find free port for range " + start + ":" + end);
+        throw new RuntimeException("cannot find free port for range " + start + ":" + end);
       }
     }
     return port;
@@ -162,7 +186,7 @@ private int findPort(int start, int end) {
    *
    * @param port the port to check for availability
    */
-  public static boolean available(int port) {
+  private static boolean available(int port) {
     if (port < MIN_PORT_NUMBER || port > MAX_PORT_NUMBER) {
       throw new IllegalArgumentException("Invalid start port: " + port);
     }
@@ -193,7 +217,7 @@ private static boolean available(int port) {
     return false;
   }
 
-  private static void ensureCleanDirectory(File dir){
+  private static void ensureCleanDirectory(File dir) {
     try {
       if (dir.exists()) {
         // need to clean data directory to ensure that there is no interference from old runs
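Most of `ensureCleanDirectory`'s body falls outside the hunk above; only the existence check and its comment are visible. A hedged sketch of the delete-then-recreate pattern it implements, assuming the commons-io `FileUtils` helpers that Hive already ships with (the real body may differ):

```java
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;

// Assumption-labeled sketch: the visible hunk shows only the exists()
// check, so the deletion and recreation calls below are illustrative.
final class Dirs {
  static void ensureCleanDirectory(File dir) {
    try {
      if (dir.exists()) {
        // need to clean data directory to ensure that there is no interference from old runs
        FileUtils.deleteDirectory(dir);
      }
      FileUtils.forceMkdir(dir);
    } catch (IOException e) {
      throw new RuntimeException("failed to clean directory " + dir, e);
    }
  }
}
```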
@@ -211,15 +235,13 @@ private static void ensureCleanDirectory(File dir) {
     }
   }
 
-  @Override
-  protected void serviceStart() throws Exception {
-    druidNodes.stream().forEach(node -> {
+  @Override protected void serviceStart() {
+    druidNodes.forEach(node -> {
       try {
         node.start();
       } catch (IOException e) {
-        log.error("Failed to start node " + node.getNodeType()
-            + " Consequently will destroy the cluster");
-        druidNodes.stream().filter(node1 -> node1.isAlive()).forEach(nodeToStop -> {
+        log.error("Failed to start node " + node.getNodeType() + "; consequently will destroy the cluster");
+        druidNodes.stream().filter(DruidNode::isAlive).forEach(nodeToStop -> {
           try {
             log.info("Stopping Node " + nodeToStop.getNodeType());
             nodeToStop.close();
@@ -232,9 +254,8 @@ protected void serviceStart() {
     });
   }
 
-  @Override
-  protected void serviceStop() throws Exception {
-    druidNodes.stream().forEach(node -> {
+  @Override protected void serviceStop() {
+    druidNodes.forEach(node -> {
       try {
         node.close();
       } catch (IOException e) {
@@ -244,7 +265,6 @@ protected void serviceStop() {
     });
   }
 
-
   public String getMetadataURI() {
     return derbyURI;
   }
@@ -253,12 +273,16 @@ public String getDeepStorageDir() {
     return dataDirectory.getAbsolutePath() + File.separator + "deep-storage";
   }
 
-  public String getCoordinatorURI(){
-    return "localhost:8081";
+  public String getCoordinatorURI() {
+    return "localhost:" + coordinatorPort;
   }
 
-  public String getOverlordURI(){
+  public String getOverlordURI() {
     // Overlord and coordinator both run in same JVM.
     return getCoordinatorURI();
   }
+
+  public String getBrokerURI() {
+    return "localhost:" + brokerPort;
+  }
 }
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 5aadf2c8dd..e07cc7ec7e 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1836,7 +1836,12 @@ spark.perf.disabled.query.files=query14.q,\
   cbo_query99.q,\
   mv_query44.q
 
-druid.query.files=druidmini_test1.q,\
+druid.query.files=druidkafkamini_basic.q,\
+  druidkafkamini_avro.q,\
+  druidkafkamini_csv.q,\
+  druidkafkamini_delimited.q,\
+  kafka_storage_handler.q,\
+  druidmini_test1.q,\
   druidmini_test_ts.q,\
   druidmini_joins.q,\
   druidmini_test_insert.q,\
@@ -1848,12 +1853,7 @@ druid.query.files=druidmini_test1.q,\
   druidmini_extractTime.q,\
   druidmini_test_alter.q,\
   druidmini_floorTime.q, \
-  druidmini_masking.q, \
-  druidkafkamini_basic.q, \
-  druidkafkamini_avro.q, \
-  druidkafkamini_csv.q, \
-  druidkafkamini_delimited.q, \
-  kafka_storage_handler.q
+  druidmini_masking.q
 
 druid.llap.local.query.files=druidmini_noop.q
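One subtlety in the reordered list: `java.util.Properties` joins backslash-continued lines without inserting any separator, so every continued entry, including `kafka_storage_handler.q`, needs its trailing comma before the backslash or adjacent file names fuse into a single token. A standalone demonstration with hypothetical key names, not part of the patch:

```java
import java.io.StringReader;
import java.util.Properties;

// Shows how Properties line continuation concatenates values: the
// backslash joins lines and leading whitespace on continuations is
// discarded, so the comma must be part of the value itself.
public class ContinuationDemo {
  public static void main(String[] args) throws Exception {
    Properties p = new Properties();
    p.load(new StringReader("good=a.q,\\\n  b.q,\\\n  c.q\nbad=a.q\\\n  b.q\n"));
    System.out.println(p.getProperty("good")); // prints: a.q,b.q,c.q
    System.out.println(p.getProperty("bad"));  // prints: a.qb.q  (names fused)
  }
}
```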
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index dcf2d49f0f..3c7d77be16 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -51,6 +51,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -306,6 +307,7 @@ public void initConf() throws Exception {
       conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI());
       conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI());
       conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI());
+      conf.set("hive.druid.broker.address.default", druidCluster.getBrokerURI());
       final Path scratchDir = fs
           .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir"));
       fs.mkdirs(scratchDir);
@@ -607,7 +609,8 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws
     if (clusterType == MiniClusterType.druidKafka || clusterType == MiniClusterType.druidLocal) {
       final String tempDir = System.getProperty("test.tmp.dir");
-      druidCluster = new MiniDruidCluster("mini-druid",
+      String randomId = UUID.randomUUID().toString();
+      druidCluster = new MiniDruidCluster("mini-druid-" + randomId,
           logDir,
           tempDir,
           setup.zkPort,
diff --git testutils/ptest2/conf/deployed/master-mr2.properties testutils/ptest2/conf/deployed/master-mr2.properties
index ad5405f2b4..f0199f861a 100644
--- testutils/ptest2/conf/deployed/master-mr2.properties
+++ testutils/ptest2/conf/deployed/master-mr2.properties
@@ -182,7 +182,7 @@ qFileTest.erasurecodingCli.groups.normal = mainProperties.${erasurecoding.only.q
 qFileTest.miniDruid.driver = TestMiniDruidCliDriver
 qFileTest.miniDruid.directory = ql/src/test/queries/clientpositive
-qFileTest.miniDruid.batchSize = 55
+qFileTest.miniDruid.batchSize = 7
 qFileTest.miniDruid.queryFilesProperty = qfile
 qFileTest.miniDruid.include = normal
-qFileTest.miniDruid.groups.normal = mainProperties.${druid.query.files}
+qFileTest.miniDruid.groups.normal = mainProperties.${druid.query.files}
\ No newline at end of file
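For completeness, this is roughly how the pieces fit together from a test driver's point of view. A hypothetical wiring sketch: the constructor signature and getters are taken from the diff, the configuration keys from the QTestUtil hunk, and the lifecycle calls assume the Hadoop `AbstractService` API that `MiniDruidCluster` extends; paths and the ZooKeeper port are placeholders:

```java
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.druid.MiniDruidCluster;

// Hypothetical sketch: a uniquely named cluster gets its own data/log
// dirs, ZK namespace and randomized ports; the chosen endpoints are
// then published to the Hive configuration for the qtests.
public class DruidTestWiring {
  public static void main(String[] args) throws Exception {
    String name = "mini-druid-" + UUID.randomUUID();
    MiniDruidCluster druidCluster =
        new MiniDruidCluster(name, "/tmp/log", "/tmp/data", 2181, System.getProperty("java.class.path"));
    druidCluster.init(new Configuration());
    druidCluster.start();

    HiveConf conf = new HiveConf();
    conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI());
    conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI());
    conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI());
    conf.set("hive.druid.broker.address.default", druidCluster.getBrokerURI());

    druidCluster.stop();
  }
}
```

Because the cluster name feeds the data directory, the log directory and `druid.zk.paths.base`, two such clusters started concurrently should not collide on disk, in ZooKeeper, or (with high probability) on ports, which is what allows the smaller ptest `batchSize` to fan the Druid qfiles out across parallel batches.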