diff --git itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java index f81a0cae6b..8234084965 100644 --- itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java +++ itests/qtest-druid/src/main/java/org/apache/hive/druid/ForkingDruidNode.java @@ -27,13 +27,10 @@ import java.io.File; import java.io.IOException; -import java.net.URLClassLoader; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; public class ForkingDruidNode extends DruidNode { private final static String DEFAULT_JAVA_CMD = "java"; diff --git itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java index a9d381f0f7..4dbaa4a003 100644 --- itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java +++ itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java @@ -39,38 +39,48 @@ public class MiniDruidCluster extends AbstractService { private static final Logger log = LoggerFactory.getLogger(MiniDruidCluster.class); - private static final String COMMON_DRUID_JVM_PROPPERTIES = "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info"; - - private static final List HISTORICAL_JVM_CONF = Arrays - .asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xmx512m", - COMMON_DRUID_JVM_PROPPERTIES - ); - - private static final List COORDINATOR_JVM_CONF = Arrays - .asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m", - COMMON_DRUID_JVM_PROPPERTIES - ); - - private static final Map COMMON_DRUID_CONF = ImmutableMap.of( - "druid.metadata.storage.type", "derby", - "druid.storage.type", "hdfs", - "druid.processing.buffer.sizeBytes", "213870912", - "druid.processing.numThreads", "2", - "druid.worker.capacity", "4" - ); - - private static final Map COMMON_DRUID_HISTORICAL = ImmutableMap.of( - "druid.server.maxSize", "130000000000" - ); - - private static final Map COMMON_COORDINATOR_INDEXER = ImmutableMap - .of( - "druid.indexer.logs.type", "file", - "druid.coordinator.asOverlord.enabled", "true", - "druid.coordinator.asOverlord.overlordService", "druid/overlord", - "druid.coordinator.period", "PT2S", - "druid.manager.segments.pollDuration", "PT2S" - ); + private static final String + COMMON_DRUID_JVM_PROPERTIES = + "-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager " + + "-Ddruid.emitter=logging -Ddruid.emitter.logging.logLevel=info"; + + private static final List + HISTORICAL_JVM_CONF = + Arrays.asList("-server", "-XX:MaxDirectMemorySize=10g", "-Xmx512m", "-Xmx512m", COMMON_DRUID_JVM_PROPERTIES); + + private static final List + COORDINATOR_JVM_CONF = + Arrays.asList("-server", "-XX:MaxDirectMemorySize=2g", "-Xmx512m", "-Xms512m", COMMON_DRUID_JVM_PROPERTIES); + + private static final Map + COMMON_DRUID_CONF = + ImmutableMap.of("druid.metadata.storage.type", + "derby", + "druid.storage.type", + "hdfs", + "druid.processing.buffer.sizeBytes", + "213870912", + "druid.processing.numThreads", + "2", + "druid.worker.capacity", + "4"); + + private static final Map + COMMON_DRUID_HISTORICAL = + ImmutableMap.of("druid.server.maxSize", "130000000000"); + + private static final Map + 
COMMON_COORDINATOR_INDEXER = + ImmutableMap.of("druid.indexer.logs.type", + "file", + "druid.coordinator.asOverlord.enabled", + "true", + "druid.coordinator.asOverlord.overlordService", + "druid/overlord", + "druid.coordinator.period", + "PT2S", + "druid.manager.segments.pollDuration", + "PT2S"); private static final int MIN_PORT_NUMBER = 60000; private static final int MAX_PORT_NUMBER = 65535; @@ -92,31 +102,30 @@ public MiniDruidCluster(String name) { this(name, "/tmp/miniDruid/log", "/tmp/miniDruid/data", 2181, null); } - public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zookeeperPort, String classpath) { super(name); this.dataDirectory = new File(tmpDir, "druid-data"); this.logDirectory = new File(logDir); int derbyPort = findPort(MIN_PORT_NUMBER, MAX_PORT_NUMBER); - ensureCleanDirectory(dataDirectory); - derbyURI = String - .format("jdbc:derby://localhost:%s/%s/druid_derby/metadata.db;create=true", - derbyPort, - dataDirectory.getAbsolutePath() - ); - String segmentsCache = String - .format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]", - dataDirectory.getAbsolutePath() - ); + derbyURI = + String.format("jdbc:derby://localhost:%s/%s/druid_derby/metadata.db;create=true", + derbyPort, + dataDirectory.getAbsolutePath()); + String + segmentsCache = + String.format("[{\"path\":\"%s/druid/segment-cache\",\"maxSize\":130000000000}]", + dataDirectory.getAbsolutePath()); String indexingLogDir = new File(logDirectory, "indexer-log").getAbsolutePath(); ImmutableMap.Builder coordinatorMapBuilder = new ImmutableMap.Builder(); ImmutableMap.Builder historicalMapBuilder = new ImmutableMap.Builder(); - Map coordinatorProperties = coordinatorMapBuilder.putAll(COMMON_DRUID_CONF) + Map + coordinatorProperties = + coordinatorMapBuilder.putAll(COMMON_DRUID_CONF) .putAll(COMMON_COORDINATOR_INDEXER) .put("druid.metadata.storage.connector.connectURI", derbyURI) .put("druid.metadata.storage.connector.port", String.valueOf(derbyPort)) @@ -126,24 +135,20 @@ public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zooke .put("druid.indexer.runner", "local") .put("druid.storage.storageDirectory", getDeepStorageDir()) .build(); - Map historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF) + Map + historicalProperties = + historicalMapBuilder.putAll(COMMON_DRUID_CONF) .putAll(COMMON_DRUID_HISTORICAL) .put("druid.zk.service.host", "localhost:" + zookeeperPort) .put("druid.segmentCache.locations", segmentsCache) .put("druid.storage.storageDirectory", getDeepStorageDir()) .build(); - coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties, - COORDINATOR_JVM_CONF, - logDirectory, null - ); - historical = new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF, - logDirectory, null - ); - broker = new ForkingDruidNode("broker", classpath, historicalProperties, HISTORICAL_JVM_CONF, - logDirectory, null - ); + coordinator = + new ForkingDruidNode("coordinator", classpath, coordinatorProperties, COORDINATOR_JVM_CONF, logDirectory, null); + historical = + new ForkingDruidNode("historical", classpath, historicalProperties, HISTORICAL_JVM_CONF, logDirectory, null); + broker = new ForkingDruidNode("broker", classpath, historicalProperties, HISTORICAL_JVM_CONF, logDirectory, null); druidNodes = Arrays.asList(coordinator, historical, broker); - } private int findPort(int start, int end) { @@ -151,7 +156,7 @@ private int findPort(int start, int end) { while (!available(port)) { port++; if (port 
== end) { - throw new RuntimeException("can not find free port for range " + start + ":" + end); + throw new RuntimeException("can not find free port for range " + start + ":" + end); } } return port; @@ -193,7 +198,7 @@ public static boolean available(int port) { return false; } - private static void ensureCleanDirectory(File dir){ + public static void ensureCleanDirectory(File dir) { try { if (dir.exists()) { // need to clean data directory to ensure that there is no interference from old runs @@ -211,14 +216,12 @@ private static void ensureCleanDirectory(File dir){ } } - @Override - protected void serviceStart() throws Exception { + @Override protected void serviceStart() throws Exception { druidNodes.stream().forEach(node -> { try { node.start(); } catch (IOException e) { - log.error("Failed to start node " + node.getNodeType() - + " Consequently will destroy the cluster"); + log.error("Failed to start node " + node.getNodeType() + " Consequently will destroy the cluster"); druidNodes.stream().filter(node1 -> node1.isAlive()).forEach(nodeToStop -> { try { log.info("Stopping Node " + nodeToStop.getNodeType()); @@ -232,8 +235,7 @@ protected void serviceStart() throws Exception { }); } - @Override - protected void serviceStop() throws Exception { + @Override protected void serviceStop() throws Exception { druidNodes.stream().forEach(node -> { try { node.close(); @@ -244,7 +246,6 @@ protected void serviceStop() throws Exception { }); } - public String getMetadataURI() { return derbyURI; } @@ -253,11 +254,11 @@ public String getDeepStorageDir() { return dataDirectory.getAbsolutePath() + File.separator + "deep-storage"; } - public String getCoordinatorURI(){ + public String getCoordinatorURI() { return "localhost:8081"; } - public String getOverlordURI(){ + public String getOverlordURI() { // Overlord and coordinator both run in same JVM. 
    return getCoordinatorURI();
  }
diff --git itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
index 3f2c9a7b34..e59985a947 100644
--- itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
+++ itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
@@ -7,7 +7,9 @@
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;

+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hive.druid.MiniDruidCluster;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -50,6 +52,19 @@
   public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort){
     super(name);
     Properties properties = new Properties();
+    File dir = new File(logDir);
+    if (dir.exists()) {
+      // need to clean data directory to ensure that there is no interference from old runs
+      // Cleaning is happening here to allow debugging in case of tests fail
+      // we don't have to clean logs since it is an append mode
+      log.info("Cleaning the druid directory [{}]", dir.getAbsolutePath());
+      try {
+        FileUtils.deleteDirectory(dir);
+      } catch (IOException e) {
+        log.error("Failed to clean druid directory");
+        throw new RuntimeException(e);
+      }
+    }
     this.zkString = String.format("localhost:%d", zkPort);
     properties.setProperty("zookeeper.connect", zkString);
     properties.setProperty("broker.id", String.valueOf(1));
diff --git itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
new file mode 100644
index 0000000000..a330aff2ef
--- /dev/null
+++ itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestMiniDruidKafkaCliDriver
+{
+
+  static CliAdapter adapter = new CliConfigs.MiniDruidKafkaCliConfig().getCliAdapter();
+
+  @Parameters(name = "{0}")
+  public static List getParameters() throws Exception {
+    return adapter.getParameters();
+  }
+
+  @ClassRule
+  public static TestRule cliClassRule = adapter.buildClassRule();
+
+  @Rule
+  public TestRule cliTestRule = adapter.buildTestRule();
+
+  private String name;
+  private File qfile;
+
+  public TestMiniDruidKafkaCliDriver(String name, File qfile) {
+    this.name = name;
+    this.qfile = qfile;
+  }
+
+  @Test
+  public void testCliDriver() throws Exception {
+    adapter.runTest(name, qfile);
+  }
+
+}
diff --git itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniHiveKafkaCliDriver.java itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniHiveKafkaCliDriver.java
new file mode 100644
index 0000000000..d1eeb55d8e
--- /dev/null
+++ itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniHiveKafkaCliDriver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.cli;
+
+import org.apache.hadoop.hive.cli.control.CliAdapter;
+import org.apache.hadoop.hive.cli.control.CliConfigs;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestMiniHiveKafkaCliDriver
+{
+
+  static CliAdapter adapter = new CliConfigs.MiniKafkaCliConfig().getCliAdapter();
+
+  @Parameters(name = "{0}")
+  public static List getParameters() throws Exception {
+    return adapter.getParameters();
+  }
+
+  @ClassRule
+  public static TestRule cliClassRule = adapter.buildClassRule();
+
+  @Rule
+  public TestRule cliTestRule = adapter.buildTestRule();
+
+  private String name;
+  private File qfile;
+
+  public TestMiniHiveKafkaCliDriver(String name, File qfile) {
+    this.name = name;
+    this.qfile = qfile;
+  }
+
+  @Test
+  public void testCliDriver() throws Exception {
+    adapter.runTest(name, qfile);
+  }
+
+}
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 5aadf2c8dd..d8f559c85c 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1848,12 +1848,14 @@
 druid.query.files=druidmini_test1.q,\
   druidmini_extractTime.q,\
   druidmini_test_alter.q,\
   druidmini_floorTime.q, \
-  druidmini_masking.q, \
-  druidkafkamini_basic.q, \
+  druidmini_masking.q
+
+druid.kafka.query.files=druidkafkamini_basic.q, \
   druidkafkamini_avro.q, \
   druidkafkamini_csv.q, \
-  druidkafkamini_delimited.q, \
-  kafka_storage_handler.q
+  druidkafkamini_delimited.q
+
+hive.kafka.query.files=kafka_storage_handler.q

 druid.llap.local.query.files=druidmini_noop.q
diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 7ac7ba1260..2017c94f89 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -56,6 +56,8 @@ public CliConfig() {
         excludesFrom(testConfigProps, "disabled.query.files");
         excludesFrom(testConfigProps, "localSpark.only.query.files");
         excludesFrom(testConfigProps, "druid.query.files");
+        excludesFrom(testConfigProps, "druid.kafka.query.files");
+        excludesFrom(testConfigProps, "hive.kafka.query.files");
         excludesFrom(testConfigProps, "erasurecoding.only.query.files");

         excludeQuery("fouter_join_ppr.q"); // Disabled in HIVE-19509
@@ -187,6 +189,27 @@ public MiniDruidCliConfig() {
         setResultsDir("ql/src/test/results/clientpositive/druid");
         setLogDir("itests/qtest/target/tmp/log");
+        setInitScript("q_test_druid_init.sql");
+        setCleanupScript("q_test_cleanup_druid.sql");
+        setHiveConfDir("data/conf/llap");
+        setClusterType(MiniClusterType.druid);
+        setMetastoreType(MetastoreType.sql);
+        setFsType(QTestUtil.FsType.hdfs);
+      } catch (Exception e) {
+        throw new RuntimeException("can't construct cliconfig", e);
+      }
+    }
+  }
+
+  public static class MiniDruidKafkaCliConfig extends AbstractCliConfig {
+    public MiniDruidKafkaCliConfig() {
+      super(CoreCliDriver.class);
+      try {
+        setQueryDir("ql/src/test/queries/clientpositive");
+        includesFrom(testConfigProps, "druid.kafka.query.files");
+        setResultsDir("ql/src/test/results/clientpositive/druid");
+
setLogDir("itests/qtest/target/tmp/log"); + setInitScript("q_test_druid_init.sql"); setCleanupScript("q_test_cleanup_druid.sql"); setHiveConfDir("data/conf/llap"); @@ -199,6 +222,24 @@ public MiniDruidCliConfig() { } } + public static class MiniKafkaCliConfig extends AbstractCliConfig { + public MiniKafkaCliConfig() { + super(CoreCliDriver.class); + try { + setQueryDir("ql/src/test/queries/clientpositive"); + includesFrom(testConfigProps, "hive.kafka.query.files"); + setResultsDir("ql/src/test/results/clientpositive/kafka"); + setLogDir("itests/qtest/target/tmp/log"); + setHiveConfDir("data/conf/llap"); + setClusterType(MiniClusterType.kafka); + setMetastoreType(MetastoreType.sql); + setFsType(QTestUtil.FsType.hdfs); + } catch (Exception e) { + throw new RuntimeException("can't construct cliconfig", e); + } + } + } + public static class MiniLlapLocalCliConfig extends AbstractCliConfig { public MiniLlapLocalCliConfig() { diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index dcf2d49f0f..56d52aac95 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -135,7 +135,6 @@ /** * QTestUtil. - * */ public class QTestUtil { @@ -150,21 +149,19 @@ private final static String defaultInitScript = "q_test_init.sql"; private final static String defaultCleanupScript = "q_test_cleanup.sql"; private static SimpleDateFormat formatter = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss"); - private final String[] testOnlyCommands = new String[]{"crypto", "erasure"}; + private final String[] testOnlyCommands = new String[]{ "crypto", "erasure" }; public static final String TEST_TMP_DIR_PROPERTY = "test.tmp.dir"; // typically target/tmp private static final String BUILD_DIR_PROPERTY = "build.dir"; // typically target private static final String TEST_SRC_TABLES_PROPERTY = "test.src.tables"; - /** * The default Erasure Coding Policy to use in Erasure Coding tests. 
*/ public static final String DEFAULT_TEST_EC_POLICY = "RS-3-2-1024k"; private String testWarehouse; - @Deprecated - private final String testFiles; + @Deprecated private final String testFiles; private final File datasetDir; private final String outDir; protected final String logDir; @@ -204,19 +201,19 @@ private SingleNodeKafkaCluster kafkaCluster; public static Set getSrcTables() { - if (srcTables == null){ + if (srcTables == null) { initSrcTables(); } return srcTables; } - public static void addSrcTable(String table){ + public static void addSrcTable(String table) { getSrcTables().add(table); storeSrcTables(); } public static Set initSrcTables() { - if (srcTables == null){ + if (srcTables == null) { initSrcTablesFromSystemProperty(); storeSrcTables(); } @@ -228,7 +225,7 @@ private static void storeSrcTables() { System.setProperty(TEST_SRC_TABLES_PROPERTY, String.join(",", srcTables)); } - private static void initSrcTablesFromSystemProperty(){ + private static void initSrcTablesFromSystemProperty() { srcTables = new HashSet(); // FIXME: moved default value to here...for now // i think this features is never really used from the command line @@ -241,7 +238,7 @@ private static void initSrcTablesFromSystemProperty(){ } private CliDriver getCliDriver() { - if(cliDriver == null){ + if (cliDriver == null) { throw new RuntimeException("no clidriver"); } return cliDriver; @@ -249,6 +246,7 @@ private CliDriver getCliDriver() { /** * Returns the default UDF names which should not be removed when resetting the test database + * * @return The list of the UDF names not to remove */ private Set getSrcUDFs() { @@ -256,8 +254,7 @@ private CliDriver getCliDriver() { // FIXME: moved default value to here...for now // i think this features is never really used from the command line String defaultTestSrcUDFs = "qtest_get_java_boolean"; - for (String srcUDF : System.getProperty("test.src.udfs", defaultTestSrcUDFs).trim().split(",")) - { + for (String srcUDF : System.getProperty("test.src.udfs", defaultTestSrcUDFs).trim().split(",")) { srcUDF = srcUDF.trim(); if (!srcUDF.isEmpty()) { srcUDFs.add(srcUDF); @@ -276,19 +273,18 @@ public HiveConf getConf() { public void initConf() throws Exception { String vectorizationEnabled = System.getProperty("test.vectorization.enabled"); - if(vectorizationEnabled != null && vectorizationEnabled.equalsIgnoreCase("true")) { + if (vectorizationEnabled != null && vectorizationEnabled.equalsIgnoreCase("true")) { conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, true); } // Plug verifying metastore in for testing DirectSQL. - conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, - "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); + conf.setVar(ConfVars.METASTORE_RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.VerifyingObjectStore"); if (mr != null) { mr.setupConfiguration(conf); // TODO Ideally this should be done independent of whether mr is setup or not. 
- setFsRelatedProperties(conf, fs.getScheme().equals("file"),fs); + setFsRelatedProperties(conf, fs.getScheme().equals("file"), fs); } if (llapCluster != null) { @@ -306,8 +302,7 @@ public void initConf() throws Exception { conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI()); conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI()); conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI()); - final Path scratchDir = fs - .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); + final Path scratchDir = fs.makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); fs.mkdirs(scratchDir); conf.set("hive.druid.working.directory", scratchDir.toUri().getPath()); } @@ -326,7 +321,7 @@ private void setFsRelatedProperties(HiveConf conf, boolean isLocalFs, FileSystem Path path = new Path(fsUriString, buildDir); // Create a fake fs root for local fs - Path localFsRoot = new Path(path, "localfs"); + Path localFsRoot = new Path(path, "localfs"); warehousePath = new Path(localFsRoot, "warehouse"); jarPath = new Path(localFsRoot, "jar"); userInstallPath = new Path(localFsRoot, "user_install"); @@ -366,50 +361,36 @@ private void createRemoteDirs() { try { fs.mkdirs(warehousePath); } catch (IOException e) { - LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, - e.getMessage()); + LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, e.getMessage()); } try { fs.mkdirs(hiveJarPath); } catch (IOException e) { - LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, - e.getMessage()); + LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, e.getMessage()); } try { fs.mkdirs(userInstallPath); } catch (IOException e) { - LOG.error("Failed to create path={}. Continuing. Exception message={}", warehousePath, - e.getMessage()); + LOG.error("Failed to create path={}. Continuing. 
Exception message={}", warehousePath, e.getMessage()); } } private enum CoreClusterType { - MR, - TEZ, - SPARK + MR, TEZ, SPARK } public enum FsType { - local, - hdfs, - encrypted_hdfs, - erasure_coded_hdfs, + local, hdfs, encrypted_hdfs, erasure_coded_hdfs, } public enum MiniClusterType { - mr(CoreClusterType.MR, FsType.hdfs), - tez(CoreClusterType.TEZ, FsType.hdfs), - tez_local(CoreClusterType.TEZ, FsType.local), - spark(CoreClusterType.SPARK, FsType.local), - miniSparkOnYarn(CoreClusterType.SPARK, FsType.hdfs), - llap(CoreClusterType.TEZ, FsType.hdfs), - llap_local(CoreClusterType.TEZ, FsType.local), - none(CoreClusterType.MR, FsType.local), - druidLocal(CoreClusterType.TEZ, FsType.local), - druidKafka(CoreClusterType.TEZ, FsType.hdfs), - kafka(CoreClusterType.TEZ, FsType.hdfs); - + mr(CoreClusterType.MR, FsType.hdfs), tez(CoreClusterType.TEZ, FsType.hdfs), tez_local(CoreClusterType.TEZ, + FsType.local), spark(CoreClusterType.SPARK, FsType.local), miniSparkOnYarn(CoreClusterType.SPARK, + FsType.hdfs), llap(CoreClusterType.TEZ, FsType.hdfs), llap_local(CoreClusterType.TEZ, FsType.local), none( + CoreClusterType.MR, + FsType.local), druidLocal(CoreClusterType.TEZ, FsType.local), druid(CoreClusterType.TEZ, + FsType.hdfs), druidKafka(CoreClusterType.TEZ, FsType.hdfs), kafka(CoreClusterType.TEZ, FsType.hdfs); private final CoreClusterType coreClusterType; private final FsType defaultFsType; @@ -445,10 +426,13 @@ public static MiniClusterType valueForString(String type) { return llap_local; } else if (type.equals("druidLocal")) { return druidLocal; + } else if (type.equals("druid")) { + return druid; } else if (type.equals("druid-kafka")) { return druidKafka; - } - else { + } else if (type.equals("kafka")) { + return kafka; + } else { return none; } } @@ -464,10 +448,16 @@ private String getKeyProviderURI() { } public QTestUtil(QTestArguments testArgs) throws Exception { - LOG.info("Setting up QTestUtil with outDir={}, logDir={}, clusterType={}, confDir={}," + - " initScript={}, cleanupScript={}, withLlapIo={}, fsType={}", - testArgs.getOutDir(), testArgs.getLogDir(), testArgs.getClusterType(), testArgs.getConfDir(), - testArgs.getInitScript(), testArgs.getCleanupScript(), testArgs.isWithLlapIo(), testArgs.getFsType()); + LOG.info("Setting up QTestUtil with outDir={}, logDir={}, clusterType={}, confDir={}," + + " initScript={}, cleanupScript={}, withLlapIo={}, fsType={}", + testArgs.getOutDir(), + testArgs.getLogDir(), + testArgs.getClusterType(), + testArgs.getConfDir(), + testArgs.getInitScript(), + testArgs.getCleanupScript(), + testArgs.isWithLlapIo(), + testArgs.getFsType()); Preconditions.checkNotNull(testArgs.getClusterType(), "ClusterType cannot be null"); @@ -479,10 +469,11 @@ public QTestUtil(QTestArguments testArgs) throws Exception { // HIVE-14443 move this fall-back logic to CliConfigs if (testArgs.getConfDir() != null && !testArgs.getConfDir().isEmpty()) { - HiveConf.setHiveSiteLocation(new URL( - "file://"+ new File(testArgs.getConfDir()).toURI().getPath() + "/hive-site.xml")); + HiveConf.setHiveSiteLocation(new URL("file://" + + new File(testArgs.getConfDir()).toURI().getPath() + + "/hive-site.xml")); MetastoreConf.setHiveSiteLocation(HiveConf.getHiveSiteLocation()); - System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation()); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); } queryState = new QueryState.Builder().withHiveConf(new HiveConf(IDriver.class)).build(); @@ -510,7 +501,6 @@ public QTestUtil(QTestArguments testArgs) throws 
Exception { LlapProxy.initializeLlapIo(conf); } - // Use the current directory if it is not specified String dataDir = conf.get("test.data.files"); if (dataDir == null) { @@ -520,9 +510,10 @@ public QTestUtil(QTestArguments testArgs) throws Exception { conf.set("test.data.dir", dataDir); // Use path relative to dataDir directory if it is not specified - datasetDir = conf.get("test.data.set.files") == null - ? new File(new File(dataDir).getAbsolutePath() + "/datasets") - : new File(conf.get("test.data.set.files")); + datasetDir = + conf.get("test.data.set.files") == null ? + new File(new File(dataDir).getAbsolutePath() + "/datasets") : + new File(conf.get("test.data.set.files")); String scriptsDir = getScriptsDir(); @@ -552,7 +543,7 @@ private void setupFileSystem(HadoopShims shims) throws IOException { if (fsType == FsType.local) { fs = FileSystem.getLocal(conf); - } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs|| fsType == FsType.erasure_coded_hdfs) { + } else if (fsType == FsType.hdfs || fsType == FsType.encrypted_hdfs || fsType == FsType.erasure_coded_hdfs) { int numDataNodes = 4; // Setup before getting dfs @@ -599,80 +590,67 @@ private void setupFileSystem(HadoopShims shims) throws IOException { } } - private void setupMiniCluster(HadoopShims shims, String confDir) throws - IOException { + private void setupMiniCluster(HadoopShims shims, String confDir) throws IOException { String uriString = fs.getUri().toString(); if (clusterType == MiniClusterType.druidKafka - || clusterType == MiniClusterType.druidLocal) { + || clusterType == MiniClusterType.druidLocal + || clusterType == MiniClusterType.druid) { final String tempDir = System.getProperty("test.tmp.dir"); - druidCluster = new MiniDruidCluster("mini-druid", - logDir, - tempDir, - setup.zkPort, - Utilities.jarFinderGetJar(MiniDruidCluster.class) - ); + druidCluster = + new MiniDruidCluster("mini-druid", + logDir, + tempDir, + setup.zkPort, + Utilities.jarFinderGetJar(MiniDruidCluster.class)); final Path druidDeepStorage = fs.makeQualified(new Path(druidCluster.getDeepStorageDir())); fs.mkdirs(druidDeepStorage); - conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath()); - conf.set("hive.druid.metadata.db.type", "derby"); - conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI()); - final Path scratchDir = fs - .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); + final Path scratchDir = fs.makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); fs.mkdirs(scratchDir); conf.set("hive.druid.working.directory", scratchDir.toUri().getPath()); druidCluster.init(conf); druidCluster.start(); } - if (clusterType == MiniClusterType.kafka - || clusterType == MiniClusterType.druidKafka - || clusterType == MiniClusterType.druidLocal) { - kafkaCluster = new SingleNodeKafkaCluster("kafka", - logDir + "/kafka-cluster", - setup.zkPort - ); + if (clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) { + kafkaCluster = + new SingleNodeKafkaCluster("kafka", System.getProperty("test.tmp.dir") + "/kafka-cluster", setup.zkPort); kafkaCluster.init(conf); kafkaCluster.start(); - kafkaCluster.createTopicWithData( - "test-topic", - new File(getScriptsDir(), "kafka_init_data.json") - ); - kafkaCluster.createTopicWithData( - "wiki_kafka_csv", - new File(getScriptsDir(), "kafka_init_data.csv") - ); + kafkaCluster.createTopicWithData("test-topic", new File(getScriptsDir(), "kafka_init_data.json")); + 
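
As an illustrative aside (separate from the patch itself): setupMiniCluster now starts SingleNodeKafkaCluster only for the kafka and druidKafka cluster types and preloads topics through createTopicWithData. Assuming the embedded broker listens on localhost:9092, as the kafka_storage_handler qfile's table properties expect, records can also be pushed with a plain Kafka producer; the class below is a hypothetical smoke test, not code from this change, and its payload merely mimics the wikipedia-shaped JSON rows used by the tests.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class EmbeddedKafkaSmokeTest {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // broker started by SingleNodeKafkaCluster
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          // JSON shaped like the rows the qfiles later read back through the Kafka storage handler
          String row = "{\"page\":\"Gypsy Danger\",\"added\":57,\"deleted\":200}";
          producer.send(new ProducerRecord<>("test-topic", "key", row));
          producer.flush();
        }
      }
    }
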
kafkaCluster.createTopicWithData("wiki_kafka_csv", new File(getScriptsDir(), "kafka_init_data.csv")); kafkaCluster.createTopicWithData("wiki_kafka_avro_table", getAvroRows()); } if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { if (confDir != null && !confDir.isEmpty()) { - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); } int numTrackers = 2; - if (EnumSet.of( - MiniClusterType.llap, + if (EnumSet.of(MiniClusterType.llap, MiniClusterType.llap_local, MiniClusterType.druidLocal, - MiniClusterType.druidKafka - ).contains(clusterType)) { + MiniClusterType.druidKafka, + MiniClusterType.druid, + MiniClusterType.kafka).contains(clusterType)) { llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir); } if (EnumSet.of(MiniClusterType.llap_local, MiniClusterType.tez_local, MiniClusterType.druidLocal) - .contains(clusterType)) { - mr = shims.getLocalMiniTezCluster(conf, - clusterType == MiniClusterType.llap_local - || clusterType == MiniClusterType.druidLocal - ); + .contains(clusterType)) { + mr = + shims.getLocalMiniTezCluster(conf, + clusterType == MiniClusterType.llap_local || clusterType == MiniClusterType.druidLocal); } else { - mr = shims.getMiniTezCluster( - conf, - numTrackers, - uriString, - EnumSet.of(MiniClusterType.llap, MiniClusterType.llap_local, MiniClusterType.druidKafka).contains(clusterType) - ); + mr = + shims.getMiniTezCluster(conf, + numTrackers, + uriString, + EnumSet.of(MiniClusterType.llap, + MiniClusterType.llap_local, + MiniClusterType.druidKafka, + MiniClusterType.druid, + MiniClusterType.kafka).contains(clusterType)); } } else if (clusterType == MiniClusterType.miniSparkOnYarn) { mr = shims.getMiniSparkCluster(conf, 2, uriString, 1); @@ -685,42 +663,38 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws int numRows = 10; List events; final DatumWriter writer = new SpecificDatumWriter<>(Wikipedia.getClassSchema()); - events = - IntStream.rangeClosed(0, numRows) - .mapToObj(i -> Wikipedia.newBuilder() - // 1534736225090 -> 08/19/2018 20:37:05 - .setTimestamp(formatter.format(new Timestamp(1534736225090L + 1000 * 3600 * i))) - .setAdded(i * 300) - .setDeleted(-i) - .setIsrobot(i % 2 == 0) - .setChannel("chanel number " + i) - .setComment("comment number " + i) - .setCommentlength(i) - .setDiffurl(String.format("url %s", i)) - .setFlags("flag") - .setIsminor(i % 2 > 0) - .setIsanonymous(i % 3 != 0) - .setNamespace("namespace") - .setIsunpatrolled(new Boolean(i % 3 == 0)) - .setIsnew(new Boolean(i % 2 > 0)) - .setPage(String.format("page is %s", i * 100)) - .setDelta(i) - .setDeltabucket(i * 100.4) - .setUser("test-user-" + i) - .build()) - .map(genericRecord -> { - java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); - try { - writer.write(genericRecord, encoder); - encoder.flush(); - out.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - return out.toByteArray(); - }) - .collect(Collectors.toList()); + events = IntStream.rangeClosed(0, numRows).mapToObj(i -> Wikipedia.newBuilder() + // 1534736225090 -> 08/19/2018 20:37:05 + .setTimestamp(formatter.format(new Timestamp(1534736225090L + 1000 * 3600 * i))) + .setAdded(i * 300) + .setDeleted(-i) + .setIsrobot(i % 2 == 0) + .setChannel("chanel number " + i) + .setComment("comment number " + i) + 
.setCommentlength(i) + .setDiffurl(String.format("url %s", i)) + .setFlags("flag") + .setIsminor(i % 2 > 0) + .setIsanonymous(i % 3 != 0) + .setNamespace("namespace") + .setIsunpatrolled(new Boolean(i % 3 == 0)) + .setIsnew(new Boolean(i % 2 > 0)) + .setPage(String.format("page is %s", i * 100)) + .setDelta(i) + .setDeltabucket(i * 100.4) + .setUser("test-user-" + i) + .build()).map(genericRecord -> { + java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + try { + writer.write(genericRecord, encoder); + encoder.flush(); + out.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return out.toByteArray(); + }).collect(Collectors.toList()); return events; } @@ -764,8 +738,9 @@ public void shutdown() throws Exception { } public String readEntireFileIntoString(File queryFile) throws IOException { - InputStreamReader isr = new InputStreamReader( - new BufferedInputStream(new FileInputStream(queryFile)), QTestUtil.UTF_8); + InputStreamReader + isr = + new InputStreamReader(new BufferedInputStream(new FileInputStream(queryFile)), QTestUtil.UTF_8); StringWriter sw = new StringWriter(); try { IOUtils.copy(isr, sw); @@ -781,7 +756,7 @@ public void addFile(String queryFile) throws IOException { addFile(new File(queryFile), false); } - public void addFile(File qf, boolean partial) throws IOException { + public void addFile(File qf, boolean partial) throws IOException { String query = readEntireFileIntoString(qf); qMap.put(qf.getName(), query); if (partial) { @@ -859,8 +834,7 @@ public void clearTablesCreatedDuringTests() throws Exception { return; } - conf.set("hive.metastore.filter.hook", - "org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl"); + conf.set("hive.metastore.filter.hook", "org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl"); db = Hive.get(conf); // First delete any MVs to avoid race conditions @@ -921,10 +895,10 @@ public void clearTablesCreatedDuringTests() throws Exception { SessionState.get().setCurrentDatabase(DEFAULT_DATABASE_NAME); List roleNames = db.getAllRoleNames(); - for (String roleName : roleNames) { - if (!"PUBLIC".equalsIgnoreCase(roleName) && !"ADMIN".equalsIgnoreCase(roleName)) { - db.dropRole(roleName); - } + for (String roleName : roleNames) { + if (!"PUBLIC".equalsIgnoreCase(roleName) && !"ADMIN".equalsIgnoreCase(roleName)) { + db.dropRole(roleName); + } } } @@ -942,7 +916,8 @@ public void newSession(boolean canReuseSession) throws Exception { // renew the metastore since the cluster type is unencrypted db = Hive.get(conf); // propagate new conf to meta store - HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, + HiveConf.setVar(conf, + HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER, "org.apache.hadoop.hive.ql.security.DummyAuthenticator"); CliSessionState ss = new CliSessionState(conf); ss.in = System.in; @@ -959,6 +934,7 @@ public void newSession(boolean canReuseSession) throws Exception { setSessionOutputs("that_shouldnt_happen_there", ss, outf); } + /** * Clear out any side effects of running tests */ @@ -989,7 +965,7 @@ public void cleanUp() throws Exception { public void cleanUp(String fileName) throws Exception { boolean canReuseSession = (fileName == null) || !qNoSessionReuseQuerySet.contains(fileName); - if(!isSessionStateStarted) { + if (!isSessionStateStarted) { startSessionState(canReuseSession); } if (System.getenv(QTEST_LEAVE_FILES) != null) { @@ -1008,8 +984,8 @@ public void cleanUp(String 
fileName) throws Exception { FileSystem fs = p.getFileSystem(conf); try { - FileStatus [] ls = fs.listStatus(p); - for (int i=0; (ls != null) && (i outputs) - throws Exception { + public QTestProcessExecResult checkCompareCliDriverResults(String tname, List outputs) throws Exception { assert outputs.size() > 1; qOutProcessor.maskPatterns(outputs.get(0), tname); for (int i = 1; i < outputs.size(); ++i) { qOutProcessor.maskPatterns(outputs.get(i), tname); - QTestProcessExecResult result = executeDiffCommand( - outputs.get(i - 1), outputs.get(i), false, qSortSet.contains(tname)); + QTestProcessExecResult + result = + executeDiffCommand(outputs.get(i - 1), outputs.get(i), false, qSortSet.contains(tname)); if (result.getReturnCode() != 0) { System.out.println("Files don't match: " + outputs.get(i - 1) + " and " + outputs.get(i)); return result; @@ -1554,21 +1532,16 @@ private static void overwriteResults(String inFileName, String outFileName) thro // once Hive uses JAVA 7. System.out.println("Overwriting results " + inFileName + " to " + outFileName); int result = executeCmd(new String[]{ - "cp", - getQuotedString(inFileName), - getQuotedString(outFileName) - }).getReturnCode(); + "cp", getQuotedString(inFileName), getQuotedString(outFileName) }).getReturnCode(); if (result != 0) { - throw new IllegalStateException("Unexpected error while overwriting " + - inFileName + " with " + outFileName); + throw new IllegalStateException("Unexpected error while overwriting " + inFileName + " with " + outFileName); } } private static QTestProcessExecResult executeDiffCommand(String inFileName, String outFileName, boolean ignoreWhiteSpace, - boolean sortResults - ) throws Exception { + boolean sortResults) throws Exception { QTestProcessExecResult result; @@ -1617,9 +1590,7 @@ private static QTestProcessExecResult executeDiffCommand(String inFileName, private static void sortFiles(String in, String out) throws Exception { int result = executeCmd(new String[]{ - "sort", - getQuotedString(in), - }, out, null).getReturnCode(); + "sort", getQuotedString(in), }, out, null).getReturnCode(); if (result != 0) { throw new IllegalStateException("Unexpected error while sorting " + in); } @@ -1633,22 +1604,25 @@ private static QTestProcessExecResult executeCmd(String[] args) throws Exception return executeCmd(args, null, null); } - private static QTestProcessExecResult executeCmd(Collection args, String outFile, - String errFile) throws Exception { + private static QTestProcessExecResult executeCmd(Collection args, String outFile, String errFile) + throws Exception { String[] cmdArray = args.toArray(new String[args.size()]); return executeCmd(cmdArray, outFile, errFile); } - private static QTestProcessExecResult executeCmd(String[] args, String outFile, - String errFile) throws Exception { + private static QTestProcessExecResult executeCmd(String[] args, String outFile, String errFile) throws Exception { System.out.println("Running: " + org.apache.commons.lang.StringUtils.join(args, ' ')); - PrintStream out = outFile == null ? - SessionState.getConsole().getChildOutStream() : - new PrintStream(new FileOutputStream(outFile), true, "UTF-8"); - PrintStream err = errFile == null ? - SessionState.getConsole().getChildErrStream() : - new PrintStream(new FileOutputStream(errFile), true, "UTF-8"); + PrintStream + out = + outFile == null ? + SessionState.getConsole().getChildOutStream() : + new PrintStream(new FileOutputStream(outFile), true, "UTF-8"); + PrintStream + err = + errFile == null ? 
+ SessionState.getConsole().getChildErrStream() : + new PrintStream(new FileOutputStream(errFile), true, "UTF-8"); Process executor = Runtime.getRuntime().exec(args); @@ -1678,7 +1652,7 @@ private static QTestProcessExecResult executeCmd(String[] args, String outFile, create(result, new String(bos.toByteArray(), StandardCharsets.UTF_8)); } - private static String getQuotedString(String str){ + private static String getQuotedString(String str) { return str; } @@ -1704,8 +1678,7 @@ public ASTNode parseQuery(String tname) throws Exception { * QTestSetup defines test fixtures which are reused across testcases, * and are needed before any test can be run */ - public static class QTestSetup - { + public static class QTestSetup { private MiniZooKeeperCluster zooKeeperCluster = null; private int zkPort; private ZooKeeper zooKeeper; @@ -1717,7 +1690,7 @@ public void preTest(HiveConf conf) throws Exception { if (zooKeeperCluster == null) { //create temp dir - String tmpBaseDir = System.getProperty(TEST_TMP_DIR_PROPERTY); + String tmpBaseDir = System.getProperty(TEST_TMP_DIR_PROPERTY); File tmpDir = Files.createTempDirectory(Paths.get(tmpBaseDir), "tmp_").toFile(); zooKeeperCluster = new MiniZooKeeperCluster(); @@ -1728,10 +1701,11 @@ public void preTest(HiveConf conf) throws Exception { zooKeeper.close(); } - int sessionTimeout = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); + int + sessionTimeout = + (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS); zooKeeper = new ZooKeeper("localhost:" + zkPort, sessionTimeout, new Watcher() { - @Override - public void process(WatchedEvent arg0) { + @Override public void process(WatchedEvent arg0) { } }); @@ -1764,7 +1738,6 @@ public void tearDown() throws Exception { /** * QTRunner: Runnable class for running a single query file. - * **/ public static class QTRunner implements Runnable { private final QTestUtil qt; @@ -1775,8 +1748,7 @@ public QTRunner(QTestUtil qt, File file) { this.file = file; } - @Override - public void run() { + @Override public void run() { try { qt.startSessionState(false); // assumption is that environment has already been cleaned once globally @@ -1784,8 +1756,7 @@ public void run() { qt.cliInit(file); qt.executeClient(file.getName()); } catch (Throwable e) { - System.err.println("Query file " + file.getName() + " failed with exception " - + e.getMessage()); + System.err.println("Query file " + file.getName() + " failed with exception " + e.getMessage()); e.printStackTrace(); outputTestFailureHelpMessage(); } @@ -1795,31 +1766,30 @@ public void run() { /** * Setup to execute a set of query files. Uses QTestUtil to do so. 
* - * @param qfiles - * array of input query files containing arbitrary number of hive - * queries - * @param resDir - * output directory - * @param logDir - * log directory + * @param qfiles array of input query files containing arbitrary number of hive + * queries + * @param resDir output directory + * @param logDir log directory * @return one QTestUtil for each query file */ - public static QTestUtil[] queryListRunnerSetup(File[] qfiles, String resDir, - String logDir, String initScript, String cleanupScript) throws Exception - { + public static QTestUtil[] queryListRunnerSetup(File[] qfiles, + String resDir, + String logDir, + String initScript, + String cleanupScript) throws Exception { QTestUtil[] qt = new QTestUtil[qfiles.length]; for (int i = 0; i < qfiles.length; i++) { - qt[i] = new QTestUtil( - QTestArguments.QTestArgumentsBuilder.instance() - .withOutDir(resDir) - .withLogDir(logDir) - .withClusterType(MiniClusterType.none) - .withConfDir(null) - .withInitScript(initScript == null ? defaultInitScript : initScript) - .withCleanupScript(cleanupScript == null ? defaultCleanupScript : cleanupScript) - .withLlapIo(false) - .build()); + qt[i] = + new QTestUtil(QTestArguments.QTestArgumentsBuilder.instance() + .withOutDir(resDir) + .withLogDir(logDir) + .withClusterType(MiniClusterType.none) + .withConfDir(null) + .withInitScript(initScript == null ? defaultInitScript : initScript) + .withCleanupScript(cleanupScript == null ? defaultCleanupScript : cleanupScript) + .withLlapIo(false) + .build()); qt[i].addFile(qfiles[i], false); qt[i].clearTestSideEffects(); @@ -1831,16 +1801,12 @@ public void run() { /** * Executes a set of query files in sequence. * - * @param qfiles - * array of input query files containing arbitrary number of hive - * queries - * @param qt - * array of QTestUtils, one per qfile + * @param qfiles array of input query files containing arbitrary number of hive + * queries + * @param qt array of QTestUtils, one per qfile * @return true if all queries passed, false otw */ - public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] qt) - throws Exception - { + public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] qt) throws Exception { boolean failed = false; qt[0].cleanUp(); qt[0].createSources(); @@ -1869,21 +1835,16 @@ public static boolean queryListRunnerSingleThreaded(File[] qfiles, QTestUtil[] q /** * Executes a set of query files parallel. - * + *

* Each query file is run in a separate thread. The caller has to arrange * that different query files do not collide (in terms of destination tables) * - * @param qfiles - * array of input query files containing arbitrary number of hive - * queries - * @param qt - * array of QTestUtils, one per qfile + * @param qfiles array of input query files containing arbitrary number of hive + * queries + * @param qt array of QTestUtils, one per qfile * @return true if all queries passed, false otw - * */ - public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt) - throws Exception - { + public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt) throws Exception { boolean failed = false; // in multithreaded mode - do cleanup/initialization just once @@ -1925,10 +1886,9 @@ public static boolean queryListRunnerMultiThreaded(File[] qfiles, QTestUtil[] qt } public static void outputTestFailureHelpMessage() { - System.err.println( - "See ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, or check " + - "./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific " + - "test cases logs."); + System.err.println("See ./ql/target/tmp/log/hive.log or ./itests/qtest/target/tmp/log/hive.log, or check " + + "./ql/target/surefire-reports or ./itests/qtest/target/surefire-reports/ for specific " + + "test cases logs."); System.err.flush(); } @@ -1952,8 +1912,7 @@ private static void ensureQvFileList(String queryDir) { // Not thread-safe. System.out.println("Getting versions from " + queryDir); cachedQvFileList = (new File(queryDir)).list(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { + @Override public boolean accept(File dir, String name) { return name.toLowerCase().endsWith(".qv"); } }); @@ -1962,8 +1921,7 @@ public boolean accept(File dir, String name) { } Arrays.sort(cachedQvFileList, String.CASE_INSENSITIVE_ORDER); List defaults = getVersionFilesInternal("default"); - cachedDefaultQvFileList = (defaults != null) - ? ImmutableList.copyOf(defaults) : ImmutableList.of(); + cachedDefaultQvFileList = (defaults != null) ? ImmutableList.copyOf(defaults) : ImmutableList.of(); } private static List getVersionFilesInternal(String tname) { @@ -1992,25 +1950,34 @@ public boolean accept(File dir, String name) { public void failed(int ecode, String fname, String debugHint) { String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null; - String message = "Client execution failed with error code = " + ecode + - (command != null ? " running \"" + command : "") + "\" fname=" + fname + " " + - (debugHint != null ? debugHint : ""); + String + message = + "Client execution failed with error code = " + + ecode + + (command != null ? " running \"" + command : "") + + "\" fname=" + + fname + + " " + + (debugHint != null ? debugHint : ""); LOG.error(message); Assert.fail(message); } // for negative tests, which is succeeded.. no need to print the query string public void failed(String fname, String debugHint) { - Assert.fail( - "Client Execution was expected to fail, but succeeded with error code 0 for fname=" + - fname + (debugHint != null ? (" " + debugHint) : "")); + Assert.fail("Client Execution was expected to fail, but succeeded with error code 0 for fname=" + fname + (debugHint + != null ? 
(" " + debugHint) : "")); } public void failedDiff(int ecode, String fname, String debugHint) { - String message = - "Client Execution succeeded but contained differences " + - "(error code = " + ecode + ") after executing " + - fname + (debugHint != null ? (" " + debugHint) : ""); + String + message = + "Client Execution succeeded but contained differences " + + "(error code = " + + ecode + + ") after executing " + + fname + + (debugHint != null ? (" " + debugHint) : ""); LOG.error(message); Assert.fail(message); } @@ -2019,10 +1986,8 @@ public void failed(Exception e, String fname, String debugHint) { String command = SessionState.get() != null ? SessionState.get().getLastCommand() : null; System.err.println("Failed query: " + fname); System.err.flush(); - Assert.fail("Unexpected exception " + - org.apache.hadoop.util.StringUtils.stringifyException(e) + "\n" + - (command != null ? " running " + command : "") + - (debugHint != null ? debugHint : "")); + Assert.fail("Unexpected exception " + org.apache.hadoop.util.StringUtils.stringifyException(e) + "\n" + (command + != null ? " running " + command : "") + (debugHint != null ? debugHint : "")); } public QOutProcessor getQOutProcessor() { @@ -2035,6 +2000,7 @@ public static void initEventNotificationPoll() throws Exception { /** * Should deleted test tables have their data purged. + * * @return true if data should be purged */ private static boolean fsNeedsPurge(FsType type) { diff --git ql/src/test/queries/clientpositive/druidmini_test_insert.q ql/src/test/queries/clientpositive/druidmini_test_insert.q index 28fa6d7aff..96644f7e0d 100644 --- ql/src/test/queries/clientpositive/druidmini_test_insert.q +++ ql/src/test/queries/clientpositive/druidmini_test_insert.q @@ -153,4 +153,4 @@ select * from druid_test_table_n9 where `__time` = cast('2015-03-12 00:00:00' as DROP TABLE test_base_table; DROP TABLE druid_test_table_n9; -drop database druid_test_dst; +drop database druid_test_dst; diff --git ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out index b74d33c4ea..bb309e51d8 100644 --- ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out +++ ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out @@ -865,11 +865,11 @@ POSTHOOK: query: DROP TABLE druid_test_table_n9 POSTHOOK: type: DROPTABLE POSTHOOK: Input: druid_test_dst@druid_test_table_n9 POSTHOOK: Output: druid_test_dst@druid_test_table_n9 -PREHOOK: query: drop database druid_test_dst +PREHOOK: query: drop database druid_test_dst PREHOOK: type: DROPDATABASE PREHOOK: Input: database:druid_test_dst PREHOOK: Output: database:druid_test_dst -POSTHOOK: query: drop database druid_test_dst +POSTHOOK: query: drop database druid_test_dst POSTHOOK: type: DROPDATABASE POSTHOOK: Input: database:druid_test_dst POSTHOOK: Output: database:druid_test_dst diff --git ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out new file mode 100644 index 0000000000..883b4477a6 --- /dev/null +++ ql/src/test/results/clientpositive/kafka/kafka_storage_handler.q.out @@ -0,0 +1,1717 @@ +PREHOOK: query: CREATE EXTERNAL TABLE kafka_table +(`__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 
'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@kafka_table +POSTHOOK: query: CREATE EXTERNAL TABLE kafka_table +(`__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@kafka_table +PREHOOK: query: DESCRIBE EXTENDED kafka_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@kafka_table +POSTHOOK: query: DESCRIBE EXTENDED kafka_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@kafka_table +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +__key binary from deserializer +__partition int from deserializer +__offset bigint from deserializer +__timestamp bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Partition(topic = test-topic, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [10]] +PREHOOK: query: Select `__partition` , `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition` , `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 key NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 key NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 key NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 key NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 key NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 key NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 key NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 key NULL Cherno Alpha masterYi ru Russia Asia article true 
false false true 123 12 111 +0 8 key NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 key NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select count(*) FROM kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 1533960760123 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 1533960760123 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select `__partition`, `__offset` ,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 533960760123 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset` ,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where `__timestamp` > 533960760123 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 
459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR +`__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0) +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR +`__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +key 0 5 NULL Gypsy Danger nuclear +PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +key 0 0 NULL Gypsy Danger nuclear +key 0 1 NULL Striker Eureka speed +key 0 2 NULL Cherno Alpha masterYi +key 0 3 NULL Crimson Typhoon triplets +key 0 4 NULL Coyote Tango stringer +PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +key 0 6 NULL Striker Eureka speed +key 0 7 NULL Cherno Alpha masterYi +key 0 8 NULL Crimson Typhoon triplets +key 0 9 NULL Coyote Tango stringer +PREHOOK: query: Select `__partition`, `__offset`, `user` from kafka_table where +`__timestamp` > to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS) +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset`, 
`user` from kafka_table where +`__timestamp` > to_epoch_milli(CURRENT_TIMESTAMP - interval '1' HOURS) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 nuclear +0 1 speed +0 2 masterYi +0 3 triplets +0 4 stringer +0 5 nuclear +0 6 speed +0 7 masterYi +0 8 triplets +0 9 stringer +PREHOOK: query: Select count(*) from kafka_table where `__partition` = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) from kafka_table where `__partition` = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 +PREHOOK: query: Select count(*) from kafka_table where `__offset` = 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) from kafka_table where `__offset` = 100 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 +PREHOOK: query: Select count(*) from kafka_table where `__offset` <= 100 and `__partition` <= 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) from kafka_table where `__offset` <= 100 and `__partition` <= 100 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Drop table kafka_table_offsets +PREHOOK: type: DROPTABLE +POSTHOOK: query: Drop table kafka_table_offsets +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@kafka_table_offsets +POSTHOOK: query: create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@kafka_table_offsets +PREHOOK: query: insert overwrite table kafka_table_offsets select `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP from kafka_table group by `__partition`, CURRENT_TIMESTAMP +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Output: default@kafka_table_offsets +POSTHOOK: query: insert overwrite table kafka_table_offsets select `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP from kafka_table group by `__partition`, CURRENT_TIMESTAMP +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Output: default@kafka_table_offsets +POSTHOOK: Lineage: kafka_table_offsets.insert_time SIMPLE [] +POSTHOOK: Lineage: kafka_table_offsets.max_offset EXPRESSION [(kafka_table)kafka_table.FieldSchema(name:__offset, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: kafka_table_offsets.partition_id SIMPLE [(kafka_table)kafka_table.FieldSchema(name:__partition, type:int, comment:from deserializer), ] +PREHOOK: query: select partition_id, max_offset from kafka_table_offsets +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_offsets +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select partition_id, max_offset from kafka_table_offsets +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_offsets +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 -1 +PREHOOK: query: Drop table orc_kafka_table +PREHOOK: type: DROPTABLE +POSTHOOK: query: Drop table orc_kafka_table +POSTHOOK: type: DROPTABLE +PREHOOK: query: Create 
table orc_kafka_table (partition_id int, row_offset bigint, kafka_ts bigint, + `__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint +) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_kafka_table +POSTHOOK: query: Create table orc_kafka_table (partition_id int, row_offset bigint, kafka_ts bigint, + `__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint +) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_kafka_table +PREHOOK: query: From kafka_table ktable JOIN kafka_table_offsets offset_table +on (ktable.`__partition` = offset_table.partition_id and ktable.`__offset` > offset_table.max_offset and ktable.`__offset` < 3 ) +insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`, +`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +Insert overwrite table kafka_table_offsets select +`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Input: default@kafka_table_offsets +PREHOOK: Output: default@kafka_table_offsets +PREHOOK: Output: default@orc_kafka_table +POSTHOOK: query: From kafka_table ktable JOIN kafka_table_offsets offset_table +on (ktable.`__partition` = offset_table.partition_id and ktable.`__offset` > offset_table.max_offset and ktable.`__offset` < 3 ) +insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`, +`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +Insert overwrite table kafka_table_offsets select +`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Input: default@kafka_table_offsets +POSTHOOK: Output: default@kafka_table_offsets +POSTHOOK: Output: default@orc_kafka_table +POSTHOOK: Lineage: kafka_table_offsets.insert_time EXPRESSION [] +POSTHOOK: Lineage: kafka_table_offsets.max_offset EXPRESSION [(kafka_table)ktable.FieldSchema(name:__offset, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: kafka_table_offsets.partition_id SIMPLE [(kafka_table)ktable.FieldSchema(name:__partition, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.__time SIMPLE [(kafka_table)ktable.FieldSchema(name:__time, type:timestamp, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.added SIMPLE [(kafka_table)ktable.FieldSchema(name:added, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.anonymous SIMPLE [(kafka_table)ktable.FieldSchema(name:anonymous, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.continent SIMPLE [(kafka_table)ktable.FieldSchema(name:continent, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.country SIMPLE [(kafka_table)ktable.FieldSchema(name:country, type:string, comment:from 
deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.deleted SIMPLE [(kafka_table)ktable.FieldSchema(name:deleted, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.delta SIMPLE [(kafka_table)ktable.FieldSchema(name:delta, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.kafka_ts SIMPLE [(kafka_table)ktable.FieldSchema(name:__timestamp, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.language SIMPLE [(kafka_table)ktable.FieldSchema(name:language, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.namespace SIMPLE [(kafka_table)ktable.FieldSchema(name:namespace, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.newpage SIMPLE [(kafka_table)ktable.FieldSchema(name:newpage, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.page SIMPLE [(kafka_table)ktable.FieldSchema(name:page, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.partition_id SIMPLE [(kafka_table)ktable.FieldSchema(name:__partition, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.robot SIMPLE [(kafka_table)ktable.FieldSchema(name:robot, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.row_offset SIMPLE [(kafka_table)ktable.FieldSchema(name:__offset, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.unpatrolled SIMPLE [(kafka_table)ktable.FieldSchema(name:unpatrolled, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.user SIMPLE [(kafka_table)ktable.FieldSchema(name:user, type:string, comment:from deserializer), ] +PREHOOK: query: select count(*) from orc_kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from orc_kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +3 +PREHOOK: query: select partition_id, max_offset from kafka_table_offsets +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_offsets +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select partition_id, max_offset from kafka_table_offsets +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_offsets +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 2 +PREHOOK: query: select `partition_id`, `row_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta from orc_kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select `partition_id`, `row_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta from orc_kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +PREHOOK: query: From kafka_table ktable JOIN kafka_table_offsets offset_table +on (ktable.`__partition` = offset_table.partition_id and ktable.`__offset` > 
offset_table.max_offset) +insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`, +`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +Insert overwrite table kafka_table_offsets select +`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Input: default@kafka_table_offsets +PREHOOK: Output: default@kafka_table_offsets +PREHOOK: Output: default@orc_kafka_table +POSTHOOK: query: From kafka_table ktable JOIN kafka_table_offsets offset_table +on (ktable.`__partition` = offset_table.partition_id and ktable.`__offset` > offset_table.max_offset) +insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`, +`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +Insert overwrite table kafka_table_offsets select +`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Input: default@kafka_table_offsets +POSTHOOK: Output: default@kafka_table_offsets +POSTHOOK: Output: default@orc_kafka_table +POSTHOOK: Lineage: kafka_table_offsets.insert_time EXPRESSION [] +POSTHOOK: Lineage: kafka_table_offsets.max_offset EXPRESSION [(kafka_table)ktable.FieldSchema(name:__offset, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: kafka_table_offsets.partition_id SIMPLE [(kafka_table)ktable.FieldSchema(name:__partition, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.__time SIMPLE [(kafka_table)ktable.FieldSchema(name:__time, type:timestamp, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.added SIMPLE [(kafka_table)ktable.FieldSchema(name:added, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.anonymous SIMPLE [(kafka_table)ktable.FieldSchema(name:anonymous, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.continent SIMPLE [(kafka_table)ktable.FieldSchema(name:continent, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.country SIMPLE [(kafka_table)ktable.FieldSchema(name:country, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.deleted SIMPLE [(kafka_table)ktable.FieldSchema(name:deleted, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.delta SIMPLE [(kafka_table)ktable.FieldSchema(name:delta, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.kafka_ts SIMPLE [(kafka_table)ktable.FieldSchema(name:__timestamp, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.language SIMPLE [(kafka_table)ktable.FieldSchema(name:language, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.namespace SIMPLE [(kafka_table)ktable.FieldSchema(name:namespace, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.newpage SIMPLE [(kafka_table)ktable.FieldSchema(name:newpage, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.page SIMPLE [(kafka_table)ktable.FieldSchema(name:page, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.partition_id SIMPLE [(kafka_table)ktable.FieldSchema(name:__partition, type:int, comment:from 
deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.robot SIMPLE [(kafka_table)ktable.FieldSchema(name:robot, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.row_offset SIMPLE [(kafka_table)ktable.FieldSchema(name:__offset, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.unpatrolled SIMPLE [(kafka_table)ktable.FieldSchema(name:unpatrolled, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.user SIMPLE [(kafka_table)ktable.FieldSchema(name:user, type:string, comment:from deserializer), ] +PREHOOK: query: select partition_id, max_offset from kafka_table_offsets +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_offsets +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select partition_id, max_offset from kafka_table_offsets +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_offsets +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 9 +PREHOOK: query: select count(*) from orc_kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from orc_kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: select `partition_id`, `row_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta from orc_kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select `partition_id`, `row_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta from orc_kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Drop table kafka_table_offsets +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@kafka_table_offsets +PREHOOK: Output: default@kafka_table_offsets +POSTHOOK: query: Drop table kafka_table_offsets +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@kafka_table_offsets +POSTHOOK: Output: default@kafka_table_offsets +PREHOOK: query: create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@kafka_table_offsets +POSTHOOK: query: create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp) 
+POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@kafka_table_offsets +PREHOOK: query: Drop table orc_kafka_table +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_kafka_table +PREHOOK: Output: default@orc_kafka_table +POSTHOOK: query: Drop table orc_kafka_table +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_kafka_table +POSTHOOK: Output: default@orc_kafka_table +PREHOOK: query: Create table orc_kafka_table (partition_id int, row_offset bigint, kafka_ts bigint, + `__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint +) stored as ORC +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_kafka_table +POSTHOOK: query: Create table orc_kafka_table (partition_id int, row_offset bigint, kafka_ts bigint, + `__time` timestamp , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint +) stored as ORC +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_kafka_table +PREHOOK: query: From kafka_table ktable LEFT OUTER JOIN kafka_table_offsets offset_table +on (ktable.`__partition` = offset_table.partition_id and ktable.`__offset` > offset_table.max_offset ) +insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`, +`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +Insert overwrite table kafka_table_offsets select +`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table +PREHOOK: Input: default@kafka_table_offsets +PREHOOK: Output: default@kafka_table_offsets +PREHOOK: Output: default@orc_kafka_table +POSTHOOK: query: From kafka_table ktable LEFT OUTER JOIN kafka_table_offsets offset_table +on (ktable.`__partition` = offset_table.partition_id and ktable.`__offset` > offset_table.max_offset ) +insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`, +`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +Insert overwrite table kafka_table_offsets select +`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table +POSTHOOK: Input: default@kafka_table_offsets +POSTHOOK: Output: default@kafka_table_offsets +POSTHOOK: Output: default@orc_kafka_table +POSTHOOK: Lineage: kafka_table_offsets.insert_time EXPRESSION [] +POSTHOOK: Lineage: kafka_table_offsets.max_offset EXPRESSION [(kafka_table)ktable.FieldSchema(name:__offset, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: kafka_table_offsets.partition_id SIMPLE [(kafka_table)ktable.FieldSchema(name:__partition, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.__time SIMPLE [(kafka_table)ktable.FieldSchema(name:__time, type:timestamp, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.added SIMPLE [(kafka_table)ktable.FieldSchema(name:added, type:int, comment:from deserializer), ] 
+POSTHOOK: Lineage: orc_kafka_table.anonymous SIMPLE [(kafka_table)ktable.FieldSchema(name:anonymous, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.continent SIMPLE [(kafka_table)ktable.FieldSchema(name:continent, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.country SIMPLE [(kafka_table)ktable.FieldSchema(name:country, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.deleted SIMPLE [(kafka_table)ktable.FieldSchema(name:deleted, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.delta SIMPLE [(kafka_table)ktable.FieldSchema(name:delta, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.kafka_ts SIMPLE [(kafka_table)ktable.FieldSchema(name:__timestamp, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.language SIMPLE [(kafka_table)ktable.FieldSchema(name:language, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.namespace SIMPLE [(kafka_table)ktable.FieldSchema(name:namespace, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.newpage SIMPLE [(kafka_table)ktable.FieldSchema(name:newpage, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.page SIMPLE [(kafka_table)ktable.FieldSchema(name:page, type:string, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.partition_id SIMPLE [(kafka_table)ktable.FieldSchema(name:__partition, type:int, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.robot SIMPLE [(kafka_table)ktable.FieldSchema(name:robot, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.row_offset SIMPLE [(kafka_table)ktable.FieldSchema(name:__offset, type:bigint, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.unpatrolled SIMPLE [(kafka_table)ktable.FieldSchema(name:unpatrolled, type:boolean, comment:from deserializer), ] +POSTHOOK: Lineage: orc_kafka_table.user SIMPLE [(kafka_table)ktable.FieldSchema(name:user, type:string, comment:from deserializer), ] +PREHOOK: query: select count(*) from orc_kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from orc_kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: select partition_id, max_offset from kafka_table_offsets +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_offsets +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select partition_id, max_offset from kafka_table_offsets +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_offsets +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 9 +PREHOOK: query: select `partition_id`, `row_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta from orc_kafka_table +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_kafka_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select `partition_id`, `row_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta from orc_kafka_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_kafka_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States 
North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: CREATE EXTERNAL TABLE kafka_table_2 +(`__time` timestamp with local time zone , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@kafka_table_2 +POSTHOOK: query: CREATE EXTERNAL TABLE kafka_table_2 +(`__time` timestamp with local time zone , `page` string, `user` string, `language` string, +`country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, +`anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "test-topic", +"kafka.bootstrap.servers"="localhost:9092") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@kafka_table_2 +PREHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +FROM kafka_table_2 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_2 +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` , +`unpatrolled` , `anonymous` , `robot` , added , deleted , delta +FROM kafka_table_2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_2 +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 0 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 1 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 2 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 3 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 +0 4 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +0 5 NULL Gypsy Danger nuclear en United States North America article true true false false 57 200 -143 +0 6 NULL Striker Eureka speed en Australia Australia wikipedia true false false true 459 129 330 +0 7 NULL Cherno Alpha masterYi ru Russia Asia article true false false true 123 12 111 +0 8 NULL Crimson Typhoon triplets zh China Asia wikipedia false true false true 905 5 900 
+0 9 NULL Coyote Tango stringer ja Japan Asia wikipedia false true false true 1 10 -9 +PREHOOK: query: Select count(*) FROM kafka_table_2 +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_2 +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM kafka_table_2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_2 +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "wiki_kafka_avro_table", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe", +'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +}' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@wiki_kafka_avro_table +POSTHOOK: query: CREATE EXTERNAL TABLE wiki_kafka_avro_table +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "wiki_kafka_avro_table", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe", +'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +}' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@wiki_kafka_avro_table +PREHOOK: query: describe extended wiki_kafka_avro_table +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: query: describe extended wiki_kafka_avro_table +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: 
default@wiki_kafka_avro_table +isrobot boolean from deserializer +channel string from deserializer +timestamp string from deserializer +flags string from deserializer +isunpatrolled boolean from deserializer +page string from deserializer +diffurl string from deserializer +added bigint from deserializer +comment string from deserializer +commentlength bigint from deserializer +isnew boolean from deserializer +isminor boolean from deserializer +delta bigint from deserializer +isanonymous boolean from deserializer +user string from deserializer +deltabucket double from deserializer +deleted bigint from deserializer +namespace string from deserializer +__key binary from deserializer +__partition int from deserializer +__offset bigint from deserializer +__timestamp bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Partition(topic = wiki_kafka_avro_table, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [11]] +PREHOOK: query: select cast (`__timestamp` as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, + `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select cast (`__timestamp` as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, + `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +2018-08-20 03:37:05.09 0 0 08/19/2018 20:37:05 test-user-0 page is 0 0 0.0 false 0 +2018-08-20 04:37:05.09 0 1 08/19/2018 21:37:05 test-user-1 page is 100 -1 100.4 true 1 +2018-08-20 05:37:05.09 0 2 08/19/2018 22:37:05 test-user-2 page is 200 -2 200.8 true 2 +2018-08-20 06:37:05.09 0 3 08/19/2018 23:37:05 test-user-3 page is 300 -3 301.20000000000005 false 3 +2018-08-20 07:37:05.09 0 4 08/20/2018 00:37:05 test-user-4 page is 400 -4 401.6 true 4 +2018-08-20 08:37:05.09 0 5 08/20/2018 01:37:05 test-user-5 page is 500 -5 502.0 true 5 +2018-08-20 09:37:05.09 0 6 08/20/2018 02:37:05 test-user-6 page is 600 -6 602.4000000000001 false 6 +2018-08-20 10:37:05.09 0 7 08/20/2018 03:37:05 test-user-7 page is 700 -7 702.8000000000001 true 7 +2018-08-20 11:37:05.09 0 8 08/20/2018 04:37:05 test-user-8 page is 800 -8 803.2 true 8 +2018-08-20 12:37:05.09 0 9 08/20/2018 05:37:05 test-user-9 page is 900 -9 903.6 false 9 +2018-08-20 13:37:05.09 0 10 08/20/2018 06:37:05 test-user-10 page is 1000 -10 1004.0 true 10 +PREHOOK: query: select count(*) from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 +PREHOOK: query: select count(distinct `user`) from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(distinct `user`) from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 +PREHOOK: query: select sum(deltabucket), min(commentlength) from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: 
default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select sum(deltabucket), min(commentlength) from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +5522.000000000001 0 +PREHOOK: query: select cast (`__timestamp` as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, +`__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, +`isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090 +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select cast (`__timestamp` as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long, +`__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, +`isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +2018-08-20 08:37:05.09 1534754225090 0 key-5 5 08/20/2018 01:37:05 test-user-5 page is 500 -5 502.0 true 5 +2018-08-20 09:37:05.09 1534757825090 0 key-6 6 08/20/2018 02:37:05 test-user-6 page is 600 -6 602.4000000000001 false 6 +2018-08-20 10:37:05.09 1534761425090 0 key-7 7 08/20/2018 03:37:05 test-user-7 page is 700 -7 702.8000000000001 true 7 +2018-08-20 11:37:05.09 1534765025090 0 key-8 8 08/20/2018 04:37:05 test-user-8 page is 800 -8 803.2 true 8 +2018-08-20 12:37:05.09 1534768625090 0 key-9 9 08/20/2018 05:37:05 test-user-9 page is 900 -9 903.6 false 9 +2018-08-20 13:37:05.09 1534772225090 0 key-10 10 08/20/2018 06:37:05 test-user-10 page is 1000 -10 1004.0 true 10 +PREHOOK: query: CREATE EXTERNAL TABLE kafka_table_insert +(c_name string, c_int int, c_float float) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") +TBLPROPERTIES +("kafka.topic" = "test-topic-write-json", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@kafka_table_insert +POSTHOOK: query: CREATE EXTERNAL TABLE kafka_table_insert +(c_name string, c_int int, c_float float) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +WITH SERDEPROPERTIES ("timestamp.formats"="yyyy-MM-dd\'T\'HH:mm:ss\'Z\'") +TBLPROPERTIES +("kafka.topic" = "test-topic-write-json", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@kafka_table_insert +PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`) +values ('test1',5, 4.999,'key',null ,-1,1536449552290) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@kafka_table_insert +POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`) +values ('test1',5, 4.999,'key',null ,-1,1536449552290) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@kafka_table_insert +PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, 
`__offset`, `__timestamp`) +values ('test2',15, 14.9996666, null ,null ,-1,1536449552285) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@kafka_table_insert +POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) +values ('test2',15, 14.9996666, null ,null ,-1,1536449552285) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@kafka_table_insert +PREHOOK: query: select * from kafka_table_insert +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_insert +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select * from kafka_table_insert +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_insert +POSTHOOK: Output: hdfs://### HDFS PATH ### +test1 5 4.999 key 0 0 1536449552290 +test2 15 14.999666 NULL 0 1 1536449552285 +PREHOOK: query: insert into table wiki_kafka_avro_table select +isrobot as isrobot, channel as channel,`timestamp` as `timestamp`, flags as flags, isunpatrolled as isunpatrolled, page as page, +diffurl as diffurl, added as added, comment as comment, commentlength as commentlength, isnew as isnew, isminor as isminor, +delta as delta, isanonymous as isanonymous, `user` as `user`, deltabucket as detlabucket, deleted as deleted, namespace as namespace, +`__key`, `__partition`, -1 as `__offset`,`__timestamp` +from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: default@wiki_kafka_avro_table +POSTHOOK: query: insert into table wiki_kafka_avro_table select +isrobot as isrobot, channel as channel,`timestamp` as `timestamp`, flags as flags, isunpatrolled as isunpatrolled, page as page, +diffurl as diffurl, added as added, comment as comment, commentlength as commentlength, isnew as isnew, isminor as isminor, +delta as delta, isanonymous as isanonymous, `user` as `user`, deltabucket as detlabucket, deleted as deleted, namespace as namespace, +`__key`, `__partition`, -1 as `__offset`,`__timestamp` +from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: default@wiki_kafka_avro_table +PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, +`page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, +`page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +2018-08-20 03:37:05.09 0 0 08/19/2018 20:37:05 test-user-0 page is 0 0 0.0 false 0 +2018-08-20 04:37:05.09 0 1 08/19/2018 21:37:05 test-user-1 page is 100 -1 100.4 true 1 +2018-08-20 05:37:05.09 0 2 08/19/2018 22:37:05 test-user-2 page is 200 -2 200.8 true 2 +2018-08-20 06:37:05.09 0 3 08/19/2018 23:37:05 test-user-3 page is 300 -3 301.20000000000005 false 3 +2018-08-20 07:37:05.09 0 4 08/20/2018 00:37:05 test-user-4 page is 400 -4 401.6 true 4 +2018-08-20 08:37:05.09 0 5 08/20/2018 01:37:05 test-user-5 page is 500 -5 502.0 true 5 +2018-08-20 09:37:05.09 0 6 08/20/2018 02:37:05 test-user-6 page is 600 -6 602.4000000000001 false 6 
+2018-08-20 10:37:05.09 0 7 08/20/2018 03:37:05 test-user-7 page is 700 -7 702.8000000000001 true 7 +2018-08-20 11:37:05.09 0 8 08/20/2018 04:37:05 test-user-8 page is 800 -8 803.2 true 8 +2018-08-20 12:37:05.09 0 9 08/20/2018 05:37:05 test-user-9 page is 900 -9 903.6 false 9 +2018-08-20 13:37:05.09 0 10 08/20/2018 06:37:05 test-user-10 page is 1000 -10 1004.0 true 10 +2018-08-20 03:37:05.09 0 11 08/19/2018 20:37:05 test-user-0 page is 0 0 0.0 false 0 +2018-08-20 04:37:05.09 0 12 08/19/2018 21:37:05 test-user-1 page is 100 -1 100.4 true 1 +2018-08-20 05:37:05.09 0 13 08/19/2018 22:37:05 test-user-2 page is 200 -2 200.8 true 2 +2018-08-20 06:37:05.09 0 14 08/19/2018 23:37:05 test-user-3 page is 300 -3 301.20000000000005 false 3 +2018-08-20 07:37:05.09 0 15 08/20/2018 00:37:05 test-user-4 page is 400 -4 401.6 true 4 +2018-08-20 08:37:05.09 0 16 08/20/2018 01:37:05 test-user-5 page is 500 -5 502.0 true 5 +2018-08-20 09:37:05.09 0 17 08/20/2018 02:37:05 test-user-6 page is 600 -6 602.4000000000001 false 6 +2018-08-20 10:37:05.09 0 18 08/20/2018 03:37:05 test-user-7 page is 700 -7 702.8000000000001 true 7 +2018-08-20 11:37:05.09 0 19 08/20/2018 04:37:05 test-user-8 page is 800 -8 803.2 true 8 +2018-08-20 12:37:05.09 0 20 08/20/2018 05:37:05 test-user-9 page is 900 -9 903.6 false 9 +2018-08-20 13:37:05.09 0 21 08/20/2018 06:37:05 test-user-10 page is 1000 -10 1004.0 true 10 +PREHOOK: query: select `__key`, count(1) FROM wiki_kafka_avro_table group by `__key` order by `__key` +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select `__key`, count(1) FROM wiki_kafka_avro_table group by `__key` order by `__key` +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +key-0 2 +key-1 2 +key-10 2 +key-2 2 +key-3 2 +key-4 2 +key-5 2 +key-6 2 +key-7 2 +key-8 2 +key-9 2 +PREHOOK: query: select `__timestamp`, count(1) from wiki_kafka_avro_table group by `__timestamp` order by `__timestamp` +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select `__timestamp`, count(1) from wiki_kafka_avro_table group by `__timestamp` order by `__timestamp` +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +1534736225090 2 +1534739825090 2 +1534743425090 2 +1534747025090 2 +1534750625090 2 +1534754225090 2 +1534757825090 2 +1534761425090 2 +1534765025090 2 +1534768625090 2 +1534772225090 2 +PREHOOK: query: CREATE EXTERNAL TABLE kafka_table_csv +(c_name string, c_int int, c_float float) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "test-topic-write-csv", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.OpenCSVSerde") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@kafka_table_csv +POSTHOOK: query: CREATE EXTERNAL TABLE kafka_table_csv +(c_name string, c_int int, c_float float) +STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' +TBLPROPERTIES +("kafka.topic" = "test-topic-write-csv", +"kafka.bootstrap.servers"="localhost:9092", +"kafka.serde.class"="org.apache.hadoop.hive.serde2.OpenCSVSerde") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@kafka_table_csv +PREHOOK: query: ALTER TABLE kafka_table_csv SET TBLPROPERTIES 
("hive.kafka.optimistic.commit"="false", "kafka.write.semantic"="EXACTLY_ONCE") +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@kafka_table_csv +PREHOOK: Output: default@kafka_table_csv +POSTHOOK: query: ALTER TABLE kafka_table_csv SET TBLPROPERTIES ("hive.kafka.optimistic.commit"="false", "kafka.write.semantic"="EXACTLY_ONCE") +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@kafka_table_csv +POSTHOOK: Output: default@kafka_table_csv +PREHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp` from kafka_table_insert +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_insert +PREHOOK: Output: default@kafka_table_csv +POSTHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp` from kafka_table_insert +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_insert +POSTHOOK: Output: default@kafka_table_csv +PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`) +values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@kafka_table_csv +POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`) +values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@kafka_table_csv +PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) +values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@kafka_table_csv +POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`) +values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@kafka_table_csv +PREHOOK: query: select * from kafka_table_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select * from kafka_table_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +test1 5 4.999 key 0 0 1536449552290 +test2 15 14.999666 NULL 0 1 1536449552285 +test4 -5 -4.999 key-2 0 3 1536449552291 +test5 -15 -14.9996666 key-3 0 5 1536449552284 +PREHOOK: query: select distinct `__key`, c_name from kafka_table_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@kafka_table_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select distinct `__key`, c_name from kafka_table_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@kafka_table_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +key test1 +key-2 test4 +key-3 test5 +NULL test2 +PREHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table 
+POSTHOOK: Output: hdfs://### HDFS PATH ### +OPTIMIZED SQL: SELECT `__offset` AS `$f0`, CAST(`__timestamp` AS TIMESTAMP(9)) AS `$f1`, `__key` AS `$f2` +FROM `default`.`wiki_kafka_avro_table` +GROUP BY `__offset`, CAST(`__timestamp` AS TIMESTAMP(9)), `__key` +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: wiki_kafka_avro_table + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: __offset (type: bigint), CAST( __timestamp AS TIMESTAMP) (type: timestamp), __key (type: binary) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + null sort order: aaa + sort order: +++ + Map-reduce partition columns: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + tag: -1 + auto parallelism: true + Execution mode: llap + LLAP IO: no inputs + Path -> Alias: + hdfs://### HDFS PATH ### [wiki_kafka_avro_table] + Path -> Partition: + hdfs://### HDFS PATH ### + Partition + base file name: wiki_kafka_avro_table + input format: org.apache.hadoop.hive.kafka.KafkaInputFormat + output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat + properties: + EXTERNAL TRUE + avro.schema.literal { + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +} + bucket_count -1 + bucketing_version 2 + column.name.delimiter , + columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp + columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from 
deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer' + columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint + discover.partitions true +#### A masked pattern was here #### + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.wiki_kafka_avro_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct wiki_kafka_avro_table { bool isrobot, string channel, string timestamp, string flags, bool isunpatrolled, string page, string diffurl, i64 added, string comment, i64 commentlength, bool isnew, bool isminor, i64 delta, bool isanonymous, string user, double deltabucket, i64 deleted, string namespace, binary __key, i32 __partition, i64 __offset, i64 __timestamp} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.kafka.KafkaSerDe + storage_handler org.apache.hadoop.hive.kafka.KafkaStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.kafka.KafkaSerDe + + input format: org.apache.hadoop.hive.kafka.KafkaInputFormat + jobProperties: + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE + output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat + properties: + EXTERNAL TRUE + avro.schema.literal { + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +} + bucket_count -1 + bucketing_version 2 + column.name.delimiter , + columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp + columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from 
deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer' + columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint + discover.partitions true +#### A masked pattern was here #### + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.wiki_kafka_avro_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct wiki_kafka_avro_table { bool isrobot, string channel, string timestamp, string flags, bool isunpatrolled, string page, string diffurl, i64 added, string comment, i64 commentlength, bool isnew, bool isminor, i64 delta, bool isanonymous, string user, double deltabucket, i64 deleted, string namespace, binary __key, i32 __partition, i64 __offset, i64 __timestamp} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.kafka.KafkaSerDe + storage_handler org.apache.hadoop.hive.kafka.KafkaStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.kafka.KafkaSerDe + name: default.wiki_kafka_avro_table + name: default.wiki_kafka_avro_table + Truncated Path -> Alias: + /wiki_kafka_avro_table [wiki_kafka_avro_table] + Reducer 2 + Execution mode: llap + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: bigint), KEY._col1 (type: timestamp), KEY._col2 (type: binary) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 + directory: hdfs://### HDFS PATH ### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Stats Publishing Key Prefix: hdfs://### HDFS PATH ### + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1,_col2 + columns.types bigint:timestamp:binary + escape.delim \ + hive.serialization.extend.additional.nesting.levels true + serialization.escape.crlf true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 2018-08-20 03:37:05.09 key-0 +1 2018-08-20 04:37:05.09 key-1 +3 2018-08-20 06:37:05.09 key-3 +4 2018-08-20 07:37:05.09 key-4 +5 2018-08-20 08:37:05.09 key-5 +7 
2018-08-20 10:37:05.09 key-7 +8 2018-08-20 11:37:05.09 key-8 +9 2018-08-20 12:37:05.09 key-9 +10 2018-08-20 13:37:05.09 key-10 +11 2018-08-20 03:37:05.09 key-0 +13 2018-08-20 05:37:05.09 key-2 +15 2018-08-20 07:37:05.09 key-4 +17 2018-08-20 09:37:05.09 key-6 +19 2018-08-20 11:37:05.09 key-8 +21 2018-08-20 13:37:05.09 key-10 +2 2018-08-20 05:37:05.09 key-2 +6 2018-08-20 09:37:05.09 key-6 +12 2018-08-20 04:37:05.09 key-1 +14 2018-08-20 06:37:05.09 key-3 +16 2018-08-20 08:37:05.09 key-5 +18 2018-08-20 10:37:05.09 key-7 +20 2018-08-20 12:37:05.09 key-9 +PREHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain extended select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +OPTIMIZED SQL: SELECT `__offset` AS `$f0`, CAST(`__timestamp` AS TIMESTAMP(9)) AS `$f1`, `__key` AS `$f2` +FROM `default`.`wiki_kafka_avro_table` +GROUP BY `__offset`, CAST(`__timestamp` AS TIMESTAMP(9)), `__key` +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: wiki_kafka_avro_table + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Select Operator + expressions: __offset (type: bigint), CAST( __timestamp AS TIMESTAMP) (type: timestamp), __key (type: binary) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + null sort order: aaa + sort order: +++ + Map-reduce partition columns: _col0 (type: bigint), _col1 (type: timestamp), _col2 (type: binary) + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + tag: -1 + auto parallelism: true + Execution mode: vectorized, llap + LLAP IO: no inputs + Path -> Alias: + hdfs://### HDFS PATH ### [wiki_kafka_avro_table] + Path -> Partition: + hdfs://### HDFS PATH ### + Partition + base file name: wiki_kafka_avro_table + input format: org.apache.hadoop.hive.kafka.KafkaInputFormat + output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat + properties: + EXTERNAL TRUE + avro.schema.literal { + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : 
"long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +} + bucket_count -1 + bucketing_version 2 + column.name.delimiter , + columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp + columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer' + columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint + discover.partitions true +#### A masked pattern was here #### + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.wiki_kafka_avro_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct wiki_kafka_avro_table { bool isrobot, string channel, string timestamp, string flags, bool isunpatrolled, string page, string diffurl, i64 added, string comment, i64 commentlength, bool isnew, bool isminor, i64 delta, bool isanonymous, string user, double deltabucket, i64 deleted, string namespace, binary __key, i32 __partition, i64 __offset, i64 __timestamp} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.kafka.KafkaSerDe + storage_handler org.apache.hadoop.hive.kafka.KafkaStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.kafka.KafkaSerDe + + input format: org.apache.hadoop.hive.kafka.KafkaInputFormat + jobProperties: + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE + output format: org.apache.hadoop.hive.kafka.KafkaOutputFormat + properties: + EXTERNAL TRUE + avro.schema.literal { + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : 
"string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] +} + bucket_count -1 + bucketing_version 2 + column.name.delimiter , + columns isrobot,channel,timestamp,flags,isunpatrolled,page,diffurl,added,comment,commentlength,isnew,isminor,delta,isanonymous,user,deltabucket,deleted,namespace,__key,__partition,__offset,__timestamp + columns.comments 'from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer','from deserializer' + columns.types boolean:string:string:string:boolean:string:string:bigint:string:bigint:boolean:boolean:bigint:boolean:string:double:bigint:string:binary:int:bigint:bigint + discover.partitions true +#### A masked pattern was here #### + hive.kafka.max.retries 6 + hive.kafka.metadata.poll.timeout.ms 30000 + hive.kafka.optimistic.commit false + hive.kafka.poll.timeout.ms 5000 + kafka.bootstrap.servers localhost:9092 + kafka.serde.class org.apache.hadoop.hive.serde2.avro.AvroSerDe + kafka.topic wiki_kafka_avro_table + kafka.write.semantic AT_LEAST_ONCE +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.wiki_kafka_avro_table + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct wiki_kafka_avro_table { bool isrobot, string channel, string timestamp, string flags, bool isunpatrolled, string page, string diffurl, i64 added, string comment, i64 commentlength, bool isnew, bool isminor, i64 delta, bool isanonymous, string user, double deltabucket, i64 deleted, string namespace, binary __key, i32 __partition, i64 __offset, i64 __timestamp} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.kafka.KafkaSerDe + storage_handler org.apache.hadoop.hive.kafka.KafkaStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.kafka.KafkaSerDe + name: default.wiki_kafka_avro_table + name: default.wiki_kafka_avro_table + Truncated Path -> Alias: + /wiki_kafka_avro_table [wiki_kafka_avro_table] + Reducer 2 + Execution mode: vectorized, llap + Needs Tagging: false + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: bigint), KEY._col1 (type: timestamp), KEY._col2 (type: binary) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 + directory: hdfs://### HDFS PATH ### + NumFilesPerFileSink: 1 + Statistics: Num rows: 1 Data size: 160 Basic stats: COMPLETE Column stats: NONE + Stats Publishing Key Prefix: hdfs://### HDFS PATH ### + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1,_col2 + columns.types bigint:timestamp:binary + escape.delim \ + 
hive.serialization.extend.additional.nesting.levels true + serialization.escape.crlf true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +PREHOOK: type: QUERY +PREHOOK: Input: default@wiki_kafka_avro_table +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select distinct `__offset`, cast(`__timestamp` as timestamp ) , `__key` from wiki_kafka_avro_table +POSTHOOK: type: QUERY +POSTHOOK: Input: default@wiki_kafka_avro_table +POSTHOOK: Output: hdfs://### HDFS PATH ### +0 2018-08-20 03:37:05.09 key-0 +1 2018-08-20 04:37:05.09 key-1 +3 2018-08-20 06:37:05.09 key-3 +4 2018-08-20 07:37:05.09 key-4 +5 2018-08-20 08:37:05.09 key-5 +7 2018-08-20 10:37:05.09 key-7 +8 2018-08-20 11:37:05.09 key-8 +9 2018-08-20 12:37:05.09 key-9 +10 2018-08-20 13:37:05.09 key-10 +11 2018-08-20 03:37:05.09 key-0 +13 2018-08-20 05:37:05.09 key-2 +15 2018-08-20 07:37:05.09 key-4 +17 2018-08-20 09:37:05.09 key-6 +19 2018-08-20 11:37:05.09 key-8 +21 2018-08-20 13:37:05.09 key-10 +2 2018-08-20 05:37:05.09 key-2 +6 2018-08-20 09:37:05.09 key-6 +12 2018-08-20 04:37:05.09 key-1 +14 2018-08-20 06:37:05.09 key-3 +16 2018-08-20 08:37:05.09 key-5 +18 2018-08-20 10:37:05.09 key-7 +20 2018-08-20 12:37:05.09 key-9 diff --git testutils/ptest2/conf/deployed/master-mr2.properties testutils/ptest2/conf/deployed/master-mr2.properties index ad5405f2b4..9835d6f28f 100644 --- testutils/ptest2/conf/deployed/master-mr2.properties +++ testutils/ptest2/conf/deployed/master-mr2.properties @@ -182,7 +182,22 @@ qFileTest.erasurecodingCli.groups.normal = mainProperties.${erasurecoding.only.q qFileTest.miniDruid.driver = TestMiniDruidCliDriver qFileTest.miniDruid.directory = ql/src/test/queries/clientpositive -qFileTest.miniDruid.batchSize = 55 +qFileTest.miniDruid.batchSize = 15 qFileTest.miniDruid.queryFilesProperty = qfile qFileTest.miniDruid.include = normal qFileTest.miniDruid.groups.normal = mainProperties.${druid.query.files} + + +qFileTest.miniDruidKafka.driver = TestMiniDruidKafkaCliDriver +qFileTest.miniDruidKafka.directory = ql/src/test/queries/clientpositive +qFileTest.miniDruidKafka.batchSize = 15 +qFileTest.miniDruidKafka.queryFilesProperty = qfile +qFileTest.miniDruidKafka.include = normal +qFileTest.miniDruidKafka.groups.normal = mainProperties.${druid.kafka.query.files} + +qFileTest.miniKafka.driver = TestMiniHiveKafkaCliDriver +qFileTest.miniKafka.directory = ql/src/test/queries/clientpositive +qFileTest.miniKafka.batchSize = 15 +qFileTest.miniKafka.queryFilesProperty = qfile +qFileTest.miniKafka.include = normal +qFileTest.miniKafka.groups.normal = mainProperties.${hive.kafka.query.files}
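
For context, the Kafka-backed tables exercised in the q.out output above are ordinary Hive external tables bound to a topic through the Kafka storage handler. The sketch below is illustrative only: the table name, column types, topic name, and broker address are placeholders, while the storage-handler class and the property keys (kafka.topic, kafka.bootstrap.servers, hive.kafka.optimistic.commit, kafka.write.semantic) are the ones that appear verbatim in the plan output and ALTER TABLE statements above.

-- Hypothetical table mirroring the csv-style test table (kafka_table_csv);
-- only the handler class and property keys are taken from the output above.
CREATE EXTERNAL TABLE kafka_table_example (
  c_name  string,
  c_int   int,
  c_float float
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  -- placeholder topic; the tests use a topic per table
  "kafka.topic" = "example_topic",
  -- broker address matching the one shown in the plan properties
  "kafka.bootstrap.servers" = "localhost:9092"
);

-- Switch the writer from the default semantics to two-phase, exactly-once
-- commits, as done for kafka_table_csv earlier in this output.
ALTER TABLE kafka_table_example SET TBLPROPERTIES (
  "hive.kafka.optimistic.commit" = "false",
  "kafka.write.semantic" = "EXACTLY_ONCE"
);

The inserts in the output above then write through this handler, supplying -1 for `__offset` and either explicit or null values for `__partition`, with the broker assigning the real offset at commit time.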