diff --git itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
index f81a0cae6b..8234084965 100644
--- itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
+++ itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java
@@ -27,13 +27,10 @@
 import java.io.File;
 import java.io.IOException;
-import java.net.URLClassLoader;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 public class ForkingDruidNode extends DruidNode {
   private final static String DEFAULT_JAVA_CMD = "java";
diff --git itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
index a9d381f0f7..4dbaa4a003 100644
--- itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
+++ itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
@@ -39,38 +39,48 @@ public class MiniDruidCluster extends AbstractService {
 
   private static final Logger log = LoggerFactory.getLogger(MiniDruidCluster.class);
 
-  private static final String COMMON_DRUID_JVM_PROPPERTIES = "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info";
-
-  private static final List<String> HISTORICAL_JVM_CONF = Arrays
-      .asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xmx512m",
-          COMMON_DRUID_JVM_PROPPERTIES
-      );
-
-  private static final List<String> COORDINATOR_JVM_CONF = Arrays
-      .asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m",
-          COMMON_DRUID_JVM_PROPPERTIES
-      );
-
-  private static final Map<String, String> COMMON_DRUID_CONF = ImmutableMap.of(
-      "druid.metadata.storage.type", "derby",
-      "druid.storage.type", "hdfs",
-      "druid.processing.buffer.sizeBytes", "213870912",
-      "druid.processing.numThreads", "2",
-      "druid.worker.capacity", "4"
-  );
-
-  private static final Map<String, String> COMMON_DRUID_HISTORICAL = ImmutableMap.of(
-      "druid.server.maxSize", "130000000000"
-  );
-
-  private static final Map<String, String> COMMON_COORDINATOR_INDEXER = ImmutableMap
-      .of(
-          "druid.indexer.logs.type", "file",
-          "druid.coordinator.asOverlord.enabled", "true",
-          "druid.coordinator.asOverlord.overlordService", "druid/overlord",
-          "druid.coordinator.period", "PT2S",
-          "druid.manager.segments.pollDuration", "PT2S"
-      );
+  private static final String
+      COMMON_DRUID_JVM_PROPERTIES =
+      "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager "
+          + "-Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info";
+
+  private static final List<String>
+      HISTORICAL_JVM_CONF =
+      Arrays.asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xms512m", "-Xmx512m", COMMON_DRUID_JVM_PROPERTIES);
+
+  private static final List<String>
+      COORDINATOR_JVM_CONF =
+      Arrays.asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m", COMMON_DRUID_JVM_PROPERTIES);
+
+  private static final Map<String, String>
+      COMMON_DRUID_CONF =
+      ImmutableMap.of("druid.metadata.storage.type",
+          "derby",
+          "druid.storage.type",
+          "hdfs",
+          "druid.processing.buffer.sizeBytes",
+          "213870912",
+          "druid.processing.numThreads",
+          "2",
+          "druid.worker.capacity",
+          "4");
+
+  private static final Map<String, String>
+      COMMON_DRUID_HISTORICAL =
+      ImmutableMap.of("druid.server.maxSize", "130000000000");
+
+  private static final Map<String, String>
+      COMMON_COORDINATOR_INDEXER =
+      ImmutableMap.of("druid.indexer.logs.type",
+          "file",
+          "druid.coordinator.asOverlord.enabled",
+          "true",
+          "druid.coordinator.asOverlord.overlordService",
+          "druid/overlord",
+          "druid.coordinator.period",
+          "PT2S",
+          "druid.manager.segments.pollDuration",
+          "PT2S");
 
   private static final int MIN_PORT_NUMBER = 60000;
   private static final int MAX_PORT_NUMBER = 65535;
@@ -92,31 +102,30 @@ public MiniDruidCluster(String name) {
     this(name, "/tmp/miniDruid/log", "/tmp/miniDruid/data", 2181, null);
   }
-
   public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zookeeperPort, String classpath) {
     super(name);
     this.dataDirectory = new File(tmpDir, "druid-data");
     this.logDirectory = new File(logDir);
     int derbyPort = findPort(MIN_PORT_NUMBER, MAX_PORT_NUMBER);
-    ensureCleanDirectory(dataDirectory);
-    derbyURI = String
-        .format("jdbc:derby://localhost:%s/%s/druid_derby/metadata.db;create=true",
-            derbyPort,
-            dataDirectory.getAbsolutePath()
-        );
-    String segmentsCache = String
-        .format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]",
-            dataDirectory.getAbsolutePath()
-        );
+    derbyURI =
+        String.format("jdbc:derby://localhost:%s/%s/druid_derby/metadata.db;create=true",
+            derbyPort,
+            dataDirectory.getAbsolutePath());
+    String
+        segmentsCache =
+        String.format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]",
+            dataDirectory.getAbsolutePath());
     String indexingLogDir = new File(logDirectory, "indexer-log").getAbsolutePath();
 
     ImmutableMap.Builder<String, String> coordinatorMapBuilder = new ImmutableMap.Builder<>();
     ImmutableMap.Builder<String, String> historicalMapBuilder = new ImmutableMap.Builder<>();
 
-    Map<String, String> coordinatorProperties = coordinatorMapBuilder.putAll(COMMON_DRUID_CONF)
+    Map<String, String>
+        coordinatorProperties =
+        coordinatorMapBuilder.putAll(COMMON_DRUID_CONF)
         .putAll(COMMON_COORDINATOR_INDEXER)
         .put("druid.metadata.storage.connector.connectURI", derbyURI)
         .put("druid.metadata.storage.connector.port", String.valueOf(derbyPort))
@@ -126,24 +135,20 @@ public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zooke
         .put("druid.indexer.runner", "local")
         .put("druid.storage.storageDirectory", getDeepStorageDir())
         .build();
-    Map<String, String> historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF)
+    Map<String, String>
+        historicalProperties =
+        historicalMapBuilder.putAll(COMMON_DRUID_CONF)
         .putAll(COMMON_DRUID_HISTORICAL)
         .put("druid.zk.service.host", "localhost:" + zookeeperPort)
         .put("druid.segmentCache.locations", segmentsCache)
         .put("druid.storage.storageDirectory", getDeepStorageDir())
         .build();
-    coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties,
-        COORDINATOR_JVM_CONF,
-        logDirectory, null
-    );
-    historical = new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF,
-        logDirectory, null
-    );
-    broker = new ForkingDruidNode("broker", classpath, historicalProperties, HISTORICAL_JVM_CONF,
-        logDirectory, null
-    );
+    coordinator =
+        new ForkingDruidNode("coordinator", classpath, coordinatorProperties, COORDINATOR_JVM_CONF, logDirectory, null);
+    historical =
+        new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF, logDirectory, null);
+    broker = new ForkingDruidNode("broker", classpath, historicalProperties, HISTORICAL_JVM_CONF, logDirectory, null);
     druidNodes = Arrays.asList(coordinator, historical, broker);
-
   }
 
   private int findPort(int start, int end) {
@@ -151,7 +156,7 @@ private int findPort(int start, int end) {
     while (!available(port)) {
       port++;
       if (port == end) {
-        throw new RuntimeException("can not find free port for range " + start + ":" + end);
+        throw new RuntimeException("cannot find free port for range " + start + ":" + end);
       }
     }
     return port;
@@ -193,7 +198,7 @@ public static boolean available(int port) {
     return false;
   }
 
-  private static void ensureCleanDirectory(File dir){
+  public static void ensureCleanDirectory(File dir) {
     try {
       if (dir.exists()) {
         // need to clean data directory to ensure that there is no interference from old runs
@@ -211,14 +216,12 @@ private static void ensureCleanDirectory(File dir){
     }
   }
 
-  @Override
-  protected void serviceStart() throws Exception {
+  @Override protected void serviceStart() throws Exception {
     druidNodes.stream().forEach(node -> {
       try {
         node.start();
       } catch (IOException e) {
-        log.error("Failed to start node " + node.getNodeType()
-            + " Consequently will destroy the cluster");
+        log.error("Failed to start node " + node.getNodeType() + "; consequently the cluster will be destroyed");
         druidNodes.stream().filter(node1 -> node1.isAlive()).forEach(nodeToStop -> {
           try {
             log.info("Stopping Node " + nodeToStop.getNodeType());
@@ -232,8 +235,7 @@ protected void serviceStart() throws Exception {
     });
   }
 
-  @Override
-  protected void serviceStop() throws Exception {
+  @Override protected void serviceStop() throws Exception {
     druidNodes.stream().forEach(node -> {
       try {
         node.close();
@@ -244,7 +246,6 @@ protected void serviceStop() throws Exception {
     });
   }
 
-
   public String getMetadataURI() {
     return derbyURI;
   }
@@ -253,11 +254,11 @@ public String getDeepStorageDir() {
     return dataDirectory.getAbsolutePath() + File.separator + "deep-storage";
   }
 
-  public String getCoordinatorURI(){
+  public String getCoordinatorURI() {
     return "localhost:8081";
   }
 
-  public String getOverlordURI(){
+  public String getOverlordURI() {
     // Overlord and coordinator both run in same JVM.
     return getCoordinatorURI();
   }
diff --git itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
index 3f2c9a7b34..e59985a947 100644
--- itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
+++ itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
@@ -7,7 +7,9 @@
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hive.druid.MiniDruidCluster;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -50,6 +52,19 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort){
     super(name);
 
     Properties properties = new Properties();
+    File dir = new File(logDir);
+    if (dir.exists()) {
+      // need to clean the data directory to ensure that there is no interference from old runs
+      // cleaning happens here, before the run, so the directory stays available for debugging when a test fails
+      // we don't have to clean the logs since they are written in append mode
+      log.info("Cleaning the Kafka log directory [{}]", dir.getAbsolutePath());
+      try {
+        FileUtils.deleteDirectory(dir);
+      } catch (IOException e) {
+        log.error("Failed to clean the Kafka log directory");
+        throw new RuntimeException(e);
+      }
+    }
     this.zkString = String.format("localhost:%d", zkPort);
     properties.setProperty("zookeeper.connect", zkString);
     properties.setProperty("broker.id", String.valueOf(1));
diff --git itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
new file mode 100644
index 0000000000..a330aff2ef
--- /dev/null
+++ itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestMiniDruidKafkaCliDriver
+{
+
+  static CliAdapter adapter = new CliConfigs.MiniDruidKafkaCliConfig().getCliAdapter();
+
+  @Parameters(name = "{0}")
+  public static List<Object[]> getParameters() throws Exception {
+    return adapter.getParameters();
+  }
+
+  @ClassRule
+  public static TestRule cliClassRule = adapter.buildClassRule();
+
+  @Rule
+  public TestRule cliTestRule = adapter.buildTestRule();
+
+  private String name;
+  private File qfile;
+
+  public TestMiniDruidKafkaCliDriver(String name, File qfile) {
+    this.name = name;
+    this.qfile = qfile;
+  }
+
+  @Test
+  public void testCliDriver() throws Exception {
+    adapter.runTest(name, qfile);
+  }
+
+}
diff --git itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniHiveKafkaCliDriver.java itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniHiveKafkaCliDriver.java
new file mode 100644
index 0000000000..d1eeb55d8e
--- /dev/null
+++ itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniHiveKafkaCliDriver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestMiniHiveKafkaCliDriver
+{
+
+  static CliAdapter adapter = new CliConfigs.MiniKafkaCliConfig().getCliAdapter();
+
+  @Parameters(name = "{0}")
+  public static List<Object[]> getParameters() throws Exception {
+    return adapter.getParameters();
+  }
+
+  @ClassRule
+  public static TestRule cliClassRule = adapter.buildClassRule();
+
+  @Rule
+  public TestRule cliTestRule = adapter.buildTestRule();
+
+  private String name;
+  private File qfile;
+
+  public TestMiniHiveKafkaCliDriver(String name, File qfile) {
+    this.name = name;
+    this.qfile = qfile;
+  }
+
+  @Test
+  public void testCliDriver() throws Exception {
+    adapter.runTest(name, qfile);
+  }
+
+}
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 5aadf2c8dd..d8f559c85c 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1848,12 +1848,14 @@ druid.query.files=druidmini_test1.q,\
   druidmini_extractTime.q,\
   druidmini_test_alter.q,\
   druidmini_floorTime.q, \
-  druidmini_masking.q, \
-  druidkafkamini_basic.q, \
+  druidmini_masking.q
+
+druid.kafka.query.files=druidkafkamini_basic.q, \
   druidkafkamini_avro.q, \
   druidkafkamini_csv.q, \
-  druidkafkamini_delimited.q, \
-  kafka_storage_handler.q
+  druidkafkamini_delimited.q
+
+hive.kafka.query.files=kafka_storage_handler.q
 
 druid.llap.local.query.files=druidmini_noop.q
diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 7ac7ba1260..2017c94f89 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -56,6 +56,8 @@ public CliConfig() {
         excludesFrom(testConfigProps, "disabled.query.files");
         excludesFrom(testConfigProps, "localSpark.only.query.files");
         excludesFrom(testConfigProps, "druid.query.files");
+        excludesFrom(testConfigProps, "druid.kafka.query.files");
+        excludesFrom(testConfigProps, "hive.kafka.query.files");
         excludesFrom(testConfigProps, "erasurecoding.only.query.files");
 
         excludeQuery("fouter_join_ppr.q"); // Disabled in HIVE-19509
@@ -187,6 +189,27 @@ public MiniDruidCliConfig() {
         setResultsDir("ql/src/test/results/clientpositive/druid");
         setLogDir("itests/qtest/target/tmp/log");
 
+        setInitScript("q_test_druid_init.sql");
+        setCleanupScript("q_test_cleanup_druid.sql");
+        setHiveConfDir("data/conf/llap");
+        setClusterType(MiniClusterType.druid);
+        setMetastoreType(MetastoreType.sql);
+        setFsType(QTestUtil.FsType.hdfs);
+      } catch (Exception e) {
+        throw new RuntimeException("can't construct cliconfig", e);
+      }
+    }
+  }
+
+  public static class MiniDruidKafkaCliConfig extends AbstractCliConfig {
+    public MiniDruidKafkaCliConfig() {
+      super(CoreCliDriver.class);
+      try {
+        setQueryDir("ql/src/test/queries/clientpositive");
+        includesFrom(testConfigProps, "druid.kafka.query.files");
+        setResultsDir("ql/src/test/results/clientpositive/druid");
+        setLogDir("itests/qtest/target/tmp/log");
+
         setInitScript("q_test_druid_init.sql");
         setCleanupScript("q_test_cleanup_druid.sql");
         setHiveConfDir("data/conf/llap");
@@ -199,6 +222,24 @@ public MiniDruidCliConfig() {
     }
   }
 
+  public static class MiniKafkaCliConfig extends AbstractCliConfig {
+    public MiniKafkaCliConfig() {
+      super(CoreCliDriver.class);
+      try {
+        setQueryDir("ql/src/test/queries/clientpositive");
+        includesFrom(testConfigProps, "hive.kafka.query.files");
+        setResultsDir("ql/src/test/results/clientpositive/kafka");
+        setLogDir("itests/qtest/target/tmp/log");
+        setHiveConfDir("data/conf/llap");
+        setClusterType(MiniClusterType.kafka);
+        setMetastoreType(MetastoreType.sql);
+        setFsType(QTestUtil.FsType.hdfs);
+      } catch (Exception e) {
+        throw new RuntimeException("can't construct cliconfig", e);
+      }
+    }
+  }
+
   public static class MiniLlapLocalCliConfig extends AbstractCliConfig {
 
     public MiniLlapLocalCliConfig() {
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index dcf2d49f0f..56d52aac95 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -135,7 +135,6 @@
 
 /**
  * QTestUtil.
- *
 */
 public class QTestUtil {
 
@@ -150,21 +149,19 @@
   private final static String defaultInitScript = "q_test_init.sql";
   private final static String defaultCleanupScript = "q_test_cleanup.sql";
   private static SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
-  private final String[] testOnlyCommands = new String[]{"crypto", "erasure"};
+  private final String[] testOnlyCommands = new String[]{ "crypto", "erasure" };
 
   public static final String TEST_TMP_DIR_PROPERTY = "test.tmp.dir"; // typically target/tmp
   private static final String BUILD_DIR_PROPERTY = "build.dir"; // typically target
   private static final String TEST_SRC_TABLES_PROPERTY = "test.src.tables";
-
   /**
    * The default Erasure Coding Policy to use in Erasure Coding tests.
   */
  public static final String DEFAULT_TEST_EC_POLICY = "RS-3-2-1024k";
 
  private String testWarehouse;
 
-  @Deprecated
-  private final String testFiles;
+  @Deprecated private final String testFiles;
   private final File datasetDir;
   private final String outDir;
   protected final String logDir;
@@ -204,19 +201,19 @@
   private SingleNodeKafkaCluster kafkaCluster;
 
   public static Set<String> getSrcTables() {
-    if (srcTables == null){
+    if (srcTables == null) {
       initSrcTables();
     }
     return srcTables;
   }
 
-  public static void addSrcTable(String table){
+  public static void addSrcTable(String table) {
     getSrcTables().add(table);
     storeSrcTables();
   }
 
   public static Set<String> initSrcTables() {
-    if (srcTables == null){
+    if (srcTables == null) {
       initSrcTablesFromSystemProperty();
       storeSrcTables();
     }
@@ -228,7 +225,7 @@ private static void storeSrcTables() {
     System.setProperty(TEST_SRC_TABLES_PROPERTY, String.join(",", srcTables));
   }
 
-  private static void initSrcTablesFromSystemProperty(){
+  private static void initSrcTablesFromSystemProperty() {
     srcTables = new HashSet<>();
     // FIXME: moved default value to here...for now
     // i think this features is never really used from the command line
@@ -241,7 +238,7 @@ private static void initSrcTablesFromSystemProperty(){
   }
 
   private CliDriver getCliDriver() {
-    if(cliDriver == null){
+    if (cliDriver == null) {
       throw new RuntimeException("no clidriver");
     }
     return cliDriver;
@@ -249,6 +246,7 @@ private CliDriver getCliDriver() {
 
   /**
    * Returns the default UDF names which should not be removed when resetting the test database
+   *
    * @return The list of the UDF names not to remove
   */
   private Set<String> getSrcUDFs() {
@@ -256,8 +254,7 @@ private CliDriver getCliDriver() {
     // FIXME: moved default value to here...for now
     // i think this features is never really used from the command line
     String defaultTestSrcUDFs = "qtest_get_java_boolean";
-    for (String srcUDF : System.getProperty("test.src.udfs", defaultTestSrcUDFs).trim().split(","))
-    {
+    for (String srcUDF : System.getProperty("test.src.udfs", defaultTestSrcUDFs).trim().split(",")) {
       srcUDF = srcUDF.trim();
       if (!srcUDF.isEmpty()) {
         srcUDFs.add(srcUDF);
@@ -276,19 +273,18 @@ public HiveConf getConf() {
 
   public void initConf() throws Exception {
 
     String vectorizationEnabled = System.getProperty("test.vectorization.enabled");
-    if(vectorizationEnabled != null && vectorizationEnabled.equalsIgnoreCase("true")) {
+    if (vectorizationEnabled != null && vectorizationEnabled.equalsIgnoreCase("true")) {
       conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, true);
     }
 
     // Plug verifying metastore in for testing DirectSQL.
-    conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL,
-        "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
+    conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.VerifyingObjectStore");
 
     if (mr != null) {
       mr.setupConfiguration(conf);
 
       // TODO Ideally this should be done independent of whether mr is setup or not.
-      setFsRelatedProperties(conf, fs.getScheme().equals("file"),fs);
+      setFsRelatedProperties(conf, fs.getScheme().equals("file"), fs);
     }
 
     if (llapCluster != null) {
@@ -306,8 +302,7 @@ public void initConf() throws Exception {
       conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI());
       conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI());
       conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI());
-      final Path scratchDir = fs
-          .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir"));
+      final Path scratchDir = fs.makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir"));
       fs.mkdirs(scratchDir);
       conf.set("hive.druid.working.directory", scratchDir.toUri().getPath());
     }
@@ -326,7 +321,7 @@ private void setFsRelatedProperties(HiveConf conf, boolean isLocalFs, FileSystem
     Path path = new Path(fsUriString, buildDir);
 
     // Create a fake fs root for local fs
-    Path localFsRoot  = new Path(path, "localfs");
+    Path localFsRoot = new Path(path, "localfs");
     warehousePath = new Path(localFsRoot, "warehouse");
     jarPath = new Path(localFsRoot, "jar");
     userInstallPath = new Path(localFsRoot, "user_install");
@@ -366,50 +361,36 @@ private void createRemoteDirs() {
     try {
       fs.mkdirs(warehousePath);
     } catch (IOException e) {
-      LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath,
-          e.getMessage());
+      LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, e.getMessage());
     }
     try {
       fs.mkdirs(hiveJarPath);
     } catch (IOException e) {
-      LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath,
-          e.getMessage());
+      LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, e.getMessage());
     }
     try {
       fs.mkdirs(userInstallPath);
     } catch (IOException e) {
-      LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath,
-          e.getMessage());
+      LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, e.getMessage());
     }
   }
 
   private enum CoreClusterType {
-    MR,
-    TEZ,
-    SPARK
+    MR, TEZ, SPARK
   }
 
   public enum FsType {
-    local,
-    hdfs,
-    encrypted_hdfs,
-    erasure_coded_hdfs,
+    local, hdfs, encrypted_hdfs, erasure_coded_hdfs,
   }
 
   public enum MiniClusterType {
-    mr(CoreClusterType.MR, FsType.hdfs),
-    tez(CoreClusterType.TEZ, FsType.hdfs),
-    tez_local(CoreClusterType.TEZ, FsType.local),
-    spark(CoreClusterType.SPARK, FsType.local),
-    miniSparkOnYarn(CoreClusterType.SPARK, FsType.hdfs),
-    llap(CoreClusterType.TEZ, FsType.hdfs),
-    llap_local(CoreClusterType.TEZ, FsType.local),
-    none(CoreClusterType.MR, FsType.local),
-    druidLocal(CoreClusterType.TEZ, FsType.local),
-    druidKafka(CoreClusterType.TEZ, FsType.hdfs),
-    kafka(CoreClusterType.TEZ, FsType.hdfs);
-
+    mr(CoreClusterType.MR, FsType.hdfs),
+    tez(CoreClusterType.TEZ, FsType.hdfs),
+    tez_local(CoreClusterType.TEZ, FsType.local),
+    spark(CoreClusterType.SPARK, FsType.local),
+    miniSparkOnYarn(CoreClusterType.SPARK, FsType.hdfs),
+    llap(CoreClusterType.TEZ, FsType.hdfs),
+    llap_local(CoreClusterType.TEZ, FsType.local),
+    none(CoreClusterType.MR, FsType.local),
+    druidLocal(CoreClusterType.TEZ, FsType.local),
+    druid(CoreClusterType.TEZ, FsType.hdfs),
+    druidKafka(CoreClusterType.TEZ, FsType.hdfs),
+    kafka(CoreClusterType.TEZ, FsType.hdfs);
     private final CoreClusterType coreClusterType;
     private final FsType defaultFsType;
@@ -445,10 +426,13 @@ public static MiniClusterType valueForString(String type) {
       return llap_local;
     } else if (type.equals("druidLocal")) {
       return druidLocal;
+    } else if (type.equals("druid")) {
+      return druid;
     } else if (type.equals("druid-kafka")) {
       return druidKafka;
-    }
-    else {
+    } else if (type.equals("kafka")) {
+      return kafka;
+    } else {
       return none;
     }
   }
@@ -464,10 +448,16 @@ private String getKeyProviderURI() {
   }
 
   public QTestUtil(QTestArguments testArgs) throws Exception {
-    LOG.info("Setting up QTestUtil with outDir={}, logDir={}, clusterType={}, confDir={}," +
-        " initScript={}, cleanupScript={}, withLlapIo={}, fsType={}",
-        testArgs.getOutDir(), testArgs.getLogDir(), testArgs.getClusterType(), testArgs.getConfDir(),
-        testArgs.getInitScript(), testArgs.getCleanupScript(), testArgs.isWithLlapIo(), testArgs.getFsType());
+    LOG.info("Setting up QTestUtil with outDir={}, logDir={}, clusterType={}, confDir={},"
+            + " initScript={}, cleanupScript={}, withLlapIo={}, fsType={}",
+        testArgs.getOutDir(),
+        testArgs.getLogDir(),
+        testArgs.getClusterType(),
+        testArgs.getConfDir(),
+        testArgs.getInitScript(),
+        testArgs.getCleanupScript(),
+        testArgs.isWithLlapIo(),
+        testArgs.getFsType());
 
     Preconditions.checkNotNull(testArgs.getClusterType(), "ClusterType cannot be null");
@@ -479,10 +469,11 @@ public QTestUtil(QTestArguments testArgs) throws Exception {
 
     // HIVE-14443 move this fall-back logic to CliConfigs
     if (testArgs.getConfDir() != null && !testArgs.getConfDir().isEmpty()) {
-      HiveConf.setHiveSiteLocation(new URL(
-          "file://"+ new File(testArgs.getConfDir()).toURI().getPath() + "/hive-site.xml"));
+      HiveConf.setHiveSiteLocation(new URL("file://"
+          + new File(testArgs.getConfDir()).toURI().getPath()
+          + "/hive-site.xml"));
       MetastoreConf.setHiveSiteLocation(HiveConf.getHiveSiteLocation());
-      System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
+      System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
     }
 
     queryState = new QueryState.Builder().withHiveConf(new HiveConf(IDriver.class)).build();
@@ -510,7 +501,6 @@ public QTestUtil(QTestArguments testArgs) throws Exception {
       LlapProxy.initializeLlapIo(conf);
     }
 
-
     // Use the current directory if it is not specified
     String dataDir = conf.get("test.data.files");
     if (dataDir == null) {
@@ -520,9 +510,10 @@ public QTestUtil(QTestArguments testArgs) throws Exception {
     conf.set("test.data.dir", dataDir);
 
     // Use path relative to dataDir directory if it is not specified
-    datasetDir = conf.get("test.data.set.files") == null
-        ? new File(new File(dataDir).getAbsolutePath() + "/datasets")
-        : new File(conf.get("test.data.set.files"));
+    datasetDir =
+        conf.get("test.data.set.files") == null ?
+            new File(new File(dataDir).getAbsolutePath() + "/datasets") :
+            new File(conf.get("test.data.set.files"));
 
     String scriptsDir = getScriptsDir();
@@ -552,7 +543,7 @@ private void setupFileSystem(HadoopShims shims) throws IOException {
 
     if (fsType == FsType.local) {
       fs = FileSystem.getLocal(conf);
-    } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs|| fsType == FsType.erasure_coded_hdfs) {
+    } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs || fsType == FsType.erasure_coded_hdfs) {
       int numDataNodes = 4;
 
       // Setup before getting dfs
@@ -599,80 +590,67 @@ private void setupFileSystem(HadoopShims shims) throws IOException {
     }
   }
 
-  private void setupMiniCluster(HadoopShims shims, String confDir) throws
-      IOException {
+  private void setupMiniCluster(HadoopShims shims, String confDir) throws IOException {
 
     String uriString = fs.getUri().toString();
 
     if (clusterType == MiniClusterType.druidKafka
-        || clusterType == MiniClusterType.druidLocal) {
+        || clusterType == MiniClusterType.druidLocal
+        || clusterType == MiniClusterType.druid) {
       final String tempDir = System.getProperty("test.tmp.dir");
-      druidCluster = new MiniDruidCluster("mini-druid",
-          logDir,
-          tempDir,
-          setup.zkPort,
-          Utilities.jarFinderGetJar(MiniDruidCluster.class)
-      );
+      druidCluster =
+          new MiniDruidCluster("mini-druid",
+              logDir,
+              tempDir,
+              setup.zkPort,
+              Utilities.jarFinderGetJar(MiniDruidCluster.class));
       final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir()));
       fs.mkdirs(druidDeepStorage);
-      conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath());
-      conf.set("hive.druid.metadata.db.type", "derby");
-      conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI());
-      final Path scratchDir = fs
-          .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir"));
+      final Path scratchDir = fs.makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir"));
      fs.mkdirs(scratchDir);
      conf.set("hive.druid.working.directory", scratchDir.toUri().getPath());
      druidCluster.init(conf);
      druidCluster.start();
    }
 
-    if (clusterType == MiniClusterType.kafka
-        || clusterType == MiniClusterType.druidKafka
-        || clusterType == MiniClusterType.druidLocal) {
-      kafkaCluster = new SingleNodeKafkaCluster("kafka",
-          logDir + "/kafka-cluster",
-          setup.zkPort
-      );
+    if (clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) {
+      kafkaCluster =
+          new SingleNodeKafkaCluster("kafka", System.getProperty("test.tmp.dir") + "/kafka-cluster", setup.zkPort);
       kafkaCluster.init(conf);
       kafkaCluster.start();
-      kafkaCluster.createTopicWithData(
-          "test-topic",
-          new File(getScriptsDir(), "kafka_init_data.json")
-      );
-      kafkaCluster.createTopicWithData(
-          "wiki_kafka_csv",
-          new File(getScriptsDir(), "kafka_init_data.csv")
-      );
+      kafkaCluster.createTopicWithData("test-topic", new File(getScriptsDir(), "kafka_init_data.json"));
kafkaCluster.createTopicWithData("wiki_kafka_csv", new File(getScriptsDir(), "kafka_init_data.csv")); kafkaCluster.createTopicWithData("wiki_kafka_avro_table", getAvroRows()); } if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { if (confDir != null && !confDir.isEmpty()) { - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); } int numTrackers = 2; - if (EnumSet.of( - MiniClusterType.llap, + if (EnumSet.of(MiniClusterType.llap, MiniClusterType.llap_local, MiniClusterType.druidLocal, - MiniClusterType.druidKafka - ).contains(clusterType)) { + MiniClusterType.druidKafka, + MiniClusterType.druid, + MiniClusterType.kafka).contains(clusterType)) { llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir); } if (EnumSet.of(MiniClusterType.llap_local, MiniClusterType.tez_local, MiniClusterType.druidLocal) - .contains(clusterType)) { - mr = shims.getLocalMiniTezCluster(conf, - clusterType == MiniClusterType.llap_local - || clusterType == MiniClusterType.druidLocal - ); + .contains(clusterType)) { + mr = + shims.getLocalMiniTezCluster(conf, + clusterType == MiniClusterType.llap_local || clusterType == MiniClusterType.druidLocal); } else { - mr = shims.getMiniTezCluster( - conf, - numTrackers, - uriString, - EnumSet.of(MiniClusterType.llap, MiniClusterType.llap_local, MiniClusterType.druidKafka).contains(clusterType) - ); + mr = + shims.getMiniTezCluster(conf, + numTrackers, + uriString, + EnumSet.of(MiniClusterType.llap, + MiniClusterType.llap_local, + MiniClusterType.druidKafka, + MiniClusterType.druid, + MiniClusterType.kafka).contains(clusterType)); } } else if (clusterType == MiniClusterType.miniSparkOnYarn) { mr = shims.getMiniSparkCluster(conf, 2, uriString, 1); @@ -685,42 +663,38 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws int numRows = 10; List events; final DatumWriter writer = new SpecificDatumWriter<>(Wikipedia.getClassSchema()); - events = - IntStream.rangeClosed(0, numRows) - .mapToObj(i -> Wikipedia.newBuilder() - // 1534736225090 -> 08/19/2018 20:37:05 - .setTimestamp(formatter.format(new Timestamp(1534736225090L + 1000 * 3600 * i))) - .setAdded(i * 300) - .setDeleted(-i) - .setIsrobot(i % 2 == 0) - .setChannel("chanel number " + i) - .setComment("comment number " + i) - .setCommentlength(i) - .setDiffurl(String.format("url %s", i)) - .setFlags("flag") - .setIsminor(i % 2 > 0) - .setIsanonymous(i % 3 != 0) - .setNamespace("namespace") - .setIsunpatrolled(new Boolean(i % 3 == 0)) - .setIsnew(new Boolean(i % 2 > 0)) - .setPage(String.format("page is %s", i * 100)) - .setDelta(i) - .setDeltabucket(i * 100.4) - .setUser("test-user-" + i) - .build()) - .map(genericRecord -> { - java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); - try { - writer.write(genericRecord, encoder); - encoder.flush(); - out.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - return out.toByteArray(); - }) - .collect(Collectors.toList()); + events = IntStream.rangeClosed(0, numRows).mapToObj(i -> Wikipedia.newBuilder() + // 1534736225090 -> 08/19/2018 20:37:05 + .setTimestamp(formatter.format(new Timestamp(1534736225090L + 1000 * 3600 * i))) + .setAdded(i * 300) + .setDeleted(-i) + .setIsrobot(i % 2 == 0) + .setChannel("chanel number " + i) + .setComment("comment number " + i) + 
+        .setCommentlength(i)
+        .setDiffurl(String.format("url %s", i))
+        .setFlags("flag")
+        .setIsminor(i % 2 > 0)
+        .setIsanonymous(i % 3 != 0)
+        .setNamespace("namespace")
+        .setIsunpatrolled(new Boolean(i % 3 == 0))
+        .setIsnew(new Boolean(i % 2 > 0))
+        .setPage(String.format("page is %s", i * 100))
+        .setDelta(i)
+        .setDeltabucket(i * 100.4)
+        .setUser("test-user-" + i)
+        .build()).map(genericRecord -> {
+      java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
+      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      try {
+        writer.write(genericRecord, encoder);
+        encoder.flush();
+        out.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return out.toByteArray();
+    }).collect(Collectors.toList());
     return events;
   }
@@ -764,8 +738,9 @@ public void shutdown() throws Exception {
   }
 
   public String readEntireFileIntoString(File queryFile) throws IOException {
-    InputStreamReader isr = new InputStreamReader(
-        new BufferedInputStream(new FileInputStream(queryFile)), QTestUtil.UTF_8);
+    InputStreamReader
+        isr =
+        new InputStreamReader(new BufferedInputStream(new FileInputStream(queryFile)), QTestUtil.UTF_8);
     StringWriter sw = new StringWriter();
     try {
       IOUtils.copy(isr, sw);
@@ -781,7 +756,7 @@ public void addFile(String queryFile) throws IOException {
     addFile(new File(queryFile), false);
   }
 
-  public addFile(File qf, boolean partial) throws IOException {
+  public void addFile(File qf, boolean partial) throws IOException {
     String query = readEntireFileIntoString(qf);
     qMap.put(qf.getName(), query);
     if (partial) {
@@ -859,8 +834,7 @@ public void clearTablesCreatedDuringTests() throws Exception {
       return;
     }
 
-    conf.set("hive.metastore.filter.hook",
-        "org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl");
+    conf.set("hive.metastore.filter.hook", "org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl");
     db = Hive.get(conf);
 
     // First delete any MVs to avoid race conditions
@@ -921,10 +895,10 @@ public void clearTablesCreatedDuringTests() throws Exception {
 
     SessionState.get().setCurrentDatabase(DEFAULT_DATABASE_NAME);
 
     List<String> roleNames = db.getAllRoleNames();
-      for (String roleName : roleNames) {
-        if (!"PUBLIC".equalsIgnoreCase(roleName) && !"ADMIN".equalsIgnoreCase(roleName)) {
-          db.dropRole(roleName);
-        }
+    for (String roleName : roleNames) {
+      if (!"PUBLIC".equalsIgnoreCase(roleName) && !"ADMIN".equalsIgnoreCase(roleName)) {
+        db.dropRole(roleName);
+      }
     }
   }
@@ -942,7 +916,8 @@ public void newSession(boolean canReuseSession) throws Exception {
     // renew the metastore since the cluster type is unencrypted
     db = Hive.get(conf); // propagate new conf to meta store
 
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
+    HiveConf.setVar(conf,
+        HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
         "org.apache.hadoop.hive.ql.security.DummyAuthenticator");
     CliSessionState ss = new CliSessionState(conf);
     ss.in = System.in;
@@ -959,6 +934,7 @@ public void newSession(boolean canReuseSession) throws Exception {
     setSessionOutputs("that_shouldnt_happen_there", ss, outf);
   }
 
+
   /**
    * Clear out any side effects of running tests
   */
@@ -989,7 +965,7 @@ public void cleanUp() throws Exception {
 
   public void cleanUp(String fileName) throws Exception {
     boolean canReuseSession = (fileName == null) || !qNoSessionReuseQuerySet.contains(fileName);
-    if(!isSessionStateStarted) {
+    if (!isSessionStateStarted) {
       startSessionState(canReuseSession);
     }
     if (System.getenv(QTEST_LEAVE_FILES) != null) {
@@ -1008,8 +984,8 @@ public void cleanUp(String fileName) throws Exception {
       FileSystem fs = p.getFileSystem(conf);
       try {
-        FileStatus [] ls = fs.listStatus(p);
-        for (int i=0; (ls != null) && (i<ls.length); i++) {
-  public QTestProcessExecResult checkCompareCliDriverResults(String tname, List<String> outputs)
-      throws Exception {
+  public QTestProcessExecResult checkCompareCliDriverResults(String tname, List<String> outputs) throws Exception {
     assert outputs.size() > 1;
     qOutProcessor.maskPatterns(outputs.get(0), tname);
     for (int i = 1; i < outputs.size(); ++i) {
       qOutProcessor.maskPatterns(outputs.get(i), tname);
-      QTestProcessExecResult result = executeDiffCommand(
-          outputs.get(i - 1), outputs.get(i), false, qSortSet.contains(tname));
+      QTestProcessExecResult
+          result =
+          executeDiffCommand(outputs.get(i - 1), outputs.get(i), false, qSortSet.contains(tname));
       if (result.getReturnCode() != 0) {
         System.out.println("Files don't match: " + outputs.get(i - 1) + " and " + outputs.get(i));
         return result;
@@ -1554,21 +1532,16 @@ private static void overwriteResults(String inFileName, String outFileName) thro
     // once Hive uses JAVA 7.
     System.out.println("Overwriting results " + inFileName + " to " + outFileName);
     int result = executeCmd(new String[]{
-        "cp",
-        getQuotedString(inFileName),
-        getQuotedString(outFileName)
-    }).getReturnCode();
+        "cp", getQuotedString(inFileName), getQuotedString(outFileName) }).getReturnCode();
     if (result != 0) {
-      throw new IllegalStateException("Unexpected error while overwriting " +
-          inFileName + " with " + outFileName);
+      throw new IllegalStateException("Unexpected error while overwriting " + inFileName + " with " + outFileName);
     }
   }
 
   private static QTestProcessExecResult executeDiffCommand(String inFileName,
       String outFileName,
       boolean ignoreWhiteSpace,
-      boolean sortResults
-  ) throws Exception {
+      boolean sortResults) throws Exception {
 
     QTestProcessExecResult result;
@@ -1617,9 +1590,7 @@ private static QTestProcessExecResult executeDiffCommand(String inFileName,
 
   private static void sortFiles(String in, String out) throws Exception {
     int result = executeCmd(new String[]{
-        "sort",
-        getQuotedString(in),
-    }, out, null).getReturnCode();
+        "sort", getQuotedString(in), }, out, null).getReturnCode();
     if (result != 0) {
       throw new IllegalStateException("Unexpected error while sorting " + in);
     }
@@ -1633,22 +1604,25 @@ private static QTestProcessExecResult executeCmd(String[] args) throws Exception
     return executeCmd(args, null, null);
   }
 
-  private static QTestProcessExecResult executeCmd(Collection<String> args, String outFile,
-      String errFile) throws Exception {
+  private static QTestProcessExecResult executeCmd(Collection<String> args, String outFile, String errFile)
+      throws Exception {
     String[] cmdArray = args.toArray(new String[args.size()]);
     return executeCmd(cmdArray, outFile, errFile);
   }
 
-  private static QTestProcessExecResult executeCmd(String[] args, String outFile,
-      String errFile) throws Exception {
+  private static QTestProcessExecResult executeCmd(String[] args, String outFile, String errFile) throws Exception {
     System.out.println("Running: " + org.apache.commons.lang.StringUtils.join(args, ' '));
 
-    PrintStream out = outFile == null ?
-        SessionState.getConsole().getChildOutStream() :
-        new PrintStream(new FileOutputStream(outFile), true, "UTF-8");
-    PrintStream err = errFile == null ?
-        SessionState.getConsole().getChildErrStream() :
-        new PrintStream(new FileOutputStream(errFile), true, "UTF-8");
+    PrintStream
+        out =
+        outFile == null ?
+            SessionState.getConsole().getChildOutStream() :
+            new PrintStream(new FileOutputStream(outFile), true, "UTF-8");
+    PrintStream
+        err =
+        errFile == null ?
+            SessionState.getConsole().getChildErrStream() :
+            new PrintStream(new FileOutputStream(errFile), true, "UTF-8");
 
     Process executor = Runtime.getRuntime().exec(args);
@@ -1678,7 +1652,7 @@ private static QTestProcessExecResult executeCmd(String[] args, String outFile,
         create(result, new String(bos.toByteArray(), StandardCharsets.UTF_8));
   }
 
-  private static String getQuotedString(String str){
+  private static String getQuotedString(String str) {
     return str;
   }
@@ -1704,8 +1678,7 @@ public ASTNode parseQuery(String tname) throws Exception {
   * QTestSetup defines test fixtures which are reused across testcases,
   * and are needed before any test can be run
   */
-  public static class QTestSetup
-  {
+  public static class QTestSetup {
     private MiniZooKeeperCluster zooKeeperCluster = null;
     private int zkPort;
     private ZooKeeper zooKeeper;
@@ -1717,7 +1690,7 @@ public void preTest(HiveConf conf) throws Exception {
       if (zooKeeperCluster == null) {
         //create temp dir
-        String tmpBaseDir = System.getProperty(TEST_TMP_DIR_PROPERTY);
+        String tmpBaseDir = System.getProperty(TEST_TMP_DIR_PROPERTY);
         File tmpDir = Files.createTempDirectory(Paths.get(tmpBaseDir), "tmp_").toFile();
 
         zooKeeperCluster = new MiniZooKeeperCluster();
@@ -1728,10 +1701,11 @@ public void preTest(HiveConf conf) throws Exception {
         zooKeeper.close();
       }
 
-      int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+      int
+          sessionTimeout =
+          (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
       zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() {
-        @Override
-        public void process(WatchedEvent arg0) {
+        @Override public void process(WatchedEvent arg0) {
         }
       });
 
@@ -1764,7 +1738,6 @@ public void tearDown() throws Exception {
 
   /**
    * QTRunner: Runnable class for running a single query file.
-   *
   **/
   public static class QTRunner implements Runnable {
     private final QTestUtil qt;
@@ -1775,8 +1748,7 @@ public QTRunner(QTestUtil qt, File file) {
       this.file = file;
     }
 
-    @Override
-    public void run() {
+    @Override public void run() {
       try {
         qt.startSessionState(false);
         // assumption is that environment has already been cleaned once globally
         qt.cliInit(file);
         qt.executeClient(file.getName());
       } catch (Throwable e) {
-        System.err.println("Query file " + file.getName() + " failed with exception "
-            + e.getMessage());
+        System.err.println("Query file " + file.getName() + " failed with exception " + e.getMessage());
         e.printStackTrace();
         outputTestFailureHelpMessage();
       }
@@ -1795,31 +1766,30 @@ public void run() {
 
   /**
    * Setup to execute a set of query files. Uses QTestUtil to do so.
    *
-   * @param qfiles
-   *          array of input query files containing arbitrary number of hive
-   *          queries
-   * @param resDir
-   *          output directory
-   * @param logDir
-   *          log directory
+   * @param qfiles array of input query files containing arbitrary number of hive
+   *               queries
+   * @param resDir output directory
+   * @param logDir log directory
    * @return one QTestUtil for each query file
   */
-  public static QTestUtil[] queryListRunnerSetup(File[] qfiles, String resDir,
-      String logDir, String initScript, String cleanupScript) throws Exception
-  {
+  public static QTestUtil[] queryListRunnerSetup(File[] qfiles,
+      String resDir,
+      String logDir,
+      String initScript,
+      String cleanupScript) throws Exception {
     QTestUtil[] qt = new QTestUtil[qfiles.length];
     for (int i = 0; i < qfiles.length; i++) {
-      qt[i] = new QTestUtil(
-          QTestArguments.QTestArgumentsBuilder.instance()
-              .withOutDir(resDir)
-              .withLogDir(logDir)
-              .withClusterType(MiniClusterType.none)
-              .withConfDir(null)
-              .withInitScript(initScript == null ? defaultInitScript : initScript)
-              .withCleanupScript(cleanupScript == null ? defaultCleanupScript : cleanupScript)
-              .withLlapIo(false)
-              .build());
+      qt[i] =
+          new QTestUtil(QTestArguments.QTestArgumentsBuilder.instance()
+              .withOutDir(resDir)
+              .withLogDir(logDir)
+              .withClusterType(MiniClusterType.none)
+              .withConfDir(null)
+              .withInitScript(initScript == null ? defaultInitScript : initScript)
+              .withCleanupScript(cleanupScript == null ? defaultCleanupScript : cleanupScript)
+              .withLlapIo(false)
+              .build());
 
       qt[i].addFile(qfiles[i], false);
       qt[i].clearTestSideEffects();
@@ -1831,16 +1801,12 @@ public void run() {
   /**
    * Executes a set of query files in sequence.
    *
-   * @param qfiles
-   *          array of input query files containing arbitrary number of hive
-   *          queries
-   * @param qt
-   *          array of QTestUtils, one per qfile
+   * @param qfiles array of input query files containing arbitrary number of hive
+   *               queries
+   * @param qt array of QTestUtils, one per qfile
    * @return true if all queries passed, false otw
   */
-  public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] qt)
-      throws Exception
-  {
+  public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] qt) throws Exception {
     boolean failed = false;
     qt[0].cleanUp();
     qt[0].createSources();
@@ -1869,21 +1835,16 @@ public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] q
 
   /**
    * Executes a set of query files parallel.
-   *
+   *
    * Each query file is run in a separate thread. The caller has to arrange
   * that different query files do not collide (in terms of destination tables)
   *
-   * @param qfiles
-   *          array of input query files containing arbitrary number of hive
-   *          queries
-   * @param qt
-   *          array of QTestUtils, one per qfile
+   * @param qfiles array of input query files containing arbitrary number of hive
+   *               queries
+   * @param qt array of QTestUtils, one per qfile
   * @return true if all queries passed, false otw
-   *
  */
-  public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt)
-      throws Exception
-  {
+  public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt) throws Exception {
     boolean failed = false;
 
     // in multithreaded mode - do cleanup/initialization just once
@@ -1925,10 +1886,9 @@ public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt
   }
 
   public static void outputTestFailureHelpMessage() {
-    System.err.println(
-        "See ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, or check " +
-        "./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific " +
-        "test cases logs.");
+    System.err.println("See ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, or check "
+        + "./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific "
+        + "test cases logs.");
     System.err.flush();
   }
@@ -1952,8 +1912,7 @@ private static void ensureQvFileList(String queryDir) {
     // Not thread-safe.
     System.out.println("Getting versions from " + queryDir);
     cachedQvFileList = (new File(queryDir)).list(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
+      @Override public boolean accept(File dir, String name) {
         return name.toLowerCase().endsWith(".qv");
       }
     });
@@ -1962,8 +1921,7 @@ public boolean accept(File dir, String name) {
     }
     Arrays.sort(cachedQvFileList, String.CASE_INSENSITIVE_ORDER);
     List<String> defaults = getVersionFilesInternal("default");
-    cachedDefaultQvFileList = (defaults != null)
-        ? ImmutableList.copyOf(defaults) : ImmutableList.of();
+    cachedDefaultQvFileList = (defaults != null) ? ImmutableList.copyOf(defaults) : ImmutableList.of();
   }
 
   private static List<String> getVersionFilesInternal(String tname) {
@@ -1992,25 +1950,34 @@ public boolean accept(File dir, String name) {
 
   public void failed(int ecode, String fname, String debugHint) {
     String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null;
-    String message = "Client execution failed with error code = " + ecode +
-        (command != null ? " running \"" + command : "") + "\" fname=" + fname + " " +
-        (debugHint != null ? debugHint : "");
+    String
+        message =
+        "Client execution failed with error code = "
+            + ecode
+            + (command != null ? " running \"" + command : "")
+            + "\" fname="
+            + fname
+            + " "
+            + (debugHint != null ? debugHint : "");
     LOG.error(message);
     Assert.fail(message);
   }
 
   // for negative tests, which is succeeded.. no need to print the query string
   public void failed(String fname, String debugHint) {
-    Assert.fail(
-        "Client Execution was expected to fail, but succeeded with error code 0 for fname=" +
-        fname + (debugHint != null ? (" " + debugHint) : ""));
+    Assert.fail("Client Execution was expected to fail, but succeeded with error code 0 for fname=" + fname + (debugHint
(" " + debugHint) : "")); } public void failedDiff(int ecode, String fname, String debugHint) { - String message = - "Client Execution succeeded but contained differences " + - "(error code = " + ecode + ") after executing " + - fname + (debugHint != null ? (" " + debugHint) : ""); + String + message = + "Client Execution succeeded but contained differences " + + "(error code = " + + ecode + + ") after executing " + + fname + + (debugHint != null ? (" " + debugHint) : ""); LOG.error(message); Assert.fail(message); } @@ -2019,10 +1986,8 @@ public void failed(Exception e, String fname, String debugHint) { String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null; System.err.println("Failed query: " + fname); System.err.flush(); - Assert.fail("Unexpected exception " + - org.apache.hadoop.util.StringUtils.stringifyException(e) + "\n" + - (command != null ? " running " + command : "") + - (debugHint != null ? debugHint : "")); + Assert.fail("Unexpected exception " + org.apache.hadoop.util.StringUtils.stringifyException(e) + "\n" + (command + != null ? " running " + command : "") + (debugHint != null ? debugHint : "")); } public QOutProcessor getQOutProcessor() { @@ -2035,6 +2000,7 @@ public static void initEventNotificationPoll() throws Exception { /** * Should deleted test tables have their data purged. + * * @return true if data should be purged */ private static boolean fsNeedsPurge(FsType type) { diff --git ql/src/test/queries/clientpositive/druidmini_test_insert.q ql/src/test/queries/clientpositive/druidmini_test_insert.q index 28fa6d7aff..96644f7e0d 100644 --- ql/src/test/queries/clientpositive/druidmini_test_insert.q +++ ql/src/test/queries/clientpositive/druidmini_test_insert.q @@ -153,4 +153,4 @@ select * from druid_test_table_n9 where `__time` = cast('2015-03-12 00:00:00' as DROP TABLE test_base_table; DROP TABLE druid_test_table_n9; -drop database druid_test_dst; +drop database druid_test_dst; diff --git ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out index b74d33c4ea..bb309e51d8 100644 --- ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out +++ ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out @@ -865,11 +865,11 @@ POSTHOOK: query: DROP TABLE druid_test_table_n9 POSTHOOK: type: DROPTABLE POSTHOOK: Input: druid_test_dst@druid_test_table_n9 POSTHOOK: Output: druid_test_dst@druid_test_table_n9 -PREHOOK: query: drop database druid_test_dst +PREHOOK: query: drop database druid_test_dst PREHOOK: type: DROPDATABASE PREHOOK: Input: database:druid_test_dst PREHOOK: Output: database:druid_test_dst -POSTHOOK: query: drop database druid_test_dst +POSTHOOK: query: drop database druid_test_dst POSTHOOK: type: DROPDATABASE POSTHOOK: Input: database:druid_test_dst POSTHOOK: Output: database:druid_test_dst diff --git ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out similarity index 100% rename from ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out rename to ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out diff --git testutils/ptest2/conf/deployed/master-mr2.properties testutils/ptest2/conf/deployed/master-mr2.properties index ad5405f2b4..9835d6f28f 100644 --- testutils/ptest2/conf/deployed/master-mr2.properties +++ testutils/ptest2/conf/deployed/master-mr2.properties @@ -182,7 +182,22 @@ 
@@ -182,7 +182,22 @@ qFileTest.erasurecodingCli.groups.normal = mainProperties.${erasurecoding.only.q
 
 qFileTest.miniDruid.driver = TestMiniDruidCliDriver
 qFileTest.miniDruid.directory = ql/src/test/queries/clientpositive
-qFileTest.miniDruid.batchSize = 55
+qFileTest.miniDruid.batchSize = 15
 qFileTest.miniDruid.queryFilesProperty = qfile
 qFileTest.miniDruid.include = normal
 qFileTest.miniDruid.groups.normal = mainProperties.${druid.query.files}
+
+
+qFileTest.miniDruidKafka.driver = TestMiniDruidKafkaCliDriver
+qFileTest.miniDruidKafka.directory = ql/src/test/queries/clientpositive
+qFileTest.miniDruidKafka.batchSize = 15
+qFileTest.miniDruidKafka.queryFilesProperty = qfile
+qFileTest.miniDruidKafka.include = normal
+qFileTest.miniDruidKafka.groups.normal = mainProperties.${druid.kafka.query.files}
+
+qFileTest.miniKafka.driver = TestMiniHiveKafkaCliDriver
+qFileTest.miniKafka.directory = ql/src/test/queries/clientpositive
+qFileTest.miniKafka.batchSize = 15
+qFileTest.miniKafka.queryFilesProperty = qfile
+qFileTest.miniKafka.include = normal
+qFileTest.miniKafka.groups.normal = mainProperties.${hive.kafka.query.files}
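
Note on the cleanup refactor above: the patch makes MiniDruidCluster.ensureCleanDirectory public and adds an import of MiniDruidCluster to SingleNodeKafkaCluster, yet the Kafka constructor still inlines its own FileUtils.deleteDirectory block, which leaves that import unused. A minimal sketch of the shared helper the two call sites could converge on follows; it is an illustration under stated assumptions, not code from the patch (the class name TestDirCleaner, the log wording, and the recreate-after-delete step are all assumptions):

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative stand-in for a shared test-directory cleaner; not code from the patch. */
public final class TestDirCleaner {
  private static final Logger LOG = LoggerFactory.getLogger(TestDirCleaner.class);

  private TestDirCleaner() {
  }

  /**
   * Deletes {@code dir} if it exists, then recreates it empty. Cleaning before a run
   * (rather than after) intentionally leaves the previous run's files on disk so a
   * failed test can still be inspected from its leftovers.
   */
  public static void ensureCleanDirectory(File dir) {
    try {
      if (dir.exists()) {
        LOG.info("Cleaning test directory [{}]", dir.getAbsolutePath());
        FileUtils.deleteDirectory(dir);
      }
      FileUtils.forceMkdir(dir);
    } catch (IOException e) {
      throw new RuntimeException("Failed to clean test directory " + dir, e);
    }
  }
}

With a helper of this shape, the thirteen lines added to the SingleNodeKafkaCluster constructor would reduce to a single call such as TestDirCleaner.ensureCleanDirectory(new File(logDir)), and the widened visibility of MiniDruidCluster.ensureCleanDirectory would no longer be needed.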