From 6f5d49eef10eba6ca935f6193f579302d416bbda Mon Sep 17 00:00:00 2001 From: Nishant Date: Wed, 31 Oct 2018 21:28:10 +0530 Subject: [PATCH] [HIVE-19026] Add support for more ingestion formats - Druid Kafka Indexing Service review comments --- .../apache/hadoop/hive/conf/Constants.java | 1 + data/scripts/kafka_init_data.csv | 10 + .../hadoop/hive/druid/DruidKafkaUtils.java | 123 ++++++-- .../hive/druid/DruidStorageHandler.java | 45 +-- .../hive/druid/DruidStorageHandlerUtils.java | 60 ++-- .../hive/druid/conf/DruidConstants.java | 76 +++++ .../hive/druid/io/DruidOutputFormat.java | 13 +- .../druid/io/DruidQueryBasedInputFormat.java | 3 +- .../hive/druid/io/DruidRecordWriter.java | 3 +- .../hive/druid/json/AvroBytesDecoder.java | 37 +++ .../hadoop/hive/druid/json/AvroParseSpec.java | 104 +++++++ .../druid/json/AvroStreamInputRowParser.java | 98 +++++++ .../json/InlineSchemaAvroBytesDecoder.java | 52 ++++ .../serde/DruidGroupByQueryRecordReader.java | 5 +- .../serde/DruidSelectQueryRecordReader.java | 5 +- .../hadoop/hive/druid/serde/DruidSerDe.java | 7 +- .../DruidTimeseriesQueryRecordReader.java | 5 +- .../hive/ql/io/TestDruidRecordWriter.java | 15 +- itests/qtest-druid/pom.xml | 11 + .../apache/hive/druid/MiniDruidCluster.java | 3 +- .../resources/testconfiguration.properties | 3 + .../org/apache/hadoop/hive/ql/QTestUtil.java | 4 + .../clientpositive/druidkafkamini_avro.q | 99 +++++++ .../clientpositive/druidkafkamini_csv.q | 37 +++ .../clientpositive/druidkafkamini_delimited.q | 38 +++ .../druid/druidkafkamini_avro.q.out | 263 ++++++++++++++++++ .../druid/druidkafkamini_csv.q.out | 138 +++++++++ .../druid/druidkafkamini_delimited.q.out | 140 ++++++++++ 28 files changed, 1306 insertions(+), 92 deletions(-) create mode 100644 data/scripts/kafka_init_data.csv create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_avro.q create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_csv.q create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_delimited.q create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 44d0717f03..ee954d9aac 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -32,6 +32,7 @@ "org.apache.hadoop.hive.druid.io.DruidOutputFormat"; public static final String DRUID_DATA_SOURCE = "druid.datasource"; public static final String DRUID_SEGMENT_GRANULARITY = "druid.segment.granularity"; + public static final String DRUID_TARGET_SHARDS_PER_GRANULARITY = "druid.segment.targetShardsPerGranularity"; public static final String DRUID_TIMESTAMP_GRANULARITY_COL_NAME = "__time_granularity"; diff --git a/data/scripts/kafka_init_data.csv 
b/data/scripts/kafka_init_data.csv new file mode 100644 index 0000000000..5dc094ed21 --- /dev/null +++ b/data/scripts/kafka_init_data.csv @@ -0,0 +1,10 @@ +"2013-08-31T01:02:33Z", "Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 +"2013-08-31T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330 +"2013-08-31T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111 +"2013-08-31T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900 +"2013-08-31T12:41:27Z","Coyote Tango","ja","stringer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9 +"2013-09-01T01:02:33Z","Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143 +"2013-09-01T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330 +"2013-09-01T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111 +"2013-09-01T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900 +"2013-09-01T12:41:27Z","Coyote Tango","ja","stringer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9 \ No newline at end of file diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java index b99c33375c..e0e29a3c6d 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java @@ -18,12 +18,25 @@ package org.apache.hadoop.hive.druid; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DelimitedParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; import io.druid.java.util.http.client.Request; import io.druid.java.util.http.client.response.FullResponseHandler; import io.druid.java.util.http.client.response.FullResponseHolder; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; +import org.apache.hadoop.hive.druid.conf.DruidConstants; +import org.apache.hadoop.hive.druid.json.AvroParseSpec; +import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser; +import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder; import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig; import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec; import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig; @@ -59,60 +72,60 @@ static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, IndexSpec indexSpec) { return new KafkaSupervisorSpec(dataSchema, new KafkaSupervisorTuningConfig(DruidStorageHandlerUtils.getIntegerProperty(table, - 
DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"), DruidStorageHandlerUtils.getIntegerProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), null, // basePersistDirectory - use druid default, no need to be configured by user DruidStorageHandlerUtils.getIntegerProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), indexSpec, null, // buildV9Directly - use druid default, no need to be configured by user DruidStorageHandlerUtils.getBooleanProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"), DruidStorageHandlerUtils.getLongProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"), DruidStorageHandlerUtils.getBooleanProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"), DruidStorageHandlerUtils.getIntegerProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"), DruidStorageHandlerUtils.getIntegerProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"), DruidStorageHandlerUtils.getLongProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatRetries"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatRetries"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "httpTimeout"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "httpTimeout"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "shutdownTimeout"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "shutdownTimeout"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "offsetFetchPeriod")), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "offsetFetchPeriod")), new KafkaSupervisorIOConfig(kafkaTopic, // Mandatory Property DruidStorageHandlerUtils.getIntegerProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"), DruidStorageHandlerUtils.getIntegerProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + 
"taskDuration"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"), getKafkaConsumerProperties(table, kafkaServers), // Mandatory Property DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"), DruidStorageHandlerUtils.getBooleanProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "completionTimeout"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "completionTimeout"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "lateMessageRejectionPeriod"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "lateMessageRejectionPeriod"), DruidStorageHandlerUtils.getPeriodProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "earlyMessageRejectionPeriod"), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "earlyMessageRejectionPeriod"), DruidStorageHandlerUtils.getBooleanProperty(table, - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "skipOffsetGaps")), + DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "skipOffsetGaps")), new HashMap<>()); } @@ -120,10 +133,10 @@ static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, ImmutableMap.Builder builder = ImmutableMap.builder(); builder.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, kafkaServers); for (Map.Entry entry : table.getParameters().entrySet()) { - if (entry.getKey().startsWith(DruidStorageHandlerUtils.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX)) { + if (entry.getKey().startsWith(DruidConstants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX)) { String propertyName = - entry.getKey().substring(DruidStorageHandlerUtils.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length()); + entry.getKey().substring(DruidConstants.DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX.length()); builder.put(propertyName, entry.getValue()); } } @@ -162,6 +175,64 @@ static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec static boolean isKafkaStreamingTable(Table table) { // For kafka Streaming tables it is mandatory to set a kafka topic. 
- return DruidStorageHandlerUtils.getTableProperty(table, DruidStorageHandlerUtils.KAFKA_TOPIC) != null; + return DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.KAFKA_TOPIC) != null; + } + + static InputRowParser getInputRowParser(Table table, + TimestampSpec timestampSpec, + DimensionsSpec dimensionsSpec + ) { + String parseSpecFormat = DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_FORMAT); + + // Default case JSON + if (parseSpecFormat == null || parseSpecFormat.equalsIgnoreCase("json")) { + return new StringInputRowParser( + new JSONParseSpec(timestampSpec, + dimensionsSpec, + null, + null + ), "UTF-8"); + } else if (parseSpecFormat.equalsIgnoreCase("csv")) { + return new StringInputRowParser( + new CSVParseSpec( + timestampSpec, + dimensionsSpec, + DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_LIST_DELIMITER), + DruidStorageHandlerUtils.getListProperty(table, DruidConstants.DRUID_PARSE_SPEC_COLUMNS), + DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false), + DruidStorageHandlerUtils.getIntegerProperty(table, DruidConstants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0) + ), "UTF-8"); + } else if (parseSpecFormat.equalsIgnoreCase("delimited")) { + return new StringInputRowParser( + new DelimitedParseSpec( + timestampSpec, + dimensionsSpec, + DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_DELIMITER), + DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_PARSE_SPEC_LIST_DELIMITER), + DruidStorageHandlerUtils.getListProperty(table, DruidConstants.DRUID_PARSE_SPEC_COLUMNS), + DruidStorageHandlerUtils.getBooleanProperty(table, DruidConstants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false), + DruidStorageHandlerUtils.getIntegerProperty(table, DruidConstants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0) + ), "UTF-8"); + } else if (parseSpecFormat.equalsIgnoreCase("avro")) { + try { + String avroSchemaLiteral = DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.AVRO_SCHEMA_LITERAL); + Preconditions.checkNotNull(avroSchemaLiteral, + "Please specify avro schema literal when using avro parser" + ); + Map<String, Object> avroSchema = JSON_MAPPER + .readValue(avroSchemaLiteral, new TypeReference<Map<String, Object>>() { + }); + return new AvroStreamInputRowParser(new AvroParseSpec( + timestampSpec, + dimensionsSpec, + null + ), new InlineSchemaAvroBytesDecoder(avroSchema)); + } catch (Exception e) { + throw new IllegalStateException("Exception while creating avro schema", e); + } + } + + throw new IllegalArgumentException("Invalid parse spec format [" + parseSpecFormat + "]. 
" + + "Supported types are : json, csv, tsv, avro"); } } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index 1c52ae6119..7434559532 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -30,8 +30,6 @@ import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; -import io.druid.data.input.impl.JSONParseSpec; -import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.java.util.common.Pair; import io.druid.java.util.common.RetryUtils; @@ -66,6 +64,7 @@ import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.druid.io.DruidOutputFormat; import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; import org.apache.hadoop.hive.druid.io.DruidRecordWriter; @@ -259,11 +258,11 @@ private void updateKafkaIngestion(Table table) { final String kafkaTopic = Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, - DruidStorageHandlerUtils.KAFKA_TOPIC), "kafka topic is null"); + DruidConstants.KAFKA_TOPIC), "kafka topic is null"); final String kafkaServers = Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, - DruidStorageHandlerUtils.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is null"); + DruidConstants.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is null"); Properties tableProperties = new Properties(); tableProperties.putAll(table.getParameters()); @@ -282,18 +281,26 @@ private void updateKafkaIngestion(Table table) { Pair, AggregatorFactory[]> dimensionsAndAggregates = DruidStorageHandlerUtils.getDimensionsAndAggregates(columnNames, columnTypes); - if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + if (!columnNames.contains(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) { throw new IllegalStateException("Timestamp column (' " - + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + + DruidConstants.DEFAULT_TIMESTAMP_COLUMN + "') not specified in create table; list of columns is : " + columnNames); } - final InputRowParser - inputRowParser = - new StringInputRowParser(new JSONParseSpec(new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, - "auto", - null), new DimensionsSpec(dimensionsAndAggregates.lhs, null, null), null, null), "UTF-8"); + DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensionsAndAggregates.lhs, null, null); + String timestampFormat = DruidStorageHandlerUtils + .getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_FORMAT); + String timestampColumnName = DruidStorageHandlerUtils + .getTableProperty(table, DruidConstants.DRUID_TIMESTAMP_COLUMN); + if(timestampColumnName == null) { + timestampColumnName = DruidConstants.DEFAULT_TIMESTAMP_COLUMN; + } + final TimestampSpec timestampSpec = new TimestampSpec(timestampColumnName, timestampFormat, + null + ); + final InputRowParser inputRowParser = DruidKafkaUtils + .getInputRowParser(table, timestampSpec, dimensionsSpec); final Map inputParser = @@ -318,7 +325,7 @@ private void updateKafkaIngestion(Table table) { KafkaSupervisorSpec existingSpec = fetchKafkaIngestionSpec(table); String 
targetState = - DruidStorageHandlerUtils.getTableProperty(table, DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION); + DruidStorageHandlerUtils.getTableProperty(table, DruidConstants.DRUID_KAFKA_INGESTION); if (targetState == null) { // Case when user has not specified any ingestion state in the current command // if there is a kafka supervisor running then keep it last known state is START otherwise STOP. @@ -342,10 +349,10 @@ private void updateKafkaIngestion(Table table) { } else { throw new IllegalArgumentException(String.format( "Invalid value for property [%s], Valid values are [START, STOP, RESET]", - DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION)); + DruidConstants.DRUID_KAFKA_INGESTION)); } // We do not want to keep state in two separate places so remove from hive table properties. - table.getParameters().remove(DruidStorageHandlerUtils.DRUID_KAFKA_INGESTION); + table.getParameters().remove(DruidConstants.DRUID_KAFKA_INGESTION); } private void resetKafkaIngestion(String overlordAddress, String dataSourceName) { @@ -490,8 +497,8 @@ private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE); final String segmentDirectory = - table.getParameters().get(DruidStorageHandlerUtils.DRUID_SEGMENT_DIRECTORY) != null ? - table.getParameters().get(DruidStorageHandlerUtils.DRUID_SEGMENT_DIRECTORY) : + table.getParameters().get(DruidConstants.DRUID_SEGMENT_DIRECTORY) != null ? + table.getParameters().get(DruidConstants.DRUID_SEGMENT_DIRECTORY) : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY); final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig(); @@ -745,10 +752,10 @@ private static boolean safeNonRecursiveDelete(FileSystem fs, Path path) { @Override public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { jobProperties.put(Constants.DRUID_DATA_SOURCE, tableDesc.getTableName()); - jobProperties.put(DruidStorageHandlerUtils.DRUID_SEGMENT_VERSION, new DateTime().toString()); - jobProperties.put(DruidStorageHandlerUtils.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString()); + jobProperties.put(DruidConstants.DRUID_SEGMENT_VERSION, new DateTime().toString()); + jobProperties.put(DruidConstants.DRUID_JOB_WORKING_DIRECTORY, getStagingWorkingDir().toString()); // DruidOutputFormat will write segments in an intermediate directory - jobProperties.put(DruidStorageHandlerUtils.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY, + jobProperties.put(DruidConstants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY, getIntermediateSegmentDir().toString()); } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 8fcadea89c..6dc97d53b7 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -92,6 +92,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.druid.conf.DruidConstants; +import org.apache.hadoop.hive.druid.json.AvroParseSpec; +import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; @@ -152,28 
+155,12 @@ private DruidStorageHandlerUtils () { private static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandlerUtils.class); - private static final String DRUID_ROLLUP = "druid.rollup"; - private static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity"; - public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; - static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; - public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate"; - public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; - public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; - static final String KAFKA_TOPIC = "kafka.topic"; - static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; - static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer."; - /* Kafka Ingestion state - valid values - START/STOP/RESET */ - static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; private static final int NUM_RETRIES = 8; private static final int SECONDS_BETWEEN_RETRIES = 2; private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB private static final int DEFAULT_STREAMING_RESULT_SIZE = 100; private static final String SMILE_CONTENT_TYPE = "application/x-jackson-smile"; - //Druid storage timestamp column name - public static final String DEFAULT_TIMESTAMP_COLUMN = "__time"; - //Druid Json timestamp column name - public static final String EVENT_TIMESTAMP_COLUMN = "timestamp"; + static final String INDEX_ZIP = "index.zip"; private static final String DESCRIPTOR_JSON = "descriptor.json"; private static final Interval @@ -218,6 +205,10 @@ private DruidStorageHandlerUtils () { // Register the shard sub type to be used by the mapper JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear")); JSON_MAPPER.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered")); + JSON_MAPPER.registerSubtypes(new NamedType(AvroParseSpec.class, "avro")); + SMILE_MAPPER.registerSubtypes(new NamedType(AvroParseSpec.class, "avro")); + JSON_MAPPER.registerSubtypes(new NamedType(AvroStreamInputRowParser.class, "avro_stream")); + SMILE_MAPPER.registerSubtypes(new NamedType(AvroStreamInputRowParser.class, "avro_stream")); // set the timezone of the object mapper // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC" JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC")); @@ -628,6 +619,11 @@ public static String createScanAllQuery(String dataSourceName, List colu return Boolean.parseBoolean(val); } + static boolean getBooleanProperty(Table table, String propertyName, boolean defaultVal) { + Boolean val = getBooleanProperty(table, propertyName); + return val == null ? defaultVal : val; + } + @Nullable static Integer getIntegerProperty(Table table, String propertyName) { String val = getTableProperty(table, propertyName); if (val == null) { @@ -642,6 +638,11 @@ public static String createScanAllQuery(String dataSourceName, List colu } } + static int getIntegerProperty(Table table, String propertyName, int defaultVal) { + Integer val = getIntegerProperty(table, propertyName); + return val == null ? 
defaultVal : val; + } + @Nullable static Long getLongProperty(Table table, String propertyName) { String val = getTableProperty(table, propertyName); if (val == null) { @@ -670,6 +671,21 @@ public static String createScanAllQuery(String dataSourceName, List colu } } + @Nullable public static List getListProperty(Table table, String propertyName) { + List rv = new ArrayList(); + String values = getTableProperty(table, propertyName); + if(values == null) { + return null; + } + String[] vals = values.trim().split(","); + for(String val : vals) { + if(org.apache.commons.lang.StringUtils.isNotBlank(val)) { + rv.add(val); + } + } + return rv; + } + static String getTableProperty(Table table, String propertyName) { return table.getParameters().get(propertyName); } @@ -799,13 +815,13 @@ public static GranularitySpec getGranularitySpec(Configuration configuration, Pr HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY); final boolean rollup = - tableProperties.getProperty(DRUID_ROLLUP) != null ? + tableProperties.getProperty(DruidConstants.DRUID_ROLLUP) != null ? Boolean.parseBoolean(tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY)) : HiveConf.getBoolVar(configuration, HiveConf.ConfVars.HIVE_DRUID_ROLLUP); return new UniformGranularitySpec(Granularity.fromString(segmentGranularity), - Granularity.fromString(tableProperties.getProperty(DRUID_QUERY_GRANULARITY) == null ? + Granularity.fromString(tableProperties.getProperty(DruidConstants.DRUID_QUERY_GRANULARITY) == null ? "NONE" : - tableProperties.getProperty(DRUID_QUERY_GRANULARITY)), + tableProperties.getProperty(DruidConstants.DRUID_QUERY_GRANULARITY)), rollup, null); } @@ -853,7 +869,7 @@ public static IndexSpec getIndexSpec(Configuration jc) { // Granularity column String tColumnName = columnNames.get(i); if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) && !tColumnName.equals( - DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) { throw new IllegalArgumentException("Dimension " + tColumnName + " does not have STRING type: " @@ -863,7 +879,7 @@ public static IndexSpec getIndexSpec(Configuration jc) { case TIMESTAMPLOCALTZ: // Druid timestamp column String tLocalTZColumnName = columnNames.get(i); - if (!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + if (!tLocalTZColumnName.equals(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) { throw new IllegalArgumentException("Dimension " + tLocalTZColumnName + " does not have STRING type: " diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java new file mode 100644 index 0000000000..242f7be4dd --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.conf; + +public class DruidConstants { + + public static final String DRUID_QUERY_FETCH = "druid.query.fetch"; + + public static final String DRUID_ROLLUP = "druid.rollup"; + + public static final String DRUID_QUERY_GRANULARITY = "druid.query.granularity"; + + public static final String DRUID_SEGMENT_DIRECTORY = "druid.storage.storageDirectory"; + + public static final String DRUID_SEGMENT_INTERMEDIATE_DIRECTORY = "druid.storage.storageDirectory.intermediate"; + + public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; + + public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; + + public static final String KAFKA_TOPIC = "kafka.topic"; + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; + + public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX = DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer."; + + /* Kafka Ingestion state - valid values - START/STOP/RESET */ + public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; + + //Druid storage timestamp column name + public static final String DEFAULT_TIMESTAMP_COLUMN = "__time"; + + public static final String DRUID_TIMESTAMP_FORMAT = "druid.timestamp.format"; + + // Used when the field name in ingested data via streaming ingestion does not match + // the druid default timestamp column, i.e. `__time` + public static final String DRUID_TIMESTAMP_COLUMN = "druid.timestamp.column"; + + //Druid Json timestamp column name for GroupBy results + public static final String EVENT_TIMESTAMP_COLUMN = "timestamp"; + + // Druid ParseSpec Type - JSON/CSV/DELIMITED/AVRO + public static final String DRUID_PARSE_SPEC_FORMAT = "druid.parseSpec.format"; + + public static final String AVRO_SCHEMA_LITERAL = "avro.schema.literal"; + + // value delimiter for druid columns + public static final String DRUID_PARSE_SPEC_DELIMITER = "druid.parseSpec.delimiter"; + + // list delimiter for multi-valued columns + public static final String DRUID_PARSE_SPEC_LIST_DELIMITER = "druid.parseSpec.listDelimiter"; + + // order of columns for delimited and csv parse specs. 
+ public static final String DRUID_PARSE_SPEC_COLUMNS = "druid.parseSpec.columns"; + + public static final String DRUID_PARSE_SPEC_SKIP_HEADER_ROWS = "druid.parseSpec.skipHeaderRows"; + + public static final String DRUID_PARSE_SPEC_HAS_HEADER_ROWS = "druid.parseSpec.hasHeaderRows"; +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index 862d7ca3dc..f0f039a347 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.druid.serde.DruidWritable; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; @@ -95,7 +96,7 @@ final String dataSource = tableProperties.getProperty(Constants.DRUID_DATA_SOURCE) == null ? jc.get(Constants.DRUID_DATA_SOURCE) : tableProperties.getProperty(Constants.DRUID_DATA_SOURCE); - final String segmentDirectory = jc.get(DruidStorageHandlerUtils.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY); + final String segmentDirectory = jc.get(DruidConstants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY); final GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(jc, tableProperties); @@ -109,8 +110,8 @@ )); } ArrayList columnNames = Lists.newArrayList(columnNameProperty.split(",")); - if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { - throw new IllegalStateException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + + if (!columnNames.contains(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) { + throw new IllegalStateException("Timestamp column (' " + DruidConstants.DEFAULT_TIMESTAMP_COLUMN + "') not specified in create table; list of columns is : " + tableProperties.getProperty(serdeConstants.LIST_COLUMNS)); } @@ -119,7 +120,7 @@ Pair, AggregatorFactory[]> dimensionsAndAggregates = DruidStorageHandlerUtils .getDimensionsAndAggregates(columnNames, columnTypes); final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec( - new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null), + new TimestampSpec(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, "auto", null), new DimensionsSpec(dimensionsAndAggregates.lhs, Lists .newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, Constants.DRUID_SHARD_KEY_COL_NAME @@ -141,8 +142,8 @@ DruidStorageHandlerUtils.JSON_MAPPER ); - final String workingPath = jc.get(DruidStorageHandlerUtils.DRUID_JOB_WORKING_DIRECTORY); - final String version = jc.get(DruidStorageHandlerUtils.DRUID_SEGMENT_VERSION); + final String workingPath = jc.get(DruidConstants.DRUID_JOB_WORKING_DIRECTORY); + final String version = jc.get(DruidConstants.DRUID_SEGMENT_VERSION); String basePersistDirectory = HiveConf .getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BASE_PERSIST_DIRECTORY); if (Strings.isNullOrEmpty(basePersistDirectory)) { diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index ff766c4dd0..c1e0e75f98 100644 --- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.DruidStorageHandler; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader; import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader; import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader; @@ -173,7 +174,7 @@ private static HiveDruidSplit[] distributeSelectQuery(String address, SelectQuery query, Path dummyPath) throws IOException { // If it has a limit, we use it and we do not distribute the query - final boolean isFetch = query.getContextBoolean(DruidStorageHandlerUtils.DRUID_QUERY_FETCH, false); + final boolean isFetch = query.getContextBoolean(DruidConstants.DRUID_QUERY_FETCH, false); if (isFetch) { return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java index 671b8cf9e5..65edc665a3 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.druid.serde.DruidWritable; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.io.NullWritable; @@ -215,7 +216,7 @@ private void pushSegments(List segmentsToPush) { @Override public void write(Writable w) throws IOException { DruidWritable record = (DruidWritable) w; - final long timestamp = (long) record.getValue().get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN); + final long timestamp = (long) record.getValue().get(DruidConstants.DEFAULT_TIMESTAMP_COLUMN); final int partitionNumber = Math.toIntExact((long) record.getValue().getOrDefault(Constants.DRUID_SHARD_KEY_COL_NAME, -1L)); diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java new file mode 100644 index 0000000000..3a1dbf7229 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.avro.generic.GenericRecord; + +import java.nio.ByteBuffer; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "schema_inline", value = InlineSchemaAvroBytesDecoder.class) +}) +public interface AvroBytesDecoder +{ +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java new file mode 100644 index 0000000000..af71f9a732 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.java.util.common.parsers.JSONPathSpec; +import io.druid.java.util.common.parsers.Parser; + +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class AvroParseSpec extends ParseSpec +{ + + @JsonIgnore + private final JSONPathSpec flattenSpec; + + @JsonCreator + public AvroParseSpec( + @JsonProperty("timestampSpec") TimestampSpec timestampSpec, + @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec, + @JsonProperty("flattenSpec") JSONPathSpec flattenSpec + ) + { + super( + timestampSpec != null ? timestampSpec : new TimestampSpec(null, null, null), + dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null, null, null) + ); + + this.flattenSpec = flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT; + } + + @JsonProperty + public JSONPathSpec getFlattenSpec() + { + return flattenSpec; + } + + @Override + public Parser makeParser() + { + // makeParser is only used by StringInputRowParser, which cannot parse avro anyway. 
+ throw new UnsupportedOperationException("makeParser not supported"); + } + + @Override + public ParseSpec withTimestampSpec(TimestampSpec spec) + { + return new AvroParseSpec(spec, getDimensionsSpec(), flattenSpec); + } + + @Override + public ParseSpec withDimensionsSpec(DimensionsSpec spec) + { + return new AvroParseSpec(getTimestampSpec(), spec, flattenSpec); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + final AvroParseSpec that = (AvroParseSpec) o; + return Objects.equals(flattenSpec, that.flattenSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), flattenSpec); + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java new file mode 100644 index 0000000000..d6e6624669 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.data.input.ByteBufferInputRowParser; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.ParseSpec; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. 
+ */ +public class AvroStreamInputRowParser implements ByteBufferInputRowParser +{ + private final ParseSpec parseSpec; + private final AvroBytesDecoder avroBytesDecoder; + + @JsonCreator + public AvroStreamInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder + ) + { + this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); + this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); + } + + @Override + public List parseBatch(ByteBuffer input) + { + throw new UnsupportedOperationException("This class is only used for JSON serde"); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @JsonProperty + public AvroBytesDecoder getAvroBytesDecoder() + { + return avroBytesDecoder; + } + + @Override + public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) + { + return new AvroStreamInputRowParser( + parseSpec, + avroBytesDecoder + ); + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o; + return Objects.equals(parseSpec, that.parseSpec) && + Objects.equals(avroBytesDecoder, that.avroBytesDecoder); + } + + @Override + public int hashCode() + { + return Objects.hash(parseSpec, avroBytesDecoder); + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java new file mode 100644 index 0000000000..72d6cbbc1e --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Map; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. 
+ */ +public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder +{ + private final Map schema; + + @JsonCreator + public InlineSchemaAvroBytesDecoder( + @JsonProperty("schema") Map schema + ) + { + Preconditions.checkArgument(schema != null, "schema must be provided"); + + this.schema = schema; + } + + @JsonProperty + public Map getSchema() + { + return schema; + } + +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java index 521973e330..9efa6f6bf6 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java @@ -22,6 +22,7 @@ import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.io.NullWritable; import java.io.IOException; @@ -62,7 +63,7 @@ // Create new value DruidWritable value = new DruidWritable(false); // 1) The timestamp column - value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN, + value.getValue().put(DruidConstants.EVENT_TIMESTAMP_COLUMN, currentRow.getTimestamp() == null ? null : currentRow.getTimestamp().getMillis() ); // 2) The dimension columns @@ -75,7 +76,7 @@ // Update value value.getValue().clear(); // 1) The timestamp column - value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN, + value.getValue().put(DruidConstants.EVENT_TIMESTAMP_COLUMN, currentRow.getTimestamp() == null ? null : currentRow.getTimestamp().getMillis() ); // 2) The dimension columns diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java index 744f4d1d16..2c4c8f9910 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JavaType; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.io.NullWritable; import com.fasterxml.jackson.core.type.TypeReference; @@ -67,7 +68,7 @@ // Create new value DruidWritable value = new DruidWritable(false); EventHolder e = values.next(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis()); + value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis()); value.getValue().putAll(e.getEvent()); return value; } @@ -77,7 +78,7 @@ // Update value value.getValue().clear(); EventHolder e = values.next(); - value.getValue().put(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis()); + value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis()); value.getValue().putAll(e.getEvent()); return true; } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java index 1d87262ccc..516faf0814 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java +++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.DruidStorageHandler; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -169,7 +170,7 @@ private void initFromMetaDataQuery(final Configuration configuration, final Prop throw new SerDeException(e); } for (Entry columnInfo : schemaInfo.getColumns().entrySet()) { - if (columnInfo.getKey().equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + if (columnInfo.getKey().equals(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) { // Special handling for timestamp column columnNames.add(columnInfo.getKey()); // field name PrimitiveTypeInfo type = tsTZTypeInfo; // field type @@ -190,9 +191,9 @@ private void initFromMetaDataQuery(final Configuration configuration, final Prop private void initFromProperties(final Properties properties) throws SerDeException { final List columnNames = new ArrayList<>(Utilities.getColumnNames(properties)); - if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + if (!columnNames.contains(DruidConstants.DEFAULT_TIMESTAMP_COLUMN)) { throw new SerDeException("Timestamp column (' " - + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + + DruidConstants.DEFAULT_TIMESTAMP_COLUMN + "') not specified in create table; list of columns is : " + properties.getProperty(serdeConstants.LIST_COLUMNS)); } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java index 41fd341341..beb342b554 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java @@ -22,6 +22,7 @@ import io.druid.query.Result; import io.druid.query.timeseries.TimeseriesResultValue; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.io.NullWritable; import java.io.IOException; @@ -59,7 +60,7 @@ public NullWritable getCurrentKey() throws IOException, InterruptedException { public DruidWritable getCurrentValue() throws IOException, InterruptedException { // Create new value DruidWritable value = new DruidWritable(false); - value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN, + value.getValue().put(DruidConstants.EVENT_TIMESTAMP_COLUMN, current.getTimestamp() == null ? null : current.getTimestamp().getMillis() ); value.getValue().putAll(current.getValue().getBaseObject()); @@ -71,7 +72,7 @@ public boolean next(NullWritable key, DruidWritable value) { if (nextKeyValue()) { // Update value value.getValue().clear(); - value.getValue().put(DruidStorageHandlerUtils.EVENT_TIMESTAMP_COLUMN, + value.getValue().put(DruidConstants.EVENT_TIMESTAMP_COLUMN, current.getTimestamp() == null ? 
null : current.getTimestamp().getMillis() ); value.getValue().putAll(current.getValue().getBaseObject()); diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java index 111f04765c..63efdc4d8f 100644 --- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java +++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.druid.DruidStorageHandler; import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils; +import org.apache.hadoop.hive.druid.conf.DruidConstants; import org.apache.hadoop.hive.druid.io.DruidRecordWriter; import org.apache.hadoop.hive.druid.serde.DruidWritable; import org.joda.time.DateTime; @@ -86,7 +87,7 @@ final List> expectedRows = - ImmutableList.of(ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + ImmutableList.of(ImmutableMap.of(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(), "host", ImmutableList.of("a.example.com"), @@ -94,7 +95,7 @@ 190L, "unique_hosts", 1.0d), - ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + ImmutableMap.of(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(), "host", ImmutableList.of("b.example.com"), @@ -102,7 +103,7 @@ 175L, "unique_hosts", 1.0d), - ImmutableMap.of(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + ImmutableMap.of(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(), "host", ImmutableList.of("c.example.com"), @@ -113,7 +114,7 @@ @Test public void testTimeStampColumnName() { Assert.assertEquals("Time column name need to match to ensure serdeser compatibility", - DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + DruidConstants.DEFAULT_TIMESTAMP_COLUMN, DruidTable.DEFAULT_TIMESTAMP_COLUMN); } @@ -127,7 +128,7 @@ final InputRowParser inputRowParser = - new MapInputRowParser(new TimeAndDimsParseSpec(new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, + new MapInputRowParser(new TimeAndDimsParseSpec(new TimestampSpec(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, "auto", null), new DimensionsSpec(ImmutableList.of(new StringDimensionSchema("host")), null, null))); final Map @@ -183,7 +184,7 @@ expectedRows.stream() .map(input -> new DruidWritable(ImmutableMap.builder().putAll(input) .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, - Granularities.DAY.bucketStart(new DateTime((long) input.get(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN))) + Granularities.DAY.bucketStart(new DateTime((long) input.get(DruidConstants.DEFAULT_TIMESTAMP_COLUMN))) .getMillis()) .build())) .collect(Collectors.toList()); @@ -226,7 +227,7 @@ private void verifyRows(List> expectedRows, Listdruid-kafka-indexing-service ${druid.version} + + io.druid.extensions + druid-avro-extensions + ${druid.version} + + + org.mortbay.jetty + servlet-api + + + org.apache.logging.log4j log4j-api diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java index 2a319527ac..a9d381f0f7 100644 --- a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java +++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java @@ -55,7 +55,8 @@ "druid.metadata.storage.type", "derby", 
"druid.storage.type", "hdfs", "druid.processing.buffer.sizeBytes", "213870912", - "druid.processing.numThreads", "2" + "druid.processing.numThreads", "2", + "druid.worker.capacity", "4" ); private static final Map COMMON_DRUID_HISTORICAL = ImmutableMap.of( diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index ab6021332d..fb50588fe7 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1849,6 +1849,9 @@ druid.query.files=druidmini_test1.q,\ druidmini_floorTime.q, \ druidmini_masking.q, \ druidkafkamini_basic.q, \ + druidkafkamini_avro.q, \ + druidkafkamini_csv.q, \ + druidkafkamini_delimited.q, \ kafka_storage_handler.q druid.llap.local.query.files=druidmini_noop.q diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index b5d2386245..59c7ac4c2c 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -640,6 +640,10 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws "test-topic", new File(getScriptsDir(), "kafka_init_data.json") ); + kafkaCluster.createTopicWithData( + "wiki_kafka_csv", + new File(getScriptsDir(), "kafka_init_data.csv") + ); kafkaCluster.createTopicWithData("wiki_kafka_avro_table", getAvroRows()); } diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_avro.q b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q new file mode 100644 index 0000000000..183491c804 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q @@ -0,0 +1,99 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + 
"name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ); + +ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_avro; +DESCRIBE EXTENDED druid_kafka_test_avro; + +Select count(*) FROM druid_kafka_test_avro; + +Select page FROM druid_kafka_test_avro; + +DROP TABLE druid_kafka_test_avro; diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_csv.q b/ql/src/test/queries/clientpositive/druidkafkamini_csv.q new file mode 100644 index 0000000000..34be462d90 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_csv.q @@ -0,0 +1,37 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ); + +ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_csv; +DESCRIBE EXTENDED druid_kafka_test_csv; + +Select count(*) FROM druid_kafka_test_csv; + +Select page FROM druid_kafka_test_csv; + +DROP TABLE druid_kafka_test_csv; diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q b/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q new file mode 100644 index 0000000000..91e279d904 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_delimited.q @@ -0,0 +1,38 @@ +SET hive.vectorized.execution.enabled=false; + +CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + 
"druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ); + +ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_delimited; +DESCRIBE EXTENDED druid_kafka_test_delimited; + +Select count(*) FROM druid_kafka_test_delimited; + +Select page FROM druid_kafka_test_delimited; + +DROP TABLE druid_kafka_test_delimited; diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out new file mode 100644 index 0000000000..d33dd4cbc2 --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out @@ -0,0 +1,263 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: 
default@druid_kafka_test_avro +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_avro +PREHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: default@druid_kafka_test_avro +["default.druid_kafka_test_avro"] +PREHOOK: query: DESCRIBE druid_kafka_test_avro +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: query: DESCRIBE druid_kafka_test_avro +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro +PREHOOK: type: DESCTABLE 
+PREHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_avro +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test_avro +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_avro +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: hdfs://### HDFS PATH ### +11 +PREHOOK: query: Select page FROM druid_kafka_test_avro +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_avro +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: hdfs://### HDFS PATH ### +page is 0 +page is 100 +page is 200 +page is 300 +page is 400 +page is 500 +page is 600 +page is 700 +page is 800 +page is 900 +page is 1000 +PREHOOK: query: DROP TABLE druid_kafka_test_avro +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: DROP TABLE druid_kafka_test_avro +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: default@druid_kafka_test_avro diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out new file mode 100644 index 0000000000..2f5817ae46 --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_csv.q.out @@ -0,0 +1,138 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ) +PREHOOK: type: CREATETABLE +PREHOOK: 
Output: database:default +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_csv(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "csv", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta" + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_csv +PREHOOK: query: ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: ALTER TABLE druid_kafka_test_csv SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: default@druid_kafka_test_csv +["default.druid_kafka_test_csv"] +PREHOOK: query: DESCRIBE druid_kafka_test_csv +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: query: DESCRIBE druid_kafka_test_csv +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_csv +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_csv +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_csv +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM 
druid_kafka_test_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test_csv +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_csv +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: hdfs://### HDFS PATH ### +Gypsy Danger +Striker Eureka +Cherno Alpha +Crimson Typhoon +Coyote Tango +Gypsy Danger +Striker Eureka +Cherno Alpha +Crimson Typhoon +Coyote Tango +PREHOOK: query: DROP TABLE druid_kafka_test_csv +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_csv +PREHOOK: Output: default@druid_kafka_test_csv +POSTHOOK: query: DROP TABLE druid_kafka_test_csv +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_csv +POSTHOOK: Output: default@druid_kafka_test_csv diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out new file mode 100644 index 0000000000..f6a417b6c3 --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out @@ -0,0 +1,140 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_delimited(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newpage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_csv", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + 
"druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.parseSpec.format" = "delimited", + "druid.parseSpec.columns" = "__time,page,language,user,unpatrolled,newpage,robot,anonymous,namespace,continent,country,region,city,added,deleted,delta", + "druid.parseSpec.delimiter"="," + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_delimited +PREHOOK: query: ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: ALTER TABLE druid_kafka_test_delimited SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: default@druid_kafka_test_delimited +["default.druid_kafka_test_delimited"] +PREHOOK: query: DESCRIBE druid_kafka_test_delimited +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: query: DESCRIBE druid_kafka_test_delimited +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_delimited +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_delimited +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from deserializer +added int from deserializer +deleted int from deserializer +delta bigint from deserializer + +#### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test_delimited +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +#### A masked pattern was here #### +aggregateLag=0 +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test_delimited +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test_delimited +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test_delimited +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test_delimited +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: hdfs://### HDFS PATH ### + "Gypsy Danger" +"Striker Eureka" 
+"Cherno Alpha" +"Crimson Typhoon" +"Coyote Tango" +"Gypsy Danger" +"Striker Eureka" +"Cherno Alpha" +"Crimson Typhoon" +"Coyote Tango" +PREHOOK: query: DROP TABLE druid_kafka_test_delimited +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test_delimited +PREHOOK: Output: default@druid_kafka_test_delimited +POSTHOOK: query: DROP TABLE druid_kafka_test_delimited +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test_delimited +POSTHOOK: Output: default@druid_kafka_test_delimited -- 2.17.1 (Apple Git-112)