From 0215c25e04ef28138b2d92af565b029e04e2a37d Mon Sep 17 00:00:00 2001 From: Nishant Date: Fri, 16 Mar 2018 23:54:36 +0530 Subject: [PATCH] [HIVE-18976] Add ability to setup Druid Kafka Indexing Service via Hive --- .../org/apache/hadoop/hive/conf/Constants.java | 5 + .../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 + data/scripts/kafka_init_data.json | 10 + druid-handler/pom.xml | 24 ++ .../druid/DruidKafkaStreamingStorageHandler.java | 365 +++++++++++++++++++++ .../hive/druid/DruidStorageHandlerUtils.java | 6 + itests/qtest-druid/pom.xml | 11 + .../org/apache/hive/druid/MiniDruidCluster.java | 59 ++-- .../apache/hive/kafka/SingleNodeKafkaCluster.java | 122 +++++++ .../hive/cli/TestMiniDruidKafkaCliDriver.java | 63 ++++ .../apache/hadoop/hive/cli/control/CliConfigs.java | 23 ++ .../java/org/apache/hadoop/hive/ql/QTestUtil.java | 63 +++- pom.xml | 2 +- .../queries/clientpositive/druidkafkamini_basic.q | 19 ++ .../druid/druidkafkamini_basic.q.out | 60 ++++ 15 files changed, 797 insertions(+), 38 deletions(-) create mode 100644 data/scripts/kafka_init_data.json create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaStreamingStorageHandler.java create mode 100644 itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java create mode 100644 itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_basic.q create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 10aaee182f..eb9ae3670e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -46,6 +46,11 @@ public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; + public static final String KAFKA_TOPIC = "kafka.topic"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; + public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG = "hadoop.security.credential.provider.path"; diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 06efd02253..8406b06022 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2084,6 +2084,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081", "Address of the Druid coordinator. It is used to check the load status of newly created segments" ), + HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS("hive.druid.overlord.address.default", "localhost:8090", + "Address of the Druid overlord. It is used to submit indexing tasks to druid." + ), HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000, "Takes only effect when hive.druid.select.distribute is set to false. 
\n" + "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" + diff --git a/data/scripts/kafka_init_data.json b/data/scripts/kafka_init_data.json new file mode 100644 index 0000000000..9e2c58cbdb --- /dev/null +++ b/data/scripts/kafka_init_data.json @@ -0,0 +1,10 @@ +{"__time": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} +{"__time": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"__time": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} +{"__time": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} +{"__time": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} +{"__time": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} +{"__time": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"__time": "2013-09-01T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} +{"__time": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} +{"__time": "2013-09-01T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", 
"country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} \ No newline at end of file diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml index b53ddb4a89..4e4c837b0e 100644 --- a/druid-handler/pom.xml +++ b/druid-handler/pom.xml @@ -138,6 +138,11 @@ + io.druid + druid-indexing-service + ${druid.version} + + io.druid.extensions druid-hdfs-storage ${druid.version} @@ -181,6 +186,25 @@ ${druid.version} + io.druid.extensions + druid-kafka-indexing-service + ${druid.version} + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + + org.apache.hadoop hadoop-common provided diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaStreamingStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaStreamingStorageHandler.java new file mode 100644 index 0000000000..ccdbcdd03b --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaStreamingStorageHandler.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.StringDimensionSchema; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import io.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import io.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; +import io.druid.java.util.common.granularity.Granularity; +import io.druid.metadata.MetadataStorageTablesConfig; +import io.druid.metadata.SQLMetadataConnector; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.IndexSpec; +import io.druid.segment.data.ConciseBitmapSerdeFactory; +import io.druid.segment.data.RoaringBitmapSerdeFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; + +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; + +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Period; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * DruidStorageHandler provides a HiveStorageHandler implementation for Druid. 
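+ * This streaming variant builds a Kafka supervisor spec from the table schema and the kafka.topic, kafka.bootstrap.servers and druid.kafka.ingestion.* table properties, submits it to the Druid overlord on commitCreateTable, and shuts the supervisor down in commitDropTable before delegating cleanup to DruidStorageHandler.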
+ */ +@SuppressWarnings({ "rawtypes" }) +public class DruidKafkaStreamingStorageHandler extends DruidStorageHandler { + + public DruidKafkaStreamingStorageHandler() { + } + + @VisibleForTesting + public DruidKafkaStreamingStorageHandler(SQLMetadataConnector connector, + MetadataStorageTablesConfig druidMetadataStorageTablesConfig + ) { + super(connector, druidMetadataStorageTablesConfig); + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + final String overlordAddress = HiveConf + .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS); + + final String dataSourceName = Preconditions.checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid datasource name is null"); + + final String kafkaTopic = Preconditions.checkNotNull(getTableProperty(table, Constants.KAFKA_TOPIC), "kafka topic is null"); + final String kafka_servers = Preconditions.checkNotNull(getTableProperty(table, Constants.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is null"); + + final String segmentGranularity = + table.getParameters().get(Constants.DRUID_SEGMENT_GRANULARITY) != null ? + table.getParameters().get(Constants.DRUID_SEGMENT_GRANULARITY) : + HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY); + + final GranularitySpec granularitySpec = new UniformGranularitySpec( + Granularity.fromString(segmentGranularity), + Granularity.fromString( + table.getParameters().get(Constants.DRUID_QUERY_GRANULARITY) == null + ? "NONE" + : table.getParameters().get(Constants.DRUID_QUERY_GRANULARITY)), + null + ); + + List columns = table.getSd().getCols(); + List columnNames = columns.stream().map(fieldSchema -> fieldSchema.getName()) + .collect(Collectors.toList()); + + if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + throw new IllegalStateException( + "Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + + "') not specified in create table; list of columns is : " + + columnNames); + } + final boolean approximationAllowed = HiveConf + .getBoolVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT); + // Default, all columns that are not metrics or timestamp, are treated as dimensions + final List dimensions = new ArrayList<>(); + ImmutableList.Builder aggregatorFactoryBuilder = ImmutableList.builder(); + for (int i = 0; i < columns.size(); i++) { + final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) TypeInfoUtils + .getTypeInfoFromTypeString(columns + .get(i).getType())).getPrimitiveCategory(); + AggregatorFactory af; + switch (primitiveCategory) { + case BYTE: + case SHORT: + case INT: + case LONG: + af = new LongSumAggregatorFactory(columns.get(i).getName(), columns.get(i).getName()); + break; + case FLOAT: + case DOUBLE: + af = new DoubleSumAggregatorFactory(columns.get(i).getName(), columns.get(i).getName()); + break; + case DECIMAL: + if (approximationAllowed) { + af = new DoubleSumAggregatorFactory(columns.get(i).getName(), columns.get(i).getName()); + } else { + throw new UnsupportedOperationException( + String.format("Druid does not support decimal column type." 
+ + "Either cast column [%s] to double or Enable Approximate Result for Druid by setting property [%s] to true", + columns.get(i).getName(), HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT.varname)); + } + break; + case TIMESTAMP: + // Granularity column + String tColumnName = columns.get(i).getName(); + if (!tColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + throw new IllegalStateException( + "Dimension " + tColumnName + " does not have STRING type: " + + primitiveCategory); + } + continue; + case TIMESTAMPLOCALTZ: + // Druid timestamp column + String tLocalTZColumnName = columns.get(i).getName(); + if (!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + throw new IllegalStateException( + "Dimension " + tLocalTZColumnName + " does not have STRING type: " + + primitiveCategory); + } + continue; + default: + // Dimension + String dColumnName = columns.get(i).getName(); + if (PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory) != + PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP + && primitiveCategory != PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN) { + throw new IllegalStateException( + "Dimension " + dColumnName + " does not have STRING type: " + + primitiveCategory); + } + dimensions.add(new StringDimensionSchema(dColumnName)); + continue; + } + aggregatorFactoryBuilder.add(af); + } + List aggregatorFactories = aggregatorFactoryBuilder.build(); + final InputRowParser inputRowParser = new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null), + new DimensionsSpec(dimensions, null, null), + null, + null + ), "UTF-8"); + + Map inputParser = DruidStorageHandlerUtils.JSON_MAPPER + .convertValue(inputRowParser, Map.class); + + final DataSchema dataSchema = new DataSchema( + dataSourceName, + inputParser, + aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]), + granularitySpec, + DruidStorageHandlerUtils.JSON_MAPPER + ); + + IndexSpec indexSpec; + if ("concise".equals(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_BITMAP_FACTORY_TYPE))) { + indexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null); + } else { + indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null); + } + + + KafkaSupervisorSpec spec = new KafkaSupervisorSpec(dataSchema, + new KafkaSupervisorTuningConfig( + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"), + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), + null, // basePersistDirectory - use druid default, no need to be configured by user + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), + indexSpec, + null, // buildV9Directly - use druid default, no need to be configured by user + getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"), + getLongProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"), + getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"), + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"), + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"), + 
getLongProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatRetries"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "httpTimeout"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "shutdownTimeout"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "offsetFetchPeriod")), + new KafkaSupervisorIOConfig(kafkaTopic, // Mandatory Property + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"), + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"), + ImmutableMap.of(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, + kafka_servers), // Mandatory Property + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"), + getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "completionTimeout"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "lateMessageRejectionPeriod"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "earlyMessageRejectionPeriod"), + getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "skipOffsetGaps")), + new HashMap(), + null, + null, + null, + null, + null, + null, + null + ); + + try { + String task = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec); + console.printInfo("submitting kafka Spec {}", task); + LOG.debug("submitting kafka Supervisor Spec {}", task); + + StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST, + new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress))) + .setContent( + "application/json", + DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)), + new StatusResponseHandler( + Charset.forName("UTF-8"))).get(); + if (response.getStatus().equals(HttpResponseStatus.OK)) { + console.printInfo("Kafka Supervisor Submitted Successfully to druid."); + } else { + throw new IOException(String + .format("Unable to start Kafka Ingestion Druid status [%d] full response [%s]", + response.getStatus().getCode(), response.getContent())); + } + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void commitDropTable(Table table, boolean deleteData) throws MetaException { + // Stop Kafka Ingestion first + final String overlordAddress = Preconditions.checkNotNull(HiveConf + .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), + "Druid Overlord Address is null"); + String dataSourceName = Preconditions + .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), + "Druid Datasource name is null"); + try { + StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST, + new URL(String + .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress, + dataSourceName))), + new StatusResponseHandler( + Charset.forName("UTF-8"))).get(); + if (response.getStatus().equals(HttpResponseStatus.OK)) { + console.printInfo("Kafka Stopped Druid Kafka Ingestion."); + } else { + throw new IOException(String + .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", + response.getStatus().getCode(), 
response.getContent())); + } + } catch (Exception e) { + throw Throwables.propagate(e); + } + // disable or delete data segment. + super.commitDropTable(table, deleteData); + } + + private static Boolean getBooleanProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + return Boolean.parseBoolean(val); + } + + private static Integer getIntegerProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Integer.parseInt(val); + } catch (NumberFormatException e) { + throw new NumberFormatException(String + .format("Exception while parsing property[%s] with Value [%s] as Integer", propertyName, + val)); + } + } + + private static Long getLongProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Long.parseLong(val); + } catch (NumberFormatException e) { + throw new NumberFormatException(String + .format("Exception while parsing property[%s] with Value [%s] as Long", propertyName, + val)); + } + } + + private static Period getPeriodProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Period.parse(val); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String + .format("Exception while parsing property[%s] with Value [%s] as Period", propertyName, + val)); + } + } + + private static String getTableProperty(Table table, String propertyName) { + return table.getParameters().get(propertyName); + } +} \ No newline at end of file diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 2f956b179b..0003e1fafd 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.druid; import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.dataformat.smile.SmileFactory; @@ -30,6 +31,8 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.InputStreamResponseHandler; + +import io.druid.indexing.kafka.KafkaIndexTaskModule; import io.druid.jackson.DefaultObjectMapper; import io.druid.math.expr.ExprMacroTable; import io.druid.metadata.MetadataStorageTablesConfig; @@ -157,6 +160,9 @@ // set the timezone of the object mapper // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC" JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC")); + List kafkaIndexTaskJacksonModules = (List) new KafkaIndexTaskModule().getJacksonModules(); + JSON_MAPPER.registerModules(kafkaIndexTaskJacksonModules); + SMILE_MAPPER.registerModules(kafkaIndexTaskJacksonModules); try { // No operation emitter will be used by some internal druid classes. 
EmittingLogger.registerEmitter( diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml index 870e3654e6..c01898de7a 100644 --- a/itests/qtest-druid/pom.xml +++ b/itests/qtest-druid/pom.xml @@ -43,6 +43,7 @@ 10.11.1.1 16.0.1 4.1.0 + 0.10.2.0 @@ -104,6 +105,11 @@ + io.druid.extensions + druid-kafka-indexing-service + ${druid.version} + + org.apache.logging.log4j log4j-api ${log4j2.version} @@ -196,6 +202,11 @@ ${junit.version} test + + org.apache.kafka + kafka_2.11 + ${kafka.version} + diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java index 71259dc914..8e79d4cdd9 100644 --- a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java +++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java @@ -21,7 +21,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,12 +50,13 @@ ); private static final Map COMMON_DRUID_CONF = ImmutableMap.of( - "druid.metadata.storage.type", "derby" + "druid.metadata.storage.type", "derby", + "druid.storage.type", "hdfs", + "druid.processing.buffer.sizeBytes", "213870912", + "druid.processing.numThreads", "2" ); private static final Map COMMON_DRUID_HISTORICAL = ImmutableMap.of( - "druid.processing.buffer.sizeBytes", "213870912", - "druid.processing.numThreads", "2", "druid.server.maxSize", "130000000000" ); @@ -87,26 +87,13 @@ public MiniDruidCluster(String name) { } - public MiniDruidCluster(String name, String logDir, String dataDir, Integer zookeeperPort, String classpath) { + public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zookeeperPort, String classpath) { super(name); - this.dataDirectory = new File(dataDir, "druid-data"); + this.dataDirectory = new File(tmpDir, "druid-data"); this.logDirectory = new File(logDir); - try { - if (dataDirectory.exists()) { - // need to clean data directory to ensure that there is no interference from old runs - // Cleaning is happening here to allow debugging in case of tests fail - // we don;t have to clean logs since it is an append mode - log.info("Cleaning the druid-data directory [{}]", dataDirectory.getAbsolutePath()); - FileUtils.deleteDirectory(dataDirectory); - } else { - log.info("Creating the druid-data directory [{}]", dataDirectory.getAbsolutePath()); - dataDirectory.mkdirs(); - } - } catch (IOException e) { - log.error("Failed to clean data directory"); - Throwables.propagate(e); - } + ensureCleanDirectory(dataDirectory); + String derbyURI = String .format("jdbc:derby://localhost:1527/%s/druid_derby/metadata.db;create=true", dataDirectory.getAbsolutePath() @@ -126,13 +113,14 @@ public MiniDruidCluster(String name, String logDir, String dataDir, Integer zook .put("druid.indexer.logs.directory", indexingLogDir) .put("druid.zk.service.host", "localhost:" + zookeeperPort) .put("druid.coordinator.startDelay", "PT1S") + .put("druid.indexer.runner", "local") + .put("druid.storage.storageDirectory", getDeepStorageDir()) .build(); Map historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF) .putAll(COMMON_DRUID_HISTORICAL) .put("druid.zk.service.host", "localhost:" + zookeeperPort) .put("druid.segmentCache.locations", segmentsCache) .put("druid.storage.storageDirectory", 
getDeepStorageDir()) - .put("druid.storage.type", "hdfs") .build(); coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties, COORDINATOR_JVM_CONF, @@ -148,6 +136,24 @@ public MiniDruidCluster(String name, String logDir, String dataDir, Integer zook } + private static void ensureCleanDirectory(File dir){ + try { + if (dir.exists()) { + // need to clean data directory to ensure that there is no interference from old runs + // Cleaning is happening here to allow debugging in case of tests fail + // we don;t have to clean logs since it is an append mode + log.info("Cleaning the druid directory [{}]", dir.getAbsolutePath()); + FileUtils.deleteDirectory(dir); + } else { + log.info("Creating the druid directory [{}]", dir.getAbsolutePath()); + dir.mkdirs(); + } + } catch (IOException e) { + log.error("Failed to clean druid directory"); + Throwables.propagate(e); + } + } + @Override protected void serviceStart() throws Exception { druidNodes.stream().forEach(node -> { @@ -191,4 +197,13 @@ public String getMetadataURI() { public String getDeepStorageDir() { return dataDirectory.getAbsolutePath() + File.separator + "deep-storage"; } + + public String getCoordinatorURI(){ + return "localhost:8081"; + } + + public String getOverlordURI(){ + // Overlord and coordinator both run in same JVM. + return getCoordinatorURI(); + } } diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java new file mode 100644 index 0000000000..d839fd2db4 --- /dev/null +++ b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java @@ -0,0 +1,122 @@ +package org.apache.hive.kafka; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +import org.apache.hadoop.service.AbstractService; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Throwables; +import com.google.common.io.Files; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Properties; + +/** + * This class has the hooks to start and stop single node kafka cluster. 
+ * The kafka broker is started on port 9092 + */ +public class SingleNodeKafkaCluster extends AbstractService { + private static final Logger log = LoggerFactory.getLogger(SingleNodeKafkaCluster.class); + + private final KafkaServerStartable serverStartable; + private final String zkString; + + public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort){ + super(name); + Properties properties = new Properties(); + this.zkString = String.format("localhost:%d", zkPort); + properties.setProperty("zookeeper.connect", zkString); + properties.setProperty("broker.id", String.valueOf(1)); + properties.setProperty("host.name", "localhost"); + properties.setProperty("port", Integer.toString(9092)); + properties.setProperty("log.dir", logDir); + properties.setProperty("log.flush.interval.messages", String.valueOf(1)); + properties.setProperty("offsets.topic.replication.factor", String.valueOf(1)); + properties.setProperty("offsets.topic.num.partitions", String.valueOf(1)); + properties.setProperty("transaction.state.log.replication.factor", String.valueOf(1)); + properties.setProperty("transaction.state.log.min.isr", String.valueOf(1)); + properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577"); + + this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties)); + } + + + @Override + protected void serviceStart() throws Exception { + serverStartable.startup(); + log.info("Kafka Server Started"); + + } + + @Override + protected void serviceStop() throws Exception { + log.info("Stopping Kafka Server"); + serverStartable.shutdown(); + log.info("Kafka Server Stopped"); + } + + /** + * Creates a topic and inserts data from the specified datafile. + * Each line in the datafile is sent to kafka as a single message. + * @param topicName + * @param datafile + */ + public void createTopicWithData(String topicName, File datafile){ + createTopic(topicName); + // set up kafka producer + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("acks", "1"); + properties.put("retries", "3"); + + try(KafkaProducer producer = new KafkaProducer<>( + properties, + new StringSerializer(), + new StringSerializer() + )){ + List events = Files.readLines(datafile, Charset.forName("UTF-8")); + for(String event : events){ + producer.send(new ProducerRecord(topicName, event)); + } + } catch (IOException e) { + Throwables.propagate(e); + } + + } + + public void createTopic(String topic) { + int sessionTimeoutMs = 1000; + ZkClient zkClient = new ZkClient( + this.zkString, sessionTimeoutMs, sessionTimeoutMs, + ZKStringSerializer$.MODULE$ + ); + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkString, sessionTimeoutMs), false); + int numPartitions = 1; + int replicationFactor = 1; + Properties topicConfig = new Properties(); + AdminUtils.createTopic( + zkUtils, + topic, + numPartitions, + replicationFactor, + topicConfig, + RackAwareMode.Disabled$.MODULE$ + ); + } + +} diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java new file mode 100644 index 0000000000..4768975225 --- /dev/null +++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cli; + +import org.apache.hadoop.hive.cli.control.CliAdapter; +import org.apache.hadoop.hive.cli.control.CliConfigs; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.File; +import java.util.List; + +@RunWith(Parameterized.class) +public class TestMiniDruidKafkaCliDriver { + + static CliAdapter adapter = new CliConfigs.MiniDruidKafkaCliConfig().getCliAdapter(); + + @Parameters(name = "{0}") + public static List getParameters() throws Exception { + return adapter.getParameters(); + } + + @ClassRule + public static TestRule cliClassRule = adapter.buildClassRule(); + + @Rule + public TestRule cliTestRule = adapter.buildTestRule(); + + private String name; + private File qfile; + + public TestMiniDruidKafkaCliDriver(String name, File qfile) { + this.name = name; + this.qfile = qfile; + } + + @Test + public void testCliDriver() throws Exception { + adapter.runTest(name, qfile); + } + +} diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 7034c38b90..1e65569a3c 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -187,6 +187,29 @@ public MiniDruidCliConfig() { } } + public static class MiniDruidKafkaCliConfig extends AbstractCliConfig { + public MiniDruidKafkaCliConfig() { + super(CoreCliDriver.class); + try { + setQueryDir("ql/src/test/queries/clientpositive"); + + includesFrom(testConfigProps, "druid.kafka.query.files"); + + setResultsDir("ql/src/test/results/clientpositive/druid"); + setLogDir("itests/qtest/target/tmp/log"); + + setInitScript("q_test_druid_init.sql"); + setCleanupScript("q_test_cleanup_druid.sql"); + setHiveConfDir("data/conf/llap"); + setClusterType(MiniClusterType.druidKafka); + setMetastoreType(MetastoreType.sql); + setFsType(QTestUtil.FsType.hdfs); + } catch (Exception e) { + throw new RuntimeException("can't construct cliconfig", e); + } + } + } + public static class MiniLlapLocalCliConfig extends AbstractCliConfig { public MiniLlapLocalCliConfig() { diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 050f9d5765..3cdad284ef 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -67,6 +67,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; +import 
junit.framework.TestSuite; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.io.IOUtils; @@ -87,12 +88,11 @@ import org.apache.hadoop.hive.common.io.SortPrintStream; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hive.druid.MiniDruidCluster; import org.apache.hadoop.hive.llap.LlapItUtils; import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; @@ -123,20 +123,23 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.StreamPrinter; +import org.apache.hive.druid.MiniDruidCluster; +import org.apache.hive.kafka.SingleNodeKafkaCluster; import org.apache.logging.log4j.util.Strings; import org.apache.tools.ant.BuildException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import junit.framework.TestSuite; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * QTestUtil. @@ -203,6 +206,7 @@ private final String cleanupScript; private MiniDruidCluster druidCluster; + private SingleNodeKafkaCluster kafkaCluster; public interface SuiteAddTestFunctor { public void addTestToSuite(TestSuite suite, Object setup, String tName); @@ -402,6 +406,8 @@ public void initConf() throws Exception { conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath()); conf.set("hive.druid.metadata.db.type", "derby"); conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI()); + conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI()); + conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI()); final Path scratchDir = fs .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); fs.mkdirs(scratchDir); @@ -501,7 +507,9 @@ private void createRemoteDirs() { llap(CoreClusterType.TEZ, FsType.hdfs), llap_local(CoreClusterType.TEZ, FsType.local), none(CoreClusterType.MR, FsType.local), - druid(CoreClusterType.TEZ, FsType.hdfs); + druid(CoreClusterType.TEZ, FsType.hdfs), + druidKafka(CoreClusterType.TEZ, FsType.hdfs), + kafka(CoreClusterType.TEZ, FsType.hdfs); private final CoreClusterType coreClusterType; @@ -537,8 +545,11 @@ public static MiniClusterType valueForString(String type) { } else if (type.equals("llap_local")) { return llap_local; } else if (type.equals("druid")) { - return druid; - } else { + return druid; + } else if (type.equals("druid-kafka")) { + return druidKafka; + } + else { return none; } } @@ -630,11 +641,7 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, ? 
new File(new File(dataDir).getAbsolutePath() + "/datasets") : new File(conf.get("test.data.set.files")); - // Use the current directory if it is not specified - String scriptsDir = conf.get("test.data.scripts"); - if (scriptsDir == null) { - scriptsDir = new File(".").getAbsolutePath() + "/data/scripts"; - } + String scriptsDir = getScriptsDir(); this.initScript = scriptsDir + File.separator + initScript; this.cleanupScript = scriptsDir + File.separator + cleanupScript; @@ -643,6 +650,14 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, init(); } + private String getScriptsDir() { + // Use the current directory if it is not specified + String scriptsDir = conf.get("test.data.scripts"); + if (scriptsDir == null) { + scriptsDir = new File(".").getAbsolutePath() + "/data/scripts"; + } + return scriptsDir; + } private void setupFileSystem(HadoopShims shims) throws IOException { @@ -678,7 +693,7 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws String uriString = fs.getUri().toString(); - if (clusterType == MiniClusterType.druid) { + if (clusterType == MiniClusterType.druid || clusterType == MiniClusterType.druidKafka) { final String tempDir = System.getProperty("test.tmp.dir"); druidCluster = new MiniDruidCluster("mini-druid", getLogDirectory(), @@ -699,6 +714,19 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws druidCluster.start(); } + if(clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) { + kafkaCluster = new SingleNodeKafkaCluster("kafka", + getLogDirectory() + "/kafka-cluster", + setup.zkPort + ); + kafkaCluster.init(conf); + kafkaCluster.start(); + kafkaCluster.createTopicWithData( + "test-topic", + new File(getScriptsDir(), "kafka_init_data.json") + ); + } + if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { if (confDir != null && !confDir.isEmpty()) { conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() @@ -735,6 +763,11 @@ public void shutdown() throws Exception { druidCluster.stop(); druidCluster = null; } + + if (kafkaCluster != null) { + kafkaCluster.stop(); + kafkaCluster = null; + } setup.tearDown(); if (sparkSession != null) { try { diff --git a/pom.xml b/pom.xml index 5be30f6696..41904f48b7 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ 4.1.19 3.2.0-m3 1.2 - 1.4 + 1.7 3.2.2 1.9 1.1 diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q new file mode 100644 index 0000000000..ce456523db --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q @@ -0,0 +1,19 @@ +CREATE TABLE druid_kafka_test(`__time` timestamp, page string, language string, `user` string, added int, deleted int, delta int) + STORED BY 'org.apache.hadoop.hive.druid.DruidKafkaStreamingStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "HOUR", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true" + ); + +!curl -ss http://localhost:8081/druid/indexer/v1/supervisor; + +!sleep 100; + +Select count(*) FROM druid_kafka_test; + +Select page FROM druid_kafka_test order by page; + +DROP TABLE druid_kafka_test; diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out new file mode 100644 index 0000000000..6944b4e151 --- /dev/null +++ 
b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out @@ -0,0 +1,60 @@ +PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, language string, `user` string, added int, deleted int, delta int) + STORED BY 'org.apache.hadoop.hive.druid.DruidKafkaStreamingStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "HOUR", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true" + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, language string, `user` string, added int, deleted int, delta int) + STORED BY 'org.apache.hadoop.hive.druid.DruidKafkaStreamingStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "HOUR", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true" + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test +["default.druid_kafka_test"] +PREHOOK: query: Select count(*) FROM druid_kafka_test +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test order by page +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test order by page +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +Cherno Alpha +Cherno Alpha +Coyote Tango +Coyote Tango +Crimson Typhoon +Crimson Typhoon +Gypsy Danger +Gypsy Danger +Striker Eureka +Striker Eureka +PREHOOK: query: DROP TABLE druid_kafka_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: DROP TABLE druid_kafka_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: default@druid_kafka_test -- 2.11.0 (Apple Git-81)
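As a usage sketch (the table name, Kafka topic, broker address, and tuning values below are illustrative placeholders, not part of this patch): the handler is driven entirely by table properties, and the optional druid.kafka.ingestion.* keys map onto the Kafka supervisor tuning and IO settings parsed in DruidKafkaStreamingStorageHandler, e.g. taskCount as an integer and taskDuration as an ISO-8601 period.

CREATE TABLE druid_kafka_wiki (`__time` timestamp, page string, language string, `user` string, added int, deleted int, delta int)
  STORED BY 'org.apache.hadoop.hive.druid.DruidKafkaStreamingStorageHandler'
  TBLPROPERTIES (
    "druid.segment.granularity" = "HOUR",
    "druid.query.granularity" = "MINUTE",
    "kafka.bootstrap.servers" = "localhost:9092",
    "kafka.topic" = "wiki-events",
    "druid.kafka.ingestion.useEarliestOffset" = "true",
    "druid.kafka.ingestion.taskCount" = "1",
    "druid.kafka.ingestion.taskDuration" = "PT1H"
  );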