From 06dcaf418684728ab93109b3fe200df20bb3a1e5 Mon Sep 17 00:00:00 2001 From: Nishant Date: Wed, 31 Oct 2018 21:28:10 +0530 Subject: [PATCH] [HIVE-19026] Add support for more ingestion formats - Druid Kafka Indexing Service review comments --- .../apache/hadoop/hive/conf/Constants.java | 14 +- data/scripts/kafka_init_data.csv | 10 + .../hive/druid/DruidStorageHandler.java | 240 +++++++++------- .../hive/druid/DruidStorageHandlerUtils.java | 89 +++++- .../hive/druid/conf/DruidConstants.java | 63 +++++ .../hive/druid/io/DruidOutputFormat.java | 7 +- .../druid/io/DruidQueryBasedInputFormat.java | 3 +- .../hive/druid/json/AvroBytesDecoder.java | 37 +++ .../hadoop/hive/druid/json/AvroParseSpec.java | 104 +++++++ .../druid/json/AvroStreamInputRowParser.java | 98 +++++++ .../json/InlineSchemaAvroBytesDecoder.java | 52 ++++ itests/qtest-druid/pom.xml | 11 + .../apache/hive/druid/MiniDruidCluster.java | 3 +- .../resources/testconfiguration.properties | 3 + .../org/apache/hadoop/hive/ql/QTestUtil.java | 4 + .../clientpositive/druidkafkamini_avro.q | 99 +++++++ .../clientpositive/druidkafkamini_csv.q | 37 +++ .../clientpositive/druidkafkamini_delimited.q | 38 +++ .../druid/druidkafkamini_avro.q.out | 263 ++++++++++++++++++ .../druid/druidkafkamini_csv.q.out | 138 +++++++++ .../druid/druidkafkamini_delimited.q.out | 140 ++++++++++ 21 files changed, 1334 insertions(+), 119 deletions(-) create mode 100644 data/scripts/kafka_init_data.csv create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_avro.q create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_csv.q create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_delimited.q create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 61bc9df4a0..ce94e052da 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -32,7 +32,7 @@ "org.apache.hadoop.hive.druid.io.DruidOutputFormat"; public static final String DRUID_DATA_SOURCE = "druid.datasource"; public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity"; - public static final String DRUID_ROLLUP = "druid.rollup"; + public static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity"; public static final String DRUID_TARGET_SHARDS_PER_GRANULARITY = "druid.segment.targetShardsPerGranularity"; @@ -42,22 +42,10 @@ public static final String DRUID_QUERY_FIELD_NAMES = "druid.fieldNames"; public static final String DRUID_QUERY_FIELD_TYPES = "druid.fieldTypes"; public static final String DRUID_QUERY_TYPE = "druid.query.type"; - public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; - public static final String 
DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; - public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate"; - - public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; - public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; - public static final String KAFKA_TOPIC = "kafka.topic"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; - public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer."; - /* Kafka Ingestion state - valid values - START/STOP/RESET */ - public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; - public static final String JDBC_HIVE_STORAGE_HANDLER_ID = "org.apache.hive.storage.jdbc.JdbcStorageHandler"; public static final String JDBC_CONFIG_PREFIX = "hive.sql"; diff --git a/data/scripts/kafka_init_data.csv b/data/scripts/kafka_init_data.csv new file mode 100644 index 0000000000..5dc094ed21 --- /dev/null +++ b/data/scripts/kafka_init_data.csv @@ -0,0 +1,10 @@ +"2013-08-31T01:02:33Z", "Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 +"2013-08-31T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330 +"2013-08-31T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111 +"2013-08-31T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900 +"2013-08-31T12:41:27Z","Coyote Tango","ja","stringer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9 +"2013-09-01T01:02:33Z","Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 +"2013-09-01T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330 +"2013-09-01T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111 +"2013-09-01T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900 +"2013-09-01T12:41:27Z","Coyote Tango","ja","stringer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9 \ No newline at end of file diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index cc38904b39..66b7ea4c59 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.druid; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -27,6 +28,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import io.druid.data.input.impl.CSVParseSpec; +import 
io.druid.data.input.impl.DelimitedParseSpec; import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; @@ -65,9 +68,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.druid.io.DruidOutputFormat; import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; import org.apache.hadoop.hive.druid.io.DruidRecordWriter; +import org.apache.hadoop.hive.druid.json.AvroParseSpec; +import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser; +import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder; import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig; import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport; import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec; @@ -269,10 +276,13 @@ private void updateKafkaIngestion(Table table){ final String overlordAddress = HiveConf .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS); - final String dataSourceName = Preconditions.checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid datasource name is null"); + final String dataSourceName = Preconditions.checkNotNull( + DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid datasource name is null"); - final String kafkaTopic = Preconditions.checkNotNull(getTableProperty(table, Constants.KAFKA_TOPIC), "kafka topic is null"); - final String kafka_servers = Preconditions.checkNotNull(getTableProperty(table, Constants.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is null"); + final String kafkaTopic = Preconditions.checkNotNull( + DruidStorageHandlerUtils.getTableProperty(table, Constants.KAFKA_TOPIC), "kafka topic is null"); + final String kafka_servers = Preconditions.checkNotNull( + DruidStorageHandlerUtils.getTableProperty(table, Constants.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is null"); Properties tableProperties = new Properties(); tableProperties.putAll(table.getParameters()); @@ -297,13 +307,16 @@ private void updateKafkaIngestion(Table table){ columnNames); } - final InputRowParser inputRowParser = new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null), - new DimensionsSpec(dimensionsAndAggregates.lhs, null, null), - null, + DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensionsAndAggregates.lhs, null, null); + String timestampFormat = DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_FORMAT); + String timestampColumnName = DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_COLUMN); + if(timestampColumnName == null){ + timestampColumnName = DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN; + } + TimestampSpec timestampSpec = new TimestampSpec(timestampColumnName, timestampFormat, null - ), "UTF-8"); + ); + final InputRowParser inputRowParser = getInputRowParser(table, timestampSpec, dimensionsSpec); Map inputParser = JSON_MAPPER .convertValue(inputRowParser, Map.class); @@ -323,7 +336,7 @@ private void updateKafkaIngestion(Table table){ // Fetch existing Ingestion Spec from Druid, if any KafkaSupervisorSpec existingSpec = fetchKafkaIngestionSpec(table); - String targetState = getTableProperty(table, Constants.DRUID_KAFKA_INGESTION); + String targetState = 
DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_KAFKA_INGESTION); if(targetState == null){ // Case when user has not specified any ingestion state in the current command // if there is a kafka supervisor running then keep it last known state is START otherwise STOP. @@ -345,44 +358,125 @@ private void updateKafkaIngestion(Table table){ } resetKafkaIngestion(overlordAddress, dataSourceName); } else { - throw new IllegalArgumentException(String.format("Invalid value for property [%s], Valid values are [START, STOP, RESET]", Constants.DRUID_KAFKA_INGESTION)); + throw new IllegalArgumentException(String.format("Invalid value for property [%s], Valid values are [START, STOP, RESET]", DruidConstants.DRUID_KAFKA_INGESTION)); } // We do not want to keep state in two separate places so remove from hive table properties. - table.getParameters().remove(Constants.DRUID_KAFKA_INGESTION); + table.getParameters().remove(DruidConstants.DRUID_KAFKA_INGESTION); + } + + private InputRowParser getInputRowParser(Table table, + TimestampSpec timestampSpec, + DimensionsSpec dimensionsSpec + ) { + String parseSpecFormat = DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_FORMAT); + + // Default case JSON + if(parseSpecFormat == null || parseSpecFormat.equalsIgnoreCase("json")) { + return new StringInputRowParser( + new JSONParseSpec(timestampSpec, + dimensionsSpec, + null, + null + ), "UTF-8"); + } else if(parseSpecFormat.equalsIgnoreCase("csv")){ + return new StringInputRowParser( + new CSVParseSpec( + timestampSpec, + dimensionsSpec, + DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_LIST_DELIMITER), + DruidStorageHandlerUtils.getListProperty(table, DruidConstants.DRUID_PARSE_SPEC_COLUMNS), + DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false), + DruidStorageHandlerUtils.getIntegerProperty(table, DruidConstants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0) + ), "UTF-8"); + } else if (parseSpecFormat.equalsIgnoreCase("delimited")){ + return new StringInputRowParser( + new DelimitedParseSpec( + timestampSpec, + dimensionsSpec, + DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_DELIMITER), + DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_LIST_DELIMITER), + DruidStorageHandlerUtils.getListProperty(table, DruidConstants.DRUID_PARSE_SPEC_COLUMNS), + DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false), + DruidStorageHandlerUtils.getIntegerProperty(table, DruidConstants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0) + ), "UTF-8"); + } else if(parseSpecFormat.equalsIgnoreCase("avro")) { + try { + String avroSchemaLiteral = DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.AVRO_SCHEMA_LITERAL); + Preconditions.checkNotNull(avroSchemaLiteral, + "Please specify avro schema literal when using avro parser" + ); + Map avroSchema = JSON_MAPPER + .readValue(avroSchemaLiteral, new TypeReference>() { + }); + return new AvroStreamInputRowParser(new AvroParseSpec( + timestampSpec, + dimensionsSpec, + null + ), new InlineSchemaAvroBytesDecoder(avroSchema)); + } catch (Exception e) { + throw new IllegalStateException("Exception while creating avro schema", e); + } + } + + throw new IllegalArgumentException("Invalid parse spec format [" + parseSpecFormat+"]. 
" + + "Supported types are : json, csv, tsv, avro"); } private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String kafkaTopic, String kafka_servers, DataSchema dataSchema, IndexSpec indexSpec) { return new KafkaSupervisorSpec(dataSchema, new KafkaSupervisorTuningConfig( - getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"), - getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), + DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"), + DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), null, // basePersistDirectory - use druid default, no need to be configured by user - getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), + DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), indexSpec, null, // buildV9Directly - use druid default, no need to be configured by user - getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"), - getLongProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"), - getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"), - getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"), - getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"), - getLongProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatRetries"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "httpTimeout"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "shutdownTimeout"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "offsetFetchPeriod")), + DruidStorageHandlerUtils + .getBooleanProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"), + DruidStorageHandlerUtils + .getLongProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"), + DruidStorageHandlerUtils + .getBooleanProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"), + DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"), + DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"), + DruidStorageHandlerUtils + .getLongProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatRetries"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "httpTimeout"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "shutdownTimeout"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "offsetFetchPeriod")), new KafkaSupervisorIOConfig(kafkaTopic, // Mandatory Property - 
getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"), - getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"), + DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"), + DruidStorageHandlerUtils + .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"), getKafkaConsumerProperties(table, kafka_servers), // Mandatory Property - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"), - getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "completionTimeout"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "lateMessageRejectionPeriod"), - getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "earlyMessageRejectionPeriod"), - getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "skipOffsetGaps")), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"), + DruidStorageHandlerUtils + .getBooleanProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "completionTimeout"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "lateMessageRejectionPeriod"), + DruidStorageHandlerUtils + .getPeriodProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "earlyMessageRejectionPeriod"), + DruidStorageHandlerUtils + .getBooleanProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "skipOffsetGaps")), new HashMap() ); } @@ -391,9 +485,9 @@ private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String ImmutableMap.Builder builder = ImmutableMap.builder(); builder.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, kafka_servers); for (Map.Entry entry : table.getParameters().entrySet()) { - if (entry.getKey().startsWith(Constants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX)) { + if (entry.getKey().startsWith(DruidConstants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX)) { String propertyName = entry.getKey() - .substring(Constants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length()); + .substring(DruidConstants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length()); builder.put(propertyName, entry.getValue()); } } @@ -494,7 +588,7 @@ private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), "Druid Overlord Address is null"); String dataSourceName = Preconditions - .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), + .checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid Datasource name is null"); try { FullResponseHolder response = RetryUtils @@ -540,7 +634,7 
@@ private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) { .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), "Druid Overlord Address is null"); String dataSourceName = Preconditions - .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), + .checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid Datasource name is null"); try { FullResponseHolder response = RetryUtils.retry(() -> DruidStorageHandlerUtils @@ -585,8 +679,8 @@ private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) { throws IOException, CallbackFailedException { final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE); final String segmentDirectory = - table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null - ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) + table.getParameters().get(DruidConstants.DRUID_SEGMENT_DIRECTORY) != null + ? table.getParameters().get(DruidConstants.DRUID_SEGMENT_DIRECTORY) : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY); final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig(); @@ -771,7 +865,7 @@ public void commitDropTable(Table table, boolean deleteData) { .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), "Druid Overlord Address is null"); String dataSourceName = Preconditions - .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), + .checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid Datasource name is null"); stopKafkaIngestion(overlordAddress, dataSourceName); } @@ -864,10 +958,10 @@ public void rollbackInsertTable(Table table, boolean overwrite) { @Override public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { jobProperties.put(Constants.DRUID_DATA_SOURCE, tableDesc.getTableName()); - jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString()); - jobProperties.put(Constants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString()); + jobProperties.put(DruidConstants.DRUID_SEGMENT_VERSION, new DateTime().toString()); + jobProperties.put(DruidConstants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString()); // DruidOutputFormat will write segments in an intermediate directory - jobProperties.put(Constants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY, + jobProperties.put(DruidConstants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY, getIntermediateSegmentDir().toString()); } @@ -1060,63 +1154,9 @@ public void preAlterTable(Table table, EnvironmentContext context) throws MetaEx } } - private static Boolean getBooleanProperty(Table table, String propertyName) { - String val = getTableProperty(table, propertyName); - if (val == null) { - return null; - } - return Boolean.parseBoolean(val); - } - - private static Integer getIntegerProperty(Table table, String propertyName) { - String val = getTableProperty(table, propertyName); - if (val == null) { - return null; - } - try { - return Integer.parseInt(val); - } catch (NumberFormatException e) { - throw new NumberFormatException(String - .format("Exception while parsing property[%s] with Value [%s] as Integer", propertyName, - val)); - } - } - - private static Long getLongProperty(Table table, String propertyName) { - String val = getTableProperty(table, propertyName); - if (val == null) { - return null; - } - try { - return Long.parseLong(val); - } catch (NumberFormatException e) { - throw new 
NumberFormatException(String - .format("Exception while parsing property[%s] with Value [%s] as Long", propertyName, - val)); - } - } - - private static Period getPeriodProperty(Table table, String propertyName) { - String val = getTableProperty(table, propertyName); - if (val == null) { - return null; - } - try { - return Period.parse(val); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(String - .format("Exception while parsing property[%s] with Value [%s] as Period", propertyName, - val)); - } - } - - private static String getTableProperty(Table table, String propertyName) { - return table.getParameters().get(propertyName); - } - private static boolean isKafkaStreamingTable(Table table){ // For kafka Streaming tables it is mandatory to set a kafka topic. - return getTableProperty(table, Constants.KAFKA_TOPIC) != null; + return DruidStorageHandlerUtils.getTableProperty(table, Constants.KAFKA_TOPIC) != null; } private int getMaxRetryCount() { diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index c3e7e5df8d..cf2f75da70 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -92,6 +92,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.druid.conf.DruidConstants; +import org.apache.hadoop.hive.druid.json.AvroParseSpec; +import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; @@ -105,6 +109,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.joda.time.Period; import org.joda.time.chrono.ISOChronology; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -201,6 +206,11 @@ // Register the shard sub type to be used by the mapper JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear")); JSON_MAPPER.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered")); + JSON_MAPPER.registerSubtypes(new NamedType(AvroParseSpec.class, "avro")); + SMILE_MAPPER.registerSubtypes(new NamedType(AvroParseSpec.class, "avro")); + JSON_MAPPER.registerSubtypes(new NamedType(AvroStreamInputRowParser.class, "avro_stream")); + SMILE_MAPPER.registerSubtypes(new NamedType(AvroStreamInputRowParser.class, "avro_stream")); + // set the timezone of the object mapper // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC" JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC")); @@ -683,6 +693,83 @@ public static String createScanAllQuery(String dataSourceName, List colu .build(); return JSON_MAPPER.writeValueAsString(scanQuery); } + + static Boolean getBooleanProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + return Boolean.parseBoolean(val); + } + + static boolean getBooleanProperty(Table table, String propertyName, boolean defaultVal) { + Boolean val = getBooleanProperty(table, propertyName); + return 
val == null ? defaultVal : val; + } + + static Integer getIntegerProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Integer.parseInt(val); + } catch (NumberFormatException e) { + throw new NumberFormatException(String + .format("Exception while parsing property[%s] with Value [%s] as Integer", propertyName, + val)); + } + } + + static int getIntegerProperty(Table table, String propertyName, int defaultVal) { + Integer val = getIntegerProperty(table, propertyName); + return val == null ? defaultVal : val; + } + + static Long getLongProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Long.parseLong(val); + } catch (NumberFormatException e) { + throw new NumberFormatException(String + .format("Exception while parsing property[%s] with Value [%s] as Long", propertyName, + val)); + } + } + + static Period getPeriodProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Period.parse(val); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String + .format("Exception while parsing property[%s] with Value [%s] as Period", propertyName, + val)); + } + } + + static String getTableProperty(Table table, String propertyName) { + return table.getParameters().get(propertyName); + } + + public static List getListProperty(Table table, String propertyName) { + List rv = new ArrayList(); + String values = getTableProperty(table, propertyName); + String[] vals = values.trim().split(","); + for (String val : vals) { + if (org.apache.commons.lang.StringUtils.isNotBlank(val)) { + rv.add(val); + } + } + return rv; + } + /** * Simple interface for retry operations */ @@ -827,7 +914,7 @@ public static GranularitySpec getGranularitySpec(Configuration configuration, Pr tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ? tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) : HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY); - final boolean rollup = tableProperties.getProperty(Constants.DRUID_ROLLUP) != null ? + final boolean rollup = tableProperties.getProperty(DruidConstants.DRUID_ROLLUP) != null ? Boolean.parseBoolean(tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY)): HiveConf.getBoolVar(configuration, HiveConf.ConfVars.HIVE_DRUID_ROLLUP); return new UniformGranularitySpec( diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java new file mode 100644 index 0000000000..815cf8ff73 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.conf; + +public class DruidConstants { + public static final String DRUID_ROLLUP = "druid.rollup"; + + public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; + + public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate"; + + public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; + + public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; + + public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; + + public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer."; + + /* Kafka Ingestion state - valid values - START/STOP/RESET */ + public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; + + // Druid ParseSpec format - valid values - JSON/CSV/DELIMITED/AVRO + public static final String DRUID_PARSE_SPEC_FORMAT = "druid.parseSpec.format"; + + public static final String DRUID_TIMESTAMP_FORMAT = "druid.timestamp.format"; + + // Used when the timestamp field name in the ingested streaming data does not match + // the Druid default timestamp column, i.e. `__time` + public static final String DRUID_TIMESTAMP_COLUMN = "druid.timestamp.column"; + + public static final String AVRO_SCHEMA_LITERAL = "avro.schema.literal"; + + // value delimiter for druid columns + public static final String DRUID_PARSE_SPEC_DELIMITER = "druid.parseSpec.delimiter"; + + // list delimiter for multi-valued columns + public static final String DRUID_PARSE_SPEC_LIST_DELIMITER = "druid.parseSpec.listDelimiter"; + + // order of columns for delimited and csv parse specs + public static final String DRUID_PARSE_SPEC_COLUMNS = "druid.parseSpec.columns"; + + public static final String DRUID_PARSE_SPEC_SKIP_HEADER_ROWS = "druid.parseSpec.skipHeaderRows"; + + public static final String DRUID_PARSE_SPEC_HAS_HEADER_ROWS = "druid.parseSpec.hasHeaderRows"; + + public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index ecb4360623..69718c4260 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.druid.serde.DruidWritable; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; @@ -89,7 +90,7 @@ final String dataSource = tableProperties.getProperty(Constants.DRUID_DATA_SOURCE) == null ? 
jc.get(Constants.DRUID_DATA_SOURCE) : tableProperties.getProperty(Constants.DRUID_DATA_SOURCE); - final String segmentDirectory = jc.get(Constants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY); + final String segmentDirectory = jc.get(DruidConstants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY); final GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(jc, tableProperties); @@ -133,8 +134,8 @@ DruidStorageHandlerUtils.JSON_MAPPER ); - final String workingPath = jc.get(Constants.DRUID_JOB_WORKING_DIRECTORY); - final String version = jc.get(Constants.DRUID_SEGMENT_VERSION); + final String workingPath = jc.get(DruidConstants.DRUID_JOB_WORKING_DIRECTORY); + final String version = jc.get(DruidConstants.DRUID_SEGMENT_VERSION); String basePersistDirectory = HiveConf .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY); if (Strings.isNullOrEmpty(basePersistDirectory)) { diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index 1c989c1a4d..edd853845c 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.DruidStorageHandler; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader; import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader; import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader; @@ -174,7 +175,7 @@ public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) private static HiveDruidSplit[] distributeSelectQuery(Configuration conf, String address, SelectQuery query, Path dummyPath) throws IOException { // If it has a limit, we use it and we do not distribute the query - final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false); + final boolean isFetch = query.getContextBoolean(DruidConstants.DRUID_QUERY_FETCH, false); if (isFetch) { return new HiveDruidSplit[] { new HiveDruidSplit( DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java new file mode 100644 index 0000000000..3a1dbf7229 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.avro.generic.GenericRecord; + +import java.nio.ByteBuffer; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "schema_inline", value = InlineSchemaAvroBytesDecoder.class) +}) +public interface AvroBytesDecoder +{ +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java new file mode 100644 index 0000000000..af71f9a732 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.java.util.common.parsers.JSONPathSpec; +import io.druid.java.util.common.parsers.Parser; + +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class AvroParseSpec extends ParseSpec +{ + + @JsonIgnore + private final JSONPathSpec flattenSpec; + + @JsonCreator + public AvroParseSpec( + @JsonProperty("timestampSpec") TimestampSpec timestampSpec, + @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, + @JsonProperty("flattenSpec") JSONPathSpec flattenSpec + ) + { + super( + timestampSpec != null ? timestampSpec : new TimestampSpec(null, null, null), + dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null, null, null) + ); + + this.flattenSpec = flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT; + } + + @JsonProperty + public JSONPathSpec getFlattenSpec() + { + return flattenSpec; + } + + @Override + public Parser makeParser() + { + // makeParser is only used by StringInputRowParser, which cannot parse avro anyway. 
+ throw new UnsupportedOperationException("makeParser not supported"); + } + + @Override + public ParseSpec withTimestampSpec(TimestampSpec spec) + { + return new AvroParseSpec(spec, getDimensionsSpec(), flattenSpec); + } + + @Override + public ParseSpec withDimensionsSpec(DimensionsSpec spec) + { + return new AvroParseSpec(getTimestampSpec(), spec, flattenSpec); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + final AvroParseSpec that = (AvroParseSpec) o; + return Objects.equals(flattenSpec, that.flattenSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), flattenSpec); + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java new file mode 100644 index 0000000000..d6e6624669 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.data.input.ByteBufferInputRowParser; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.ParseSpec; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. 
+ */ +public class AvroStreamInputRowParser implements ByteBufferInputRowParser +{ + private final ParseSpec parseSpec; + private final AvroBytesDecoder avroBytesDecoder; + + @JsonCreator + public AvroStreamInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder + ) + { + this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); + this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); + } + + @Override + public List parseBatch(ByteBuffer input) + { + throw new UnsupportedOperationException("This class is only used for JSON serde"); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @JsonProperty + public AvroBytesDecoder getAvroBytesDecoder() + { + return avroBytesDecoder; + } + + @Override + public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) + { + return new AvroStreamInputRowParser( + parseSpec, + avroBytesDecoder + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o; + return Objects.equals(parseSpec, that.parseSpec) && + Objects.equals(avroBytesDecoder, that.avroBytesDecoder); + } + + @Override + public int hashCode() + { + return Objects.hash(parseSpec, avroBytesDecoder); + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java new file mode 100644 index 0000000000..72d6cbbc1e --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Map; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. 
+ */ +public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder +{ + private final Map schema; + + @JsonCreator + public InlineSchemaAvroBytesDecoder( + @JsonProperty("schema") Map schema + ) + { + Preconditions.checkArgument(schema != null, "schema must be provided"); + + this.schema = schema; + } + + @JsonProperty + public Map getSchema() + { + return schema; + } + +} diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml index 19cdf918b2..10ddfaa006 100644 --- a/itests/qtest-druid/pom.xml +++ b/itests/qtest-druid/pom.xml @@ -109,6 +109,17 @@ druid-kafka-indexing-service ${druid.version} + + io.druid.extensions + druid-avro-extensions + ${druid.version} + + + org.mortbay.jetty + servlet-api + + + org.apache.logging.log4j log4j-api diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java index 2a319527ac..a9d381f0f7 100644 --- a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java +++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java @@ -55,7 +55,8 @@ "druid.metadata.storage.type", "derby", "druid.storage.type", "hdfs", "druid.processing.buffer.sizeBytes", "213870912", - "druid.processing.numThreads", "2" + "druid.processing.numThreads", "2", + "druid.worker.capacity", "4" ); private static final Map COMMON_DRUID_HISTORICAL = ImmutableMap.of( diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index ab6021332d..fb50588fe7 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1849,6 +1849,9 @@ druid.query.files=druidmini_test1.q,\ druidmini_floorTime.q, \ druidmini_masking.q, \ druidkafkamini_basic.q, \ + druidkafkamini_avro.q, \ + druidkafkamini_csv.q, \ + druidkafkamini_delimited.q, \ kafka_storage_handler.q druid.llap.local.query.files=druidmini_noop.q diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index b5d2386245..59c7ac4c2c 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -640,6 +640,10 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws "test-topic", new File(getScriptsDir(), "kafka_init_data.json") ); + kafkaCluster.createTopicWithData( + "wiki_kafka_csv", + new File(getScriptsDir(), "kafka_init_data.csv") + ); kafkaCluster.createTopicWithData("wiki_kafka_avro_table", getAvroRows()); } diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_avro.q b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q new file mode 100644 index 0000000000..183491c804 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q @@ -0,0 +1,99 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = 
"wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ); + +ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_avro; +DESCRIBE EXTENDED druid_kafka_test_avro; + +Select count(*) FROM druid_kafka_test_avro; + +Select page FROM druid_kafka_test_avro; + +DROP TABLE druid_kafka_test_avro; diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_csv.q b/ql/src/test/queries/clientpositive/druidkafkamini_csv.q new file mode 100644 index 0000000000..34be462d90 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_csv.q @@ -0,0 +1,37 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ); + +ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss 
http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_csv; +DESCRIBE EXTENDED druid_kafka_test_csv; + +Select count(*) FROM druid_kafka_test_csv; + +Select page FROM druid_kafka_test_csv; + +DROP TABLE druid_kafka_test_csv; diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q b/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q new file mode 100644 index 0000000000..91e279d904 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q @@ -0,0 +1,38 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ); + +ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_delimited; +DESCRIBE EXTENDED druid_kafka_test_delimited; + +Select count(*) FROM druid_kafka_test_delimited; + +Select page FROM druid_kafka_test_delimited; + +DROP TABLE druid_kafka_test_delimited; diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out new file mode 100644 index 0000000000..d33dd4cbc2 --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out @@ -0,0 +1,263 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + 
"druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_avro +PREHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') 
+PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: default@druid_kafka_test_avro +["default.druid_kafka_test_avro"] +PREHOOK: query: DESCRIBE druid_kafka_test_avro +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: query: DESCRIBE druid_kafka_test_avro +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_avro +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test_avro +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_avro +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 +PREHOOK: query: Select page FROM druid_kafka_test_avro +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_avro +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: hdfs://### HDFS PATH ### +page is 0 +page is 100 +page is 200 +page is 300 +page is 400 +page is 500 +page is 600 +page is 700 +page is 800 +page is 900 +page is 1000 +PREHOOK: query: DROP TABLE druid_kafka_test_avro +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: DROP TABLE druid_kafka_test_avro +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: default@druid_kafka_test_avro diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out new file mode 100644 index 0000000000..2f5817ae46 --- /dev/null +++ 
b/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out @@ -0,0 +1,138 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_csv +PREHOOK: query: ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: default@druid_kafka_test_csv +["default.druid_kafka_test_csv"] +PREHOOK: query: DESCRIBE druid_kafka_test_csv +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: query: DESCRIBE druid_kafka_test_csv +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean 
from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_csv +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_csv +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_csv +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +Gypsy Danger +Striker Eureka +Cherno Alpha +Crimson Typhoon +Coyote Tango +Gypsy Danger +Striker Eureka +Cherno Alpha +Crimson Typhoon +Coyote Tango +PREHOOK: query: DROP TABLE druid_kafka_test_csv +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: DROP TABLE druid_kafka_test_csv +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: default@druid_kafka_test_csv diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out new file mode 100644 index 0000000000..f6a417b6c3 --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out @@ -0,0 +1,140 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + 
"druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_delimited +PREHOOK: query: ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: default@druid_kafka_test_delimited +["default.druid_kafka_test_delimited"] +PREHOOK: query: DESCRIBE druid_kafka_test_delimited +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: query: DESCRIBE druid_kafka_test_delimited +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_delimited +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_delimited +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from 
deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_delimited +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test_delimited +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_delimited +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test_delimited +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_delimited +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: hdfs://### HDFS PATH ### + "Gypsy Danger" +"Striker Eureka" +"Cherno Alpha" +"Crimson Typhoon" +"Coyote Tango" +"Gypsy Danger" +"Striker Eureka" +"Cherno Alpha" +"Crimson Typhoon" +"Coyote Tango" +PREHOOK: query: DROP TABLE druid_kafka_test_delimited +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: DROP TABLE druid_kafka_test_delimited +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: default@druid_kafka_test_delimited -- 2.17.1 (Apple Git-112)
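
Note on test data flow (not part of the patch): the druidkafkamini_csv.q and druidkafkamini_delimited.q expected outputs above line up with the ten rows added in data/scripts/kafka_init_data.csv, so the wiki_kafka_csv topic is presumably seeded with that file by the qtest harness (the QTestUtil and MiniDruidCluster changes in the diffstat). The sketch below shows one way such seeding could be done with the standard Kafka producer API; the class name KafkaCsvLoader and the relative file path are illustrative assumptions, not code from this change.

    // Minimal sketch: publish each line of kafka_init_data.csv as one record to
    // the wiki_kafka_csv topic that the Druid Kafka supervisor created by the
    // q files reads from. Assumes a broker at localhost:9092, as in the tests.
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.Properties;

    public class KafkaCsvLoader {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Matches "kafka.bootstrap.servers" in the TBLPROPERTIES above.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        // Each CSV line becomes one Kafka record; the supervisor parses it
        // using the column list from "druid.parseSpec.columns".
        List<String> lines = Files.readAllLines(
            Paths.get("data/scripts/kafka_init_data.csv"), StandardCharsets.UTF_8);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          for (String line : lines) {
            producer.send(new ProducerRecord<>("wiki_kafka_csv", line));
          }
          producer.flush();
        }
      }
    }

With messages in the topic, the `!sleep 60` in the q files gives the supervisor's indexing tasks (taskDuration PT30S, period PT5S) enough time to consume and publish the rows before the count(*) and page queries run.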