From 2aef2ac4fd43a84444f7ec0eb5326af7fdde79ca Mon Sep 17 00:00:00 2001
From: Nishant
Date: Tue, 9 Oct 2018 20:22:54 +0530
Subject: [PATCH] [HIVE-19026] Add support for more ingestion formats - Druid Kafka Indexing Service

---
 .../apache/hadoop/hive/conf/Constants.java    |  10 +
 .../hive/druid/DruidStorageHandler.java       | 102 ++++++-
 .../hive/druid/DruidStorageHandlerUtils.java  |   7 +
 .../hive/druid/json/AvroBytesDecoder.java     |  37 +++
 .../hadoop/hive/druid/json/AvroParseSpec.java | 104 +++++++
 .../druid/json/AvroStreamInputRowParser.java  |  98 +++++++
 .../json/InlineSchemaAvroBytesDecoder.java    |  52 ++++
 itests/qtest-druid/pom.xml                    |  11 +
 .../apache/hive/druid/MiniDruidCluster.java   |   3 +-
 .../resources/testconfiguration.properties    |   1 +
 .../clientpositive/druidkafkamini_avro.q      |  99 +++++++
 .../druid/druidkafkamini_avro.q.out           | 263 ++++++++++++++++++
 12 files changed, 780 insertions(+), 7 deletions(-)
 create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java
 create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java
 create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
 create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java
 create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_avro.q
 create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out
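
Reviewer note (illustration only, ignored by git am, not part of the change): the new parse-spec
table properties are exercised for Avro by druidkafkamini_avro.q below, but no qtest ships for the
delimited formats. The following is a minimal, untested sketch of how the CSV branch added in
DruidStorageHandler#getInputRowParser is meant to be driven from DDL; the table name, topic and
column list are hypothetical, and the last two properties are optional (they default to false / 0).

  CREATE EXTERNAL TABLE druid_kafka_test_csv (`__time` timestamp, `page` string, `user` string, added int)
  STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
  TBLPROPERTIES (
    "druid.segment.granularity" = "HOUR",
    "druid.query.granularity" = "MINUTE",
    "kafka.bootstrap.servers" = "localhost:9092",
    "kafka.topic" = "wiki_kafka_csv",
    "druid.kafka.ingestion.useEarliestOffset" = "true",
    "druid.timestamp.column" = "timestamp",
    "druid.timestamp.format" = "auto",
    "druid.parseSpec.format" = "csv",
    "druid.parseSpec.columns" = "timestamp,page,user,added",
    "druid.parseSpec.hasHeaderRows" = "false",
    "druid.parseSpec.skipHeaderRows" = "0"
  );

The TSV branch additionally honours "druid.parseSpec.delimiter" and "druid.parseSpec.listDelimiter".
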
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 4badfa3ff4..51ede23e1c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -56,6 +56,16 @@ public static final String DRUID_KAFKA_CONSUMER_PROPERTY_PREFIX =
       DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "consumer.";
   /* Kafka Ingestion state - valid values - START/STOP/RESET */
   public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion";
+  // Druid ParseSpec Type - JSON/CSV/TSV/AVRO
+  public static final String DRUID_PARSE_SPEC_FORMAT = "druid.parseSpec.format";
+  public static final String DRUID_TIMESTAMP_FORMAT = "druid.timestamp.format";
+  public static final String DRUID_TIMESTAMP_COLUMN = "druid.timestamp.column";
+  public static final String AVRO_SCHEMA_LITERAL = "avro.schema.literal";
+  public static final String DRUID_PARSE_SPEC_DELIMITER = "druid.parseSpec.delimiter";
+  public static final String DRUID_PARSE_SPEC_LIST_DELIMITER = "druid.parseSpec.listDelimiter";
+  public static final String DRUID_PARSE_SPEC_COLUMNS = "druid.parseSpec.columns";
+  public static final String DRUID_PARSE_SPEC_SKIP_HEADER_ROWS = "druid.parseSpec.skipHeaderRows";
+  public static final String DRUID_PARSE_SPEC_HAS_HEADER_ROWS = "druid.parseSpec.hasHeaderRows";
 
   public static final String JDBC_HIVE_STORAGE_HANDLER_ID =
       "org.apache.hive.storage.jdbc.JdbcStorageHandler";
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index cc38904b39..7da4d17542 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.druid;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -27,6 +28,8 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import io.druid.data.input.impl.CSVParseSpec;
+import io.druid.data.input.impl.DelimitedParseSpec;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
@@ -68,6 +71,9 @@
 import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
+import org.apache.hadoop.hive.druid.json.AvroParseSpec;
+import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser;
+import org.apache.hadoop.hive.druid.json.InlineSchemaAvroBytesDecoder;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
@@ -297,13 +303,16 @@ private void updateKafkaIngestion(Table table){
           columnNames);
     }
 
-    final InputRowParser inputRowParser = new StringInputRowParser(
-        new JSONParseSpec(
-            new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
-            new DimensionsSpec(dimensionsAndAggregates.lhs, null, null),
-            null,
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensionsAndAggregates.lhs, null, null);
+    String timestampFormat = getTableProperty(table, Constants.DRUID_TIMESTAMP_FORMAT);
+    String timestampColumnName = getTableProperty(table, Constants.DRUID_TIMESTAMP_COLUMN);
+    if (timestampColumnName == null) {
+      timestampColumnName = DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN;
+    }
+    TimestampSpec timestampSpec = new TimestampSpec(timestampColumnName, timestampFormat,
             null
-        ), "UTF-8");
+    );
+    final InputRowParser inputRowParser = getInputRowParser(table, timestampSpec, dimensionsSpec);
 
     Map<String, Object> inputParser = JSON_MAPPER
         .convertValue(inputRowParser, Map.class);
@@ -351,6 +360,64 @@ private void updateKafkaIngestion(Table table){
     table.getParameters().remove(Constants.DRUID_KAFKA_INGESTION);
   }
 
+  private InputRowParser getInputRowParser(Table table,
+      TimestampSpec timestampSpec,
+      DimensionsSpec dimensionsSpec
+  ) {
+    String parseSpecFormat = getTableProperty(table, Constants.DRUID_PARSE_SPEC_FORMAT);
+
+    // Default case JSON
+    if (parseSpecFormat == null || parseSpecFormat.equalsIgnoreCase("json")) {
+      return new StringInputRowParser(
+          new JSONParseSpec(timestampSpec,
+              dimensionsSpec,
+              null,
+              null
+          ), "UTF-8");
+    } else if (parseSpecFormat.equalsIgnoreCase("csv")) {
+      return new StringInputRowParser(
+          new CSVParseSpec(
+              timestampSpec,
+              dimensionsSpec,
+              getTableProperty(table, Constants.DRUID_PARSE_SPEC_LIST_DELIMITER),
+              getListProperty(table, Constants.DRUID_PARSE_SPEC_COLUMNS),
+              getBooleanProperty(table, Constants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false),
+              getIntegerProperty(table, Constants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0)
+          ), "UTF-8");
+    } else if (parseSpecFormat.equalsIgnoreCase("tsv")) {
+      return new StringInputRowParser(
+          new DelimitedParseSpec(
+              timestampSpec,
+              dimensionsSpec,
+              getTableProperty(table, Constants.DRUID_PARSE_SPEC_DELIMITER),
+              getTableProperty(table, Constants.DRUID_PARSE_SPEC_LIST_DELIMITER),
+              getListProperty(table, Constants.DRUID_PARSE_SPEC_COLUMNS),
+              getBooleanProperty(table, Constants.DRUID_PARSE_SPEC_HAS_HEADER_ROWS, false),
+              getIntegerProperty(table, Constants.DRUID_PARSE_SPEC_SKIP_HEADER_ROWS, 0)
+          ), "UTF-8");
+    } else if (parseSpecFormat.equalsIgnoreCase("avro")) {
+      try {
+        String avroSchemaLiteral = getTableProperty(table, Constants.AVRO_SCHEMA_LITERAL);
+        Preconditions.checkNotNull(avroSchemaLiteral,
+            "Please specify avro schema literal when using avro parser"
+        );
+        Map<String, Object> avroSchema = JSON_MAPPER
+            .readValue(avroSchemaLiteral, new TypeReference<Map<String, Object>>() {
+            });
+        return new AvroStreamInputRowParser(new AvroParseSpec(
+            timestampSpec,
+            dimensionsSpec,
+            null
+        ), new InlineSchemaAvroBytesDecoder(avroSchema));
+      } catch (Exception e) {
+        throw new IllegalStateException("Exception while creating avro schema", e);
+      }
+    }
+
+    throw new IllegalArgumentException("Invalid parse spec format [" + parseSpecFormat + "]. "
+        + "Supported types are: json, csv, tsv, avro");
+  }
+
   private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String kafkaTopic,
       String kafka_servers, DataSchema dataSchema, IndexSpec indexSpec) {
     return new KafkaSupervisorSpec(dataSchema,
@@ -1068,6 +1135,11 @@ public void preAlterTable(Table table, EnvironmentContext context) throws MetaEx
     return Boolean.parseBoolean(val);
   }
 
+  private static boolean getBooleanProperty(Table table, String propertyName, boolean defaultVal) {
+    Boolean val = getBooleanProperty(table, propertyName);
+    return val == null ? defaultVal : val;
+  }
+
   private static Integer getIntegerProperty(Table table, String propertyName) {
     String val = getTableProperty(table, propertyName);
     if (val == null) {
@@ -1082,6 +1154,12 @@ public void preAlterTable(Table table, EnvironmentContext context) throws MetaEx
     }
   }
 
+  private static int getIntegerProperty(Table table, String propertyName, int defaultVal) {
+    Integer val = getIntegerProperty(table, propertyName);
+    return val == null ? defaultVal : val;
+  }
+
   private static Long getLongProperty(Table table, String propertyName) {
     String val = getTableProperty(table, propertyName);
     if (val == null) {
@@ -1114,6 +1192,18 @@ private static String getTableProperty(Table table, String propertyName) {
     return table.getParameters().get(propertyName);
   }
 
+  public static List<String> getListProperty(Table table, String propertyName) {
+    List<String> rv = new ArrayList<>();
+    String values = Preconditions.checkNotNull(getTableProperty(table, propertyName), "missing property " + propertyName);
+    String[] vals = values.trim().split(",");
+    for (String val : vals) {
+      if (org.apache.commons.lang.StringUtils.isNotBlank(val)) {
+        rv.add(val);
+      }
+    }
+    return rv;
+  }
+
   private static boolean isKafkaStreamingTable(Table table){
     // For kafka Streaming tables it is mandatory to set a kafka topic.
     return getTableProperty(table, Constants.KAFKA_TOPIC) != null;
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index b9eb367f0f..fd8f8f19b3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -92,6 +92,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.json.AvroParseSpec;
+import org.apache.hadoop.hive.druid.json.AvroStreamInputRowParser;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
@@ -201,6 +203,11 @@
     // Register the shard sub type to be used by the mapper
     JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear"));
     JSON_MAPPER.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered"));
+    JSON_MAPPER.registerSubtypes(new NamedType(AvroParseSpec.class, "avro"));
+    SMILE_MAPPER.registerSubtypes(new NamedType(AvroParseSpec.class, "avro"));
+    JSON_MAPPER.registerSubtypes(new NamedType(AvroStreamInputRowParser.class, "avro_stream"));
+    SMILE_MAPPER.registerSubtypes(new NamedType(AvroStreamInputRowParser.class, "avro_stream"));
+
     // set the timezone of the object mapper
     // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC"
     JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC"));
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java
new file mode 100644
index 0000000000..3a1dbf7229
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-avro-extensions.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "schema_inline", value = InlineSchemaAvroBytesDecoder.class)
+})
+public interface AvroBytesDecoder
+{
+}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java
new file mode 100644
index 0000000000..af71f9a732
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroParseSpec.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.ParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.common.parsers.Parser;
+
+import java.util.Objects;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-avro-extensions.
+ */
+public class AvroParseSpec extends ParseSpec
+{
+
+  @JsonIgnore
+  private final JSONPathSpec flattenSpec;
+
+  @JsonCreator
+  public AvroParseSpec(
+      @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
+      @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
+      @JsonProperty("flattenSpec") JSONPathSpec flattenSpec
+  )
+  {
+    super(
+        timestampSpec != null ? timestampSpec : new TimestampSpec(null, null, null),
+        dimensionsSpec != null ? dimensionsSpec : new DimensionsSpec(null, null, null)
+    );
+
+    this.flattenSpec = flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT;
+  }
+
+  @JsonProperty
+  public JSONPathSpec getFlattenSpec()
+  {
+    return flattenSpec;
+  }
+
+  @Override
+  public Parser<String, Object> makeParser()
+  {
+    // makeParser is only used by StringInputRowParser, which cannot parse avro anyway.
+    throw new UnsupportedOperationException("makeParser not supported");
+  }
+
+  @Override
+  public ParseSpec withTimestampSpec(TimestampSpec spec)
+  {
+    return new AvroParseSpec(spec, getDimensionsSpec(), flattenSpec);
+  }
+
+  @Override
+  public ParseSpec withDimensionsSpec(DimensionsSpec spec)
+  {
+    return new AvroParseSpec(getTimestampSpec(), spec, flattenSpec);
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    final AvroParseSpec that = (AvroParseSpec) o;
+    return Objects.equals(flattenSpec, that.flattenSpec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), flattenSpec);
+  }
+}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
new file mode 100644
index 0000000000..d6e6624669
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.data.input.ByteBufferInputRowParser;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.ParseSpec;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-avro-extensions.
+ */
+public class AvroStreamInputRowParser implements ByteBufferInputRowParser
+{
+  private final ParseSpec parseSpec;
+  private final AvroBytesDecoder avroBytesDecoder;
+
+  @JsonCreator
+  public AvroStreamInputRowParser(
+      @JsonProperty("parseSpec") ParseSpec parseSpec,
+      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder
+  )
+  {
+    this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
+    this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
+  }
+
+  @Override
+  public List<InputRow> parseBatch(ByteBuffer input)
+  {
+    throw new UnsupportedOperationException("This class is only used for JSON serde");
+  }
+
+  @JsonProperty
+  @Override
+  public ParseSpec getParseSpec()
+  {
+    return parseSpec;
+  }
+
+  @JsonProperty
+  public AvroBytesDecoder getAvroBytesDecoder()
+  {
+    return avroBytesDecoder;
+  }
+
+  @Override
+  public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
+  {
+    return new AvroStreamInputRowParser(
+        parseSpec,
+        avroBytesDecoder
+    );
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
+    return Objects.equals(parseSpec, that.parseSpec) &&
+        Objects.equals(avroBytesDecoder, that.avroBytesDecoder);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(parseSpec, avroBytesDecoder);
+  }
+}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java
new file mode 100644
index 0000000000..72d6cbbc1e
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/InlineSchemaAvroBytesDecoder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Map;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-avro-extensions.
+ */
+public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
+{
+  private final Map<String, Object> schema;
+
+  @JsonCreator
+  public InlineSchemaAvroBytesDecoder(
+      @JsonProperty("schema") Map<String, Object> schema
+  )
+  {
+    Preconditions.checkArgument(schema != null, "schema must be provided");
+
+    this.schema = schema;
+  }
+
+  @JsonProperty
+  public Map<String, Object> getSchema()
+  {
+    return schema;
+  }
+
+}
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index e566fcf4d7..a825da8096 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -109,6 +109,17 @@
       <artifactId>druid-kafka-indexing-service</artifactId>
       <version>${druid.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.druid.extensions</groupId>
+      <artifactId>druid-avro-extensions</artifactId>
+      <version>${druid.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
index 2a319527ac..a9d381f0f7 100644
--- a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java
@@ -55,7 +55,8 @@
       "druid.metadata.storage.type", "derby",
       "druid.storage.type", "hdfs",
       "druid.processing.buffer.sizeBytes", "213870912",
-      "druid.processing.numThreads", "2"
+      "druid.processing.numThreads", "2",
+      "druid.worker.capacity", "4"
   );
   private static final Map<String, String> COMMON_DRUID_HISTORICAL = ImmutableMap.of(
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 97609cfadd..c4aaf9cf8e 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1743,6 +1743,7 @@ druid.query.files=druidmini_test1.q,\
   druidmini_floorTime.q, \
   druidmini_masking.q, \
   druidkafkamini_basic.q, \
+  druidkafkamini_avro.q, \
   kafka_storage_handler.q
 
 druid.llap.local.query.files=druidmini_noop.q
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_avro.q b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q
new file mode 100644
index 0000000000..183491c804
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_avro.q
@@ -0,0 +1,99 @@
+SET hive.vectorized.execution.enabled=false;
+
+CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string,
+    `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean,
+    `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint)
+    STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+    TBLPROPERTIES (
+    "druid.segment.granularity" = "MONTH",
+    "druid.query.granularity" = "MINUTE",
+    "kafka.bootstrap.servers" = "localhost:9092",
+    "kafka.topic" = "wiki_kafka_avro_table",
+    "druid.kafka.ingestion.useEarliestOffset" = "true",
+    "druid.kafka.ingestion.maxRowsInMemory" = "5",
+    "druid.kafka.ingestion.startDelay" = "PT1S",
+    "druid.kafka.ingestion.taskDuration" = "PT30S",
+    "druid.kafka.ingestion.period" = "PT5S",
+    "druid.kafka.ingestion.consumer.retries" = "2",
+    "druid.kafka.ingestion.reportParseExceptions" = "true",
+    "druid.timestamp.column" = "timestamp",
+    "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss",
+    "druid.parseSpec.format" = "avro",
+    'avro.schema.literal'='{
+      "type" : "record",
+      "name" : "Wikipedia",
+      "namespace" : "org.apache.hive.kafka",
+      "version": "1",
+      "fields" : [ {
+        "name" : "isrobot",
+        "type" : "boolean"
+      }, {
+        "name" : "channel",
+        "type" : "string"
"string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ); + +ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'); + +!curl --noproxy * -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 60; + +DESCRIBE druid_kafka_test_avro; +DESCRIBE EXTENDED druid_kafka_test_avro; + +Select count(*) FROM druid_kafka_test_avro; + +Select page FROM druid_kafka_test_avro; + +DROP TABLE druid_kafka_test_avro; diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out new file mode 100644 index 0000000000..d33dd4cbc2 --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_avro.q.out @@ -0,0 +1,263 @@ +PREHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + 
"name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: CREATE EXTERNAL TABLE druid_kafka_test_avro(`__time` timestamp , `page` string, `user` string, `language` string, + `country` string,`continent` string, `namespace` string, `newPage` boolean, `unpatrolled` boolean, + `anonymous` boolean, `robot` boolean, added int, deleted int, delta bigint) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "wiki_kafka_avro_table", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT5S", + "druid.kafka.ingestion.consumer.retries" = "2", + "druid.kafka.ingestion.reportParseExceptions" = "true", + "druid.timestamp.column" = "timestamp", + "druid.timestamp.format" = "MM/dd/yyyy HH:mm:ss", + "druid.parseSpec.format" = "avro", + 'avro.schema.literal'='{ + "type" : "record", + "name" : "Wikipedia", + "namespace" : "org.apache.hive.kafka", + "version": "1", + "fields" : [ { + "name" : "isrobot", + "type" : "boolean" + }, { + "name" : "channel", + "type" : "string" + }, { + "name" : "timestamp", + "type" : "string" + }, { + "name" : "flags", + "type" : "string" + }, { + "name" : "isunpatrolled", + "type" : "boolean" + }, { + "name" : "page", + "type" : "string" + }, { + "name" : "diffurl", + "type" : "string" + }, { + "name" : "added", + "type" : "long" + }, { + "name" : "comment", + "type" : "string" + }, { + "name" : "commentlength", + "type" : "long" + }, { + "name" : "isnew", + "type" : "boolean" + }, { + "name" : "isminor", + "type" : "boolean" + }, { + "name" : "delta", + "type" : "long" + }, { + "name" : "isanonymous", + "type" : "boolean" + }, { + "name" : "user", + "type" : "string" + }, { + "name" : "deltabucket", + "type" : "double" + }, { + "name" : "deleted", + "type" : "long" + }, { + "name" : "namespace", + "type" : "string" + } ] + }' + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test_avro +PREHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test_avro +PREHOOK: Output: default@druid_kafka_test_avro +POSTHOOK: query: ALTER TABLE druid_kafka_test_avro SET TBLPROPERTIES('druid.kafka.ingestion' = 'START') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: Output: default@druid_kafka_test_avro +["default.druid_kafka_test_avro"] +PREHOOK: query: DESCRIBE druid_kafka_test_avro +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test_avro +POSTHOOK: query: DESCRIBE druid_kafka_test_avro +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test_avro +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +country string from deserializer +continent string from deserializer +namespace string from deserializer +newpage boolean from deserializer +unpatrolled boolean from deserializer +anonymous boolean from deserializer +robot boolean from 
+added               	int                 	from deserializer
+deleted             	int                 	from deserializer
+delta               	bigint              	from deserializer
+PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test_avro
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@druid_kafka_test_avro
+__time              	timestamp           	from deserializer
+page                	string              	from deserializer
+user                	string              	from deserializer
+language            	string              	from deserializer
+country             	string              	from deserializer
+continent           	string              	from deserializer
+namespace           	string              	from deserializer
+newpage             	boolean             	from deserializer
+unpatrolled         	boolean             	from deserializer
+anonymous           	boolean             	from deserializer
+robot               	boolean             	from deserializer
+added               	int                 	from deserializer
+deleted             	int                 	from deserializer
+delta               	bigint              	from deserializer
+
+#### A masked pattern was here ####
+StorageHandlerInfo
+Druid Storage Handler Runtime Status for default.druid_kafka_test_avro
+kafkaPartitions=1
+activeTasks=[]
+publishingTasks=[]
+#### A masked pattern was here ####
+aggregateLag=0
+#### A masked pattern was here ####
+PREHOOK: query: Select count(*) FROM druid_kafka_test_avro
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select count(*) FROM druid_kafka_test_avro
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+11
+PREHOOK: query: Select page FROM druid_kafka_test_avro
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_kafka_test_avro
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: Select page FROM druid_kafka_test_avro
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+page is 0
+page is 100
+page is 200
+page is 300
+page is 400
+page is 500
+page is 600
+page is 700
+page is 800
+page is 900
+page is 1000
+PREHOOK: query: DROP TABLE druid_kafka_test_avro
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@druid_kafka_test_avro
+PREHOOK: Output: default@druid_kafka_test_avro
+POSTHOOK: query: DROP TABLE druid_kafka_test_avro
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@druid_kafka_test_avro
+POSTHOOK: Output: default@druid_kafka_test_avro
-- 
2.17.1 (Apple Git-112)